This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 86450bf9aca KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)
86450bf9aca is described below
commit 86450bf9aca481113fadbb0ecf0eb4b180762a30
Author: Luke Chen <[email protected]>
AuthorDate: Wed Sep 27 19:00:50 2023 +0800
KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)
Bump the snappy-java version to 1.1.10.4, and add more tests to verify that
compressed data can be correctly decompressed and read.

For LogCleanerParameterizedIntegrationTest, the maximum message size used in the
snappy case is increased, because the new snappy version produces slightly larger
compressed output than the previous one did. Since the compression algorithm itself
is outside Kafka's scope, all we need to ensure is that the compressed data can be
successfully decompressed and parsed/read; a minimal round-trip sketch is included
after the diffstat below.
Reviewers: Divij Vaidya <[email protected]>, Ismael Juma <[email protected]>, Josep Prat <[email protected]>, Kamal Chandraprakash <[email protected]>
---
LICENSE-binary | 2 +-
.../kafka/api/ProducerCompressionTest.scala | 63 ++++++++++++++++++----
.../LogCleanerParameterizedIntegrationTest.scala | 4 +-
gradle/dependencies.gradle | 2 +-
4 files changed, 58 insertions(+), 13 deletions(-)
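For reference, the round-trip property the new tests rely on can be checked
directly against snappy-java. The following is a minimal, illustrative sketch,
not part of this patch: the object name and payload are made up for the example,
and only Snappy.compress/Snappy.uncompress from snappy-java are used.

import java.nio.charset.StandardCharsets
import org.xerial.snappy.Snappy

// Compress a payload and verify that decompression restores the original bytes.
// The compressed size may differ between snappy-java versions; the decompressed
// bytes must always equal the input.
object SnappyRoundTripCheck {
  def main(args: Array[String]): Unit = {
    val original = ("some record value " * 1000).getBytes(StandardCharsets.UTF_8)
    val compressed = Snappy.compress(original)
    val decompressed = Snappy.uncompress(compressed)
    require(java.util.Arrays.equals(original, decompressed), "snappy round trip failed")
    println(s"original=${original.length} bytes, compressed=${compressed.length} bytes")
  }
}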
diff --git a/LICENSE-binary b/LICENSE-binary
index 15db2a14808..d6d7dd7219a 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -259,7 +259,7 @@ scala-library-2.13.12
scala-logging_2.13-3.9.4
scala-reflect-2.13.12
scala-java8-compat_2.13-1.0.2
-snappy-java-1.1.10.3
+snappy-java-1.1.10.4
swagger-annotations-2.2.8
zookeeper-3.8.2
zookeeper-jute-3.8.2
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index b1e39ebde49..6135ec952ca 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -19,8 +19,10 @@ package kafka.api.test
import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -29,7 +31,10 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
+import java.util.concurrent.Future
import java.util.{Collections, Properties}
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
class ProducerCompressionTest extends QuorumTestHarness {
@@ -64,10 +69,10 @@ class ProducerCompressionTest extends QuorumTestHarness {
"kraft,snappy",
"kraft,lz4",
"kraft,zstd",
- "zk,gzip"
+ "zk,gzip",
+ "zk,snappy"
))
def testCompression(quorum: String, compression: String): Unit = {
-
val producerProps = new Properties()
val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
}
val partition = 0
+ def messageValue(length: Int): String = {
+ val random = new Random(0)
+ new String(random.alphanumeric.take(length).toArray)
+ }
+
// prepare the messages
- val messageValues = (0 until numRecords).map(i => "value" + i)
+ val messageValues = (0 until numRecords).map(i => messageValue(i))
+ val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+ val headers = new RecordHeaders(headerArr)
// make sure the returned messages are correct
val now = System.currentTimeMillis()
- val responses = for (message <- messageValues)
- yield producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
+ val responses: ListBuffer[Future[RecordMetadata]] = new ListBuffer[Future[RecordMetadata]]()
+
+ for (message <- messageValues) {
+ // 1. send message without key and header
+ responses += producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
+ // 2. send message with key, without header
+ responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes))
+ // 3. send message with key and header
+ responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes, headers))
+ }
for ((future, offset) <- responses.zipWithIndex) {
assertEquals(offset.toLong, future.get.offset)
}
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues(i)
+ // 1. verify message without key and header
+ var offset = i * 3
+ var record = records(offset)
+ assertNull(record.key())
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
+
+ // 2. verify message with key, without header
+ offset = i * 3 + 1
+ record = records(offset)
+ assertEquals(messageValue.length.toString, new String(record.key()))
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
- for (((messageValue, record), index) <- messageValues.zip(records).zipWithIndex) {
+ // 3. verify message with key and header
+ offset = i * 3 + 2
+ record = records(offset)
+ assertEquals(messageValue.length.toString, new String(record.key()))
assertEquals(messageValue, new String(record.value))
+ assertEquals(1, record.headers().toArray.length)
+ assertEquals(headerArr.apply(0), record.headers().toArray.apply(0))
assertEquals(now, record.timestamp)
- assertEquals(index.toLong, record.offset)
+ assertEquals(offset.toLong, record.offset)
}
} finally {
producer.close()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 5ab53699cd4..132a77ff97b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -144,9 +144,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
case _ =>
// the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
// increase because the broker offsets are larger than the ones assigned by the client
- // adding `5` to the message set size is good enough for this test: it covers the increased message size while
+ // adding `6` to the message set size is good enough for this test: it covers the increased message size while
// still being less than the overhead introduced by the conversion from message format version 0 to 1
- largeMessageSet.sizeInBytes + 5
+ largeMessageSet.sizeInBytes + 6
}
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index c61abd3a264..013c8cbc5a7 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -156,7 +156,7 @@ versions += [
scalaJava8Compat : "1.0.2",
scoverage: "1.9.3",
slf4j: "1.7.36",
- snappy: "1.1.10.3",
+ snappy: "1.1.10.4",
spotbugs: "4.7.3",
// New version of Swagger 2.2.14 requires minimum JDK 11.
swaggerAnnotations: "2.2.8",