This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 4fdac6136b6 KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)
4fdac6136b6 is described below
commit 4fdac6136b63264808e08970a35796272ac3abf4
Author: Luke Chen <[email protected]>
AuthorDate: Wed Sep 27 19:00:50 2023 +0800
KAFKA-15498: bump snappy-java version to 1.1.10.4 (#14434)
Bump the snappy-java version to 1.1.10.4 and add more tests to verify that
compressed data can be correctly decompressed and read.
For LogCleanerParameterizedIntegrationTest, we increased the message size
for snappy decompression because, in the new version of snappy, the decompressed
size has increased compared with the previous version. Since the compression
algorithm itself is outside Kafka's scope, all we need to do is make sure
the compressed data can be successfully decompressed and parsed/read.
Reviewers: Divij Vaidya <[email protected]>, Ismael Juma
<[email protected]>, Josep Prat <[email protected]>, Kamal Chandraprakash
<[email protected]>
---
LICENSE-binary | 2 +-
.../kafka/api/ProducerCompressionTest.scala | 63 ++++++++++++++++++----
.../LogCleanerParameterizedIntegrationTest.scala | 4 +-
gradle/dependencies.gradle | 2 +-
4 files changed, 58 insertions(+), 13 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index f031ebecf02..5a0642b4d94 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -261,7 +261,7 @@ scala-library-2.13.11
scala-logging_2.13-3.9.4
scala-reflect-2.13.11
scala-java8-compat_2.13-1.0.2
-snappy-java-1.1.10.3
+snappy-java-1.1.10.4
swagger-annotations-2.2.8
zookeeper-3.8.2
zookeeper-jute-3.8.2
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index b1e39ebde49..6135ec952ca 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -19,8 +19,10 @@ package kafka.api.test
import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -29,7 +31,10 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach,
TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
+import java.util.concurrent.Future
import java.util.{Collections, Properties}
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
class ProducerCompressionTest extends QuorumTestHarness {
@@ -64,10 +69,10 @@ class ProducerCompressionTest extends QuorumTestHarness {
"kraft,snappy",
"kraft,lz4",
"kraft,zstd",
- "zk,gzip"
+ "zk,gzip",
+ "zk,snappy"
))
def testCompression(quorum: String, compression: String): Unit = {
-
val producerProps = new Properties()
val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers)
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
}
val partition = 0
+ def messageValue(length: Int): String = {
+ val random = new Random(0)
+ new String(random.alphanumeric.take(length).toArray)
+ }
+
// prepare the messages
- val messageValues = (0 until numRecords).map(i => "value" + i)
+ val messageValues = (0 until numRecords).map(i => messageValue(i))
+ val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+ val headers = new RecordHeaders(headerArr)
// make sure the returned messages are correct
val now = System.currentTimeMillis()
- val responses = for (message <- messageValues)
- yield producer.send(new ProducerRecord(topic, null, now, null,
message.getBytes))
+ val responses: ListBuffer[Future[RecordMetadata]] = new
ListBuffer[Future[RecordMetadata]]()
+
+ for (message <- messageValues) {
+ // 1. send message without key and header
+ responses += producer.send(new ProducerRecord(topic, null, now, null,
message.getBytes))
+ // 2. send message with key, without header
+ responses += producer.send(new ProducerRecord(topic, null, now,
message.length.toString.getBytes, message.getBytes))
+ // 3. send message with key and header
+ responses += producer.send(new ProducerRecord(topic, null, now,
message.length.toString.getBytes, message.getBytes, headers))
+ }
for ((future, offset) <- responses.zipWithIndex) {
assertEquals(offset.toLong, future.get.offset)
}
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues(i)
+ // 1. verify message without key and header
+ var offset = i * 3
+ var record = records(offset)
+ assertNull(record.key())
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
+
+ // 2. verify message with key, without header
+ offset = i * 3 + 1
+ record = records(offset)
+ assertEquals(messageValue.length.toString, new String(record.key()))
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
- for (((messageValue, record), index) <-
messageValues.zip(records).zipWithIndex) {
+ // 3. verify message with key and header
+ offset = i * 3 + 2
+ record = records(offset)
+ assertEquals(messageValue.length.toString, new String(record.key()))
assertEquals(messageValue, new String(record.value))
+ assertEquals(1, record.headers().toArray.length)
+ assertEquals(headerArr.apply(0), record.headers().toArray.apply(0))
assertEquals(now, record.timestamp)
- assertEquals(index.toLong, record.offset)
+ assertEquals(offset.toLong, record.offset)
}
} finally {
producer.close()
diff --git
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 5ab53699cd4..132a77ff97b 100755
---
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -144,9 +144,9 @@ class LogCleanerParameterizedIntegrationTest extends
AbstractLogCleanerIntegrati
case _ =>
// the broker assigns absolute offsets for message format 0 which
potentially causes the compressed size to
// increase because the broker offsets are larger than the ones
assigned by the client
- // adding `5` to the message set size is good enough for this test: it
covers the increased message size while
+ // adding `6` to the message set size is good enough for this test: it
covers the increased message size while
// still being less than the overhead introduced by the conversion
from message format version 0 to 1
- largeMessageSet.sizeInBytes + 5
+ largeMessageSet.sizeInBytes + 6
}
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize =
maxMessageSize)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 92199c036db..6994f994e43 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -142,7 +142,7 @@ versions += [
scalaJava8Compat : "1.0.2",
scoverage: "1.9.3",
slf4j: "1.7.36",
- snappy: "1.1.10.3",
+ snappy: "1.1.10.4",
spotbugs: "4.7.3",
// New version of Swagger 2.2.14 requires minimum JDK 11.
swaggerAnnotations: "2.2.8",