This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 918c332  KAFKA-8460: produce records with current timestamp (#9877)
918c332 is described below

commit 918c332cbfed9cf3e00d6d8a3cdd353a903865b2
Author: Luke Chen <[email protected]>
AuthorDate: Wed Jan 20 15:48:44 2021 +0800

    KAFKA-8460: produce records with current timestamp (#9877)
    
    Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../kafka/api/AbstractConsumerTest.scala           |  6 +-
 .../integration/kafka/api/BaseConsumerTest.scala   |  5 +-
 .../integration/kafka/api/ConsumerBounceTest.scala | 16 ++---
 .../kafka/api/PlaintextConsumerTest.scala          | 75 +++++++++++++---------
 .../kafka/api/SaslMultiMechanismConsumerTest.scala | 24 ++++---
 5 files changed, 76 insertions(+), 50 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 2c7c61d..ac62cdb 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -103,9 +103,11 @@ abstract class AbstractConsumerTest extends 
BaseRequestTest {
   }
 
   protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], 
numRecords: Int,
-                            tp: TopicPartition): 
Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
+                            tp: TopicPartition,
+                            startingTimestamp: Long = 
System.currentTimeMillis()): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
     val records = (0 until numRecords).map { i =>
-      val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong, 
s"key $i".getBytes, s"value $i".getBytes)
+      val timestamp = startingTimestamp + i.toLong
+      val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, 
s"key $i".getBytes, s"value $i".getBytes)
       producer.send(record)
       record
     }
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 03882c6..fe56040 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -34,7 +34,8 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
   def testSimpleConsumption(): Unit = {
     val numRecords = 10000
     val producer = createProducer()
-    sendRecords(producer, numRecords, tp)
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
     val consumer = createConsumer()
     assertEquals(0, consumer.assignment.size)
@@ -42,7 +43,7 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
     assertEquals(1, consumer.assignment.size)
 
     consumer.seek(tp, 0)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
 
     // check async commit callbacks
     sendAndAwaitAsyncCommit(consumer)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 5aa315a..a24a1b4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -88,7 +88,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
   def consumeWithBrokerFailures(numIters: Int): Unit = {
     val numRecords = 1000
     val producer = createProducer()
-    sendRecords(producer, numRecords)
+    producerSend(producer, numRecords)
 
     var consumed = 0L
     val consumer = createConsumer()
@@ -126,7 +126,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
   def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
     val numRecords = 1000
     val producer = createProducer()
-    sendRecords(producer, numRecords)
+    producerSend(producer, numRecords)
 
     val consumer = createConsumer()
     consumer.assign(Collections.singletonList(tp))
@@ -214,7 +214,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
   def testClose(): Unit = {
     val numRecords = 10
     val producer = createProducer()
-    sendRecords(producer, numRecords)
+    producerSend(producer, numRecords)
 
     checkCloseGoodPath(numRecords, "group1")
     checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
@@ -360,7 +360,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     
assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])
 
     // assert group continues to live
-    sendRecords(createProducer(), maxGroupSize * 100, topic, numPartitions = 
Some(partitions.size))
+    producerSend(createProducer(), maxGroupSize * 100, topic, numPartitions = 
Some(partitions.size))
     TestUtils.waitUntilTrue(() => {
       consumerPollers.forall(p => p.receivedMessages >= 100)
     }, "The consumers in the group could not fetch the expected records", 
10000L)
@@ -512,10 +512,10 @@ class ConsumerBounceTest extends AbstractConsumerTest 
with Logging {
     Range(0, numPartitions).map(part => new TopicPartition(topic, part)).toSet
   }
 
-  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
-                          numRecords: Int,
-                          topic: String = this.topic,
-                          numPartitions: Option[Int] = None): Unit = {
+  private def producerSend(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                           numRecords: Int,
+                           topic: String = this.topic,
+                           numPartitions: Option[Int] = None): Unit = {
     var partitionIndex = 0
     def getPartition: Int = {
       numPartitions match {
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 2e744d9..97f4407 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -162,12 +162,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val numRecords = 10000
 
     val producer = createProducer()
-    sendRecords(producer, numRecords, tp)
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords.toString)
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset 
= 0, maxPollRecords = maxPollRecords)
+    consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset 
= 0, maxPollRecords = maxPollRecords,
+      startingTimestamp = startingTimestamp)
   }
 
   @Test
@@ -312,21 +314,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testAutoOffsetReset(): Unit = {
     val producer = createProducer()
-    sendRecords(producer, numRecords = 1, tp)
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 1, tp, startingTimestamp = 
startingTimestamp)
 
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0, startingTimestamp = startingTimestamp)
   }
 
   @Test
   def testGroupConsumption(): Unit = {
     val producer = createProducer()
-    sendRecords(producer, numRecords = 10, tp)
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 10, tp, startingTimestamp = 
startingTimestamp)
 
     val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0, startingTimestamp = startingTimestamp)
   }
 
   /**
@@ -571,7 +575,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // Test seek non-compressed message
     val producer = createProducer()
-    sendRecords(producer, totalRecords.toInt, tp)
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = 
startingTimestamp)
     consumer.assign(List(tp).asJava)
 
     consumer.seekToEnd(List(tp).asJava)
@@ -580,7 +585,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer.seekToBeginning(List(tp).asJava)
     assertEquals(0L, consumer.position(tp))
-    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, 
startingTimestamp = startingTimestamp)
 
     consumer.seek(tp, mid)
     assertEquals(mid, consumer.position(tp))
@@ -620,7 +625,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testPositionAndCommit(): Unit = {
     val producer = createProducer()
-    sendRecords(producer, numRecords = 5, tp)
+    var startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
 
     val topicPartition = new TopicPartition(topic, 15)
     val consumer = createConsumer()
@@ -634,34 +640,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(0L, consumer.position(tp), "position() on a partition that we 
are subscribed to should reset the offset")
     consumer.commitSync()
     assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset)
-
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0)
+    
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0, startingTimestamp = startingTimestamp)
     assertEquals(5L, consumer.position(tp), "After consuming 5 records, 
position should be 5")
     consumer.commitSync()
     assertEquals(5L, consumer.committed(Set(tp).asJava).get(tp).offset, 
"Committed offset should be returned")
 
-    sendRecords(producer, numRecords = 1, tp)
+    startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 1, tp, startingTimestamp = 
startingTimestamp)
 
     // another consumer in the same group should get the same position
     val otherConsumer = createConsumer()
     otherConsumer.assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5)
+    consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
   @Test
   def testPartitionPauseAndResume(): Unit = {
     val partitions = List(tp).asJava
     val producer = createProducer()
-    sendRecords(producer, numRecords = 5, tp)
+    var startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
 
     val consumer = createConsumer()
     consumer.assign(partitions)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0, startingTimestamp = startingTimestamp)
     consumer.pause(partitions)
-    sendRecords(producer, numRecords = 5, tp)
+    startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
     assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty)
     consumer.resume(partitions)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 5)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
   @Test
@@ -772,6 +781,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // this behaves a little different than when remaining limit bytes is 0 
and it's important to test it
     this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
"500")
     
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 "100")
+
+    // Avoid a rebalance while the records are being sent (the default is 6 
seconds)
+    
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
20000.toString)
     val consumer = createConsumer()
 
     val topic1 = "topic1"
@@ -794,7 +806,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     awaitAssignment(consumer, partitions.toSet)
 
     val producer = createProducer()
-    val producerRecords = partitions.flatMap(sendRecords(producer, numRecords 
= 15, _))
+
+    val producerRecords = partitions.flatMap(sendRecords(producer, numRecords 
= partitionCount, _))
+
     val consumerRecords = consumeRecords(consumer, producerRecords.size)
 
     val expected = producerRecords.map { record =>
@@ -1112,17 +1126,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val numRecords = 50
     // Test non-compressed messages
     val producer = createProducer()
-    sendRecords(producer, numRecords, tp)
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingKeyAndValueIndex = 0,
-      startingTimestamp = 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
 
     // Test compressed messages
     sendCompressedMessages(numRecords, tp2)
     consumer.assign(List(tp2).asJava)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = 
tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
-      startingTimestamp = 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = 
tp2, startingOffset = 0)
   }
 
   @Test
@@ -1199,8 +1212,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       for (part <- 0 until numParts) {
         val tp = new TopicPartition(topic, part)
         // In sendRecords(), each message will have key, value and timestamp 
equal to the sequence number.
-        sendRecords(producer, numRecords = 100, tp)
-        timestampsToSearch.put(tp, i * 20)
+        sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0)
+        timestampsToSearch.put(tp, (i * 20).toLong)
         i += 1
       }
     }
@@ -1285,9 +1298,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val consumer = createConsumer()
 
     val producer = createProducer()
-    sendRecords(producer, numRecords = 5, tp)
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords = 5, tp, startingTimestamp = 
startingTimestamp)
     consumer.subscribe(List(topic).asJava)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, 
startingOffset = 0, startingTimestamp = startingTimestamp)
     consumer.pause(List(tp).asJava)
 
     // subscribe to a new topic to trigger a rebalance
@@ -1295,7 +1309,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // after rebalance, our position should be reset and our pause state lost,
     // so we should be able to consume from the beginning
-    consumeAndVerifyRecords(consumer = consumer, numRecords = 0, 
startingOffset = 5)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 0, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
   @Test
@@ -1575,12 +1589,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
     val numRecords = 1000
     val producer = createProducer()
-    sendRecords(producer, numRecords, tp)
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     consumer.seek(tp, 0)
-    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
 
     def assertNoMetric(broker: KafkaServer, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
         val metricName = broker.metrics.metricName("throttle-time",
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index 4136df0..eb2eab4 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -55,33 +55,41 @@ class SaslMultiMechanismConsumerTest extends 
BaseConsumerTest with SaslSetup {
     var startingOffset = 0
 
     // Test SASL/PLAIN producer and consumer
-    sendRecords(plainSaslProducer, numRecords, tp)
+    var startingTimestamp = System.currentTimeMillis()
+    sendRecords(plainSaslProducer, numRecords, tp, startingTimestamp = 
startingTimestamp)
     plainSaslConsumer.assign(List(tp).asJava)
     plainSaslConsumer.seek(tp, 0)
-    consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset)
+    consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset,
+      startingTimestamp = startingTimestamp)
     sendAndAwaitAsyncCommit(plainSaslConsumer)
     startingOffset += numRecords
 
     // Test SASL/GSSAPI producer and consumer
-    sendRecords(gssapiSaslProducer, numRecords, tp)
+    startingTimestamp = System.currentTimeMillis()
+    sendRecords(gssapiSaslProducer, numRecords, tp, startingTimestamp = 
startingTimestamp)
     gssapiSaslConsumer.assign(List(tp).asJava)
     gssapiSaslConsumer.seek(tp, startingOffset)
-    consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset)
+    consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset,
+      startingTimestamp = startingTimestamp)
     sendAndAwaitAsyncCommit(gssapiSaslConsumer)
     startingOffset += numRecords
 
     // Test SASL/PLAIN producer and SASL/GSSAPI consumer
-    sendRecords(plainSaslProducer, numRecords, tp)
+    startingTimestamp = System.currentTimeMillis()
+    sendRecords(plainSaslProducer, numRecords, tp, startingTimestamp = 
startingTimestamp)
     gssapiSaslConsumer.assign(List(tp).asJava)
     gssapiSaslConsumer.seek(tp, startingOffset)
-    consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset)
+    consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset,
+      startingTimestamp = startingTimestamp)
     startingOffset += numRecords
 
     // Test SASL/GSSAPI producer and SASL/PLAIN consumer
-    sendRecords(gssapiSaslProducer, numRecords, tp)
+    startingTimestamp = System.currentTimeMillis()
+    sendRecords(gssapiSaslProducer, numRecords, tp, startingTimestamp = 
startingTimestamp)
     plainSaslConsumer.assign(List(tp).asJava)
     plainSaslConsumer.seek(tp, startingOffset)
-    consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset)
+    consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = 
numRecords, startingOffset = startingOffset,
+      startingTimestamp = startingTimestamp)
   }
 
 }

Reply via email to