jlprat commented on code in PR #14434:
URL: https://github.com/apache/kafka/pull/14434#discussion_r1338092163
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues.apply(i)
Review Comment:
This is a heavy nitpick, but `apply` is usually not spelled out explicitly in
Scala. This should read:
```suggestion
val messageValue = messageValues(i)
```
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues.apply(i)
+ // 1. verify message without key and header
+ var offset = i * 3
+ var record = records.apply(offset)
Review Comment:
Same here
```suggestion
var record = records(offset)
```
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues.apply(i)
+ // 1. verify message without key and header
+ var offset = i * 3
+ var record = records.apply(offset)
+ assertNull(record.key())
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
+
+ // 2. verify message with key, without header
+ offset = i * 3 + 1
+ record = records.apply(offset)
+ assertEquals(messageValue.size.toString, new String(record.key()))
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
- for (((messageValue, record), index) <-
messageValues.zip(records).zipWithIndex) {
+ // 3. verify message with key and header
+ offset = i * 3 + 2
+ record = records.apply(offset)
Review Comment:
As well
```suggestion
record = records(offset)
```
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues.apply(i)
+ // 1. verify message without key and header
+ var offset = i * 3
+ var record = records.apply(offset)
+ assertNull(record.key())
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
+
+ // 2. verify message with key, without header
+ offset = i * 3 + 1
+ record = records.apply(offset)
+ assertEquals(messageValue.size.toString, new String(record.key()))
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
- for (((messageValue, record), index) <-
messageValues.zip(records).zipWithIndex) {
+ // 3. verify message with key and header
+ offset = i * 3 + 2
+ record = records.apply(offset)
+ assertEquals(messageValue.size.toString, new String(record.key()))
assertEquals(messageValue, new String(record.value))
+ assertEquals(1, record.headers().toArray.length)
+ assertEquals(headerArr.apply(0), record.headers().toArray.apply(0))
Review Comment:
And here, but only for the first `apply`; the second one can't be removed as easily.
```suggestion
assertEquals(headerArr(0), record.headers().toArray.apply(0))
```
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues.apply(i)
+ // 1. verify message without key and header
+ var offset = i * 3
+ var record = records.apply(offset)
+ assertNull(record.key())
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
+
+ // 2. verify message with key, without header
+ offset = i * 3 + 1
+ record = records.apply(offset)
Review Comment:
And here
```suggestion
record = records(offset)
```
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues.apply(i)
+ // 1. verify message without key and header
+ var offset = i * 3
+ var record = records.apply(offset)
+ assertNull(record.key())
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
+
+ // 2. verify message with key, without header
+ offset = i * 3 + 1
+ record = records.apply(offset)
+ assertEquals(messageValue.size.toString, new String(record.key()))
Review Comment:
```suggestion
assertEquals(messageValue.length.toString, new String(record.key()))
```
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
}
val partition = 0
+ def messageValue(length: Int): String = {
+ val random = new Random(0)
+ new String(random.alphanumeric.take(length).toArray)
+ }
+
// prepare the messages
- val messageValues = (0 until numRecords).map(i => "value" + i)
+ val messageValues = (0 until numRecords).map(i => messageValue(i))
+ val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+ val headers = new RecordHeaders(headerArr)
// make sure the returned messages are correct
val now = System.currentTimeMillis()
- val responses = for (message <- messageValues)
- yield producer.send(new ProducerRecord(topic, null, now, null,
message.getBytes))
+ val responses: ListBuffer[Future[RecordMetadata]] = new
ListBuffer[Future[RecordMetadata]]()
+
+ for (message <- messageValues) {
+ // 1. send message without key and header
+ responses += producer.send(new ProducerRecord(topic, null, now, null,
message.getBytes))
+ // 2. send message with key, without header
+ responses += producer.send(new ProducerRecord(topic, null, now,
message.size.toString.getBytes, message.getBytes))
+ // 3. send message with key and header
+ responses += producer.send(new ProducerRecord(topic, null, now,
message.size.toString.getBytes, message.getBytes, headers))
Review Comment:
Same nitpick about `size` vs. `length` applies here:
```suggestion
responses += producer.send(new ProducerRecord(topic, null, now,
message.length.toString.getBytes, message.getBytes, headers))
```
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness {
}
val partition = 0
+ def messageValue(length: Int): String = {
+ val random = new Random(0)
+ new String(random.alphanumeric.take(length).toArray)
+ }
+
// prepare the messages
- val messageValues = (0 until numRecords).map(i => "value" + i)
+ val messageValues = (0 until numRecords).map(i => messageValue(i))
+ val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
+ val headers = new RecordHeaders(headerArr)
// make sure the returned messages are correct
val now = System.currentTimeMillis()
- val responses = for (message <- messageValues)
- yield producer.send(new ProducerRecord(topic, null, now, null,
message.getBytes))
+ val responses: ListBuffer[Future[RecordMetadata]] = new
ListBuffer[Future[RecordMetadata]]()
+
+ for (message <- messageValues) {
+ // 1. send message without key and header
+ responses += producer.send(new ProducerRecord(topic, null, now, null,
message.getBytes))
+ // 2. send message with key, without header
+ responses += producer.send(new ProducerRecord(topic, null, now,
message.size.toString.getBytes, message.getBytes))
Review Comment:
Another nitpick, but using `message.size` makes compilation a bit slower, as
the `StringOps` implicit conversion needs to kick in before it can call `.length`.
We could replace the calls to `size` on Strings with `length`.
```suggestion
responses += producer.send(new ProducerRecord(topic, null, now,
message.length.toString.getBytes, message.getBytes))
```
##########
core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:
##########
@@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness {
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
- val records = TestUtils.consumeRecords(consumer, numRecords)
+ val records = TestUtils.consumeRecords(consumer, numRecords*3)
+
+ for (i <- 0 until numRecords) {
+ val messageValue = messageValues.apply(i)
+ // 1. verify message without key and header
+ var offset = i * 3
+ var record = records.apply(offset)
+ assertNull(record.key())
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
+
+ // 2. verify message with key, without header
+ offset = i * 3 + 1
+ record = records.apply(offset)
+ assertEquals(messageValue.size.toString, new String(record.key()))
+ assertEquals(messageValue, new String(record.value))
+ assertEquals(0, record.headers().toArray.length)
+ assertEquals(now, record.timestamp)
+ assertEquals(offset.toLong, record.offset)
- for (((messageValue, record), index) <-
messageValues.zip(records).zipWithIndex) {
+ // 3. verify message with key and header
+ offset = i * 3 + 2
+ record = records.apply(offset)
+ assertEquals(messageValue.size.toString, new String(record.key()))
Review Comment:
```suggestion
assertEquals(messageValue.length.toString, new String(record.key()))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]