jlprat commented on code in PR #14434:
URL: https://github.com/apache/kafka/pull/14434#discussion_r1338092163


##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues.apply(i)

Review Comment:
   This is a heavy nitpick, but `apply` is usually not spelled out explicitly
in Scala. This should read:
   ```suggestion
           val messageValue = messageValues(i)
   ```



##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues.apply(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records.apply(offset)

Review Comment:
   Same here
   ```suggestion
           var record = records(offset)
   ```



##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues.apply(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records.apply(offset)
+        assertNull(record.key())
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
+
+        // 2. verify message with key, without header
+        offset = i * 3 + 1
+        record = records.apply(offset)
+        assertEquals(messageValue.size.toString, new String(record.key()))
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
 
-      for (((messageValue, record), index) <- 
messageValues.zip(records).zipWithIndex) {
+        // 3. verify message with key and header
+        offset = i * 3 + 2
+        record = records.apply(offset)

Review Comment:
   Same here as well:
   ```suggestion
           record = records(offset)
   ```



##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues.apply(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records.apply(offset)
+        assertNull(record.key())
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
+
+        // 2. verify message with key, without header
+        offset = i * 3 + 1
+        record = records.apply(offset)
+        assertEquals(messageValue.size.toString, new String(record.key()))
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
 
-      for (((messageValue, record), index) <- 
messageValues.zip(records).zipWithIndex) {
+        // 3. verify message with key and header
+        offset = i * 3 + 2
+        record = records.apply(offset)
+        assertEquals(messageValue.size.toString, new String(record.key()))
         assertEquals(messageValue, new String(record.value))
+        assertEquals(1, record.headers().toArray.length)
+        assertEquals(headerArr.apply(0), record.headers().toArray.apply(0))

Review Comment:
   And here, but only for the first `apply`; the second one can't be removed easily:
   ```suggestion
           assertEquals(headerArr(0), record.headers().toArray.apply(0))
   ```



##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues.apply(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records.apply(offset)
+        assertNull(record.key())
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
+
+        // 2. verify message with key, without header
+        offset = i * 3 + 1
+        record = records.apply(offset)

Review Comment:
   And here:
   ```suggestion
           record = records(offset)
   ```



##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues.apply(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records.apply(offset)
+        assertNull(record.key())
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
+
+        // 2. verify message with key, without header
+        offset = i * 3 + 1
+        record = records.apply(offset)
+        assertEquals(messageValue.size.toString, new String(record.key()))

Review Comment:
   ```suggestion
           assertEquals(messageValue.length.toString, new String(record.key()))
   ```



##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
       }
       val partition = 0
 
+      def messageValue(length: Int): String = {
+        val random = new Random(0)
+        new String(random.alphanumeric.take(length).toArray)
+      }
+
       // prepare the messages
-      val messageValues = (0 until numRecords).map(i => "value" + i)
+      val messageValues = (0 until numRecords).map(i => messageValue(i))
+      val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+      val headers = new RecordHeaders(headerArr)
 
       // make sure the returned messages are correct
       val now = System.currentTimeMillis()
-      val responses = for (message <- messageValues)
-        yield producer.send(new ProducerRecord(topic, null, now, null, 
message.getBytes))
+      val responses: ListBuffer[Future[RecordMetadata]] = new 
ListBuffer[Future[RecordMetadata]]()
+
+      for (message <- messageValues) {
+        // 1. send message without key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, null, 
message.getBytes))
+        // 2. send message with key, without header
+        responses += producer.send(new ProducerRecord(topic, null, now, 
message.size.toString.getBytes, message.getBytes))
+        // 3. send message with key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, 
message.size.toString.getBytes, message.getBytes, headers))

Review Comment:
   Same as above here (use `length` instead of `size` on the String):
   ```suggestion
           responses += producer.send(new ProducerRecord(topic, null, now, 
message.length.toString.getBytes, message.getBytes, headers))
   ```



##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
       }
       val partition = 0
 
+      def messageValue(length: Int): String = {
+        val random = new Random(0)
+        new String(random.alphanumeric.take(length).toArray)
+      }
+
       // prepare the messages
-      val messageValues = (0 until numRecords).map(i => "value" + i)
+      val messageValues = (0 until numRecords).map(i => messageValue(i))
+      val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+      val headers = new RecordHeaders(headerArr)
 
       // make sure the returned messages are correct
       val now = System.currentTimeMillis()
-      val responses = for (message <- messageValues)
-        yield producer.send(new ProducerRecord(topic, null, now, null, 
message.getBytes))
+      val responses: ListBuffer[Future[RecordMetadata]] = new 
ListBuffer[Future[RecordMetadata]]()
+
+      for (message <- messageValues) {
+        // 1. send message without key and header
+        responses += producer.send(new ProducerRecord(topic, null, now, null, 
message.getBytes))
+        // 2. send message with key, without header
+        responses += producer.send(new ProducerRecord(topic, null, now, 
message.size.toString.getBytes, message.getBytes))

Review Comment:
   Another nitpick, but using `message.size` would make compilation a bit
slower, as the `StringOps` implicit conversion needs to kick in before `.length` can be called.
   We could replace the calls to `size` on Strings with `length`:
   ```suggestion
           responses += producer.send(new ProducerRecord(topic, null, now, 
message.length.toString.getBytes, message.getBytes))
   ```



##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
       // make sure the fetched message count match
       consumer.assign(Collections.singleton(tp))
       consumer.seek(tp, 0)
-      val records = TestUtils.consumeRecords(consumer, numRecords)
+      val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+      for (i <- 0 until numRecords) {
+        val messageValue = messageValues.apply(i)
+        // 1. verify message without key and header
+        var offset = i * 3
+        var record = records.apply(offset)
+        assertNull(record.key())
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
+
+        // 2. verify message with key, without header
+        offset = i * 3 + 1
+        record = records.apply(offset)
+        assertEquals(messageValue.size.toString, new String(record.key()))
+        assertEquals(messageValue, new String(record.value))
+        assertEquals(0, record.headers().toArray.length)
+        assertEquals(now, record.timestamp)
+        assertEquals(offset.toLong, record.offset)
 
-      for (((messageValue, record), index) <- 
messageValues.zip(records).zipWithIndex) {
+        // 3. verify message with key and header
+        offset = i * 3 + 2
+        record = records.apply(offset)
+        assertEquals(messageValue.size.toString, new String(record.key()))

Review Comment:
   ```suggestion
           assertEquals(messageValue.length.toString, new String(record.key()))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to