This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 918c332 KAFKA-8460: produce records with current timestamp (#9877)
918c332 is described below
commit 918c332cbfed9cf3e00d6d8a3cdd353a903865b2
Author: Luke Chen <[email protected]>
AuthorDate: Wed Jan 20 15:48:44 2021 +0800
KAFKA-8460: produce records with current timestamp (#9877)
Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/api/AbstractConsumerTest.scala | 6 +-
.../integration/kafka/api/BaseConsumerTest.scala | 5 +-
.../integration/kafka/api/ConsumerBounceTest.scala | 16 ++---
.../kafka/api/PlaintextConsumerTest.scala | 75 +++++++++++++---------
.../kafka/api/SaslMultiMechanismConsumerTest.scala | 24 ++++---
5 files changed, 76 insertions(+), 50 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 2c7c61d..ac62cdb 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -103,9 +103,11 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
}
protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
numRecords: Int,
- tp: TopicPartition):
Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
+ tp: TopicPartition,
+ startingTimestamp: Long =
System.currentTimeMillis()): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
val records = (0 until numRecords).map { i =>
- val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong,
s"key $i".getBytes, s"value $i".getBytes)
+ val timestamp = startingTimestamp + i.toLong
+ val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp,
s"key $i".getBytes, s"value $i".getBytes)
producer.send(record)
record
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 03882c6..fe56040 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -34,7 +34,8 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
def testSimpleConsumption(): Unit = {
val numRecords = 10000
val producer = createProducer()
- sendRecords(producer, numRecords, tp)
+ val startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
val consumer = createConsumer()
assertEquals(0, consumer.assignment.size)
@@ -42,7 +43,7 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
assertEquals(1, consumer.assignment.size)
consumer.seek(tp, 0)
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0, startingTimestamp = startingTimestamp)
// check async commit callbacks
sendAndAwaitAsyncCommit(consumer)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 5aa315a..a24a1b4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -88,7 +88,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
def consumeWithBrokerFailures(numIters: Int): Unit = {
val numRecords = 1000
val producer = createProducer()
- sendRecords(producer, numRecords)
+ producerSend(producer, numRecords)
var consumed = 0L
val consumer = createConsumer()
@@ -126,7 +126,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
val numRecords = 1000
val producer = createProducer()
- sendRecords(producer, numRecords)
+ producerSend(producer, numRecords)
val consumer = createConsumer()
consumer.assign(Collections.singletonList(tp))
@@ -214,7 +214,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
def testClose(): Unit = {
val numRecords = 10
val producer = createProducer()
- sendRecords(producer, numRecords)
+ producerSend(producer, numRecords)
checkCloseGoodPath(numRecords, "group1")
checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
@@ -360,7 +360,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
assertTrue(rejectedConsumer.thrownException.get.isInstanceOf[GroupMaxSizeReachedException])
// assert group continues to live
- sendRecords(createProducer(), maxGroupSize * 100, topic, numPartitions =
Some(partitions.size))
+ producerSend(createProducer(), maxGroupSize * 100, topic, numPartitions =
Some(partitions.size))
TestUtils.waitUntilTrue(() => {
consumerPollers.forall(p => p.receivedMessages >= 100)
}, "The consumers in the group could not fetch the expected records",
10000L)
@@ -512,10 +512,10 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
Range(0, numPartitions).map(part => new TopicPartition(topic, part)).toSet
}
- private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
- numRecords: Int,
- topic: String = this.topic,
- numPartitions: Option[Int] = None): Unit = {
+ private def producerSend(producer: KafkaProducer[Array[Byte], Array[Byte]],
+ numRecords: Int,
+ topic: String = this.topic,
+ numPartitions: Option[Int] = None): Unit = {
var partitionIndex = 0
def getPartition: Int = {
numPartitions match {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 2e744d9..97f4407 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -162,12 +162,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val numRecords = 10000
val producer = createProducer()
- sendRecords(producer, numRecords, tp)
+ val startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
maxPollRecords.toString)
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
- consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset
= 0, maxPollRecords = maxPollRecords)
+ consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset
= 0, maxPollRecords = maxPollRecords,
+ startingTimestamp = startingTimestamp)
}
@Test
@@ -312,21 +314,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testAutoOffsetReset(): Unit = {
val producer = createProducer()
- sendRecords(producer, numRecords = 1, tp)
+ val startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords = 1, tp, startingTimestamp =
startingTimestamp)
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
- consumeAndVerifyRecords(consumer = consumer, numRecords = 1,
startingOffset = 0)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = 1,
startingOffset = 0, startingTimestamp = startingTimestamp)
}
@Test
def testGroupConsumption(): Unit = {
val producer = createProducer()
- sendRecords(producer, numRecords = 10, tp)
+ val startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords = 10, tp, startingTimestamp =
startingTimestamp)
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
- consumeAndVerifyRecords(consumer = consumer, numRecords = 1,
startingOffset = 0)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = 1,
startingOffset = 0, startingTimestamp = startingTimestamp)
}
/**
@@ -571,7 +575,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// Test seek non-compressed message
val producer = createProducer()
- sendRecords(producer, totalRecords.toInt, tp)
+ val startingTimestamp = 0
+ sendRecords(producer, totalRecords.toInt, tp, startingTimestamp =
startingTimestamp)
consumer.assign(List(tp).asJava)
consumer.seekToEnd(List(tp).asJava)
@@ -580,7 +585,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.seekToBeginning(List(tp).asJava)
assertEquals(0L, consumer.position(tp))
- consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
+ consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0,
startingTimestamp = startingTimestamp)
consumer.seek(tp, mid)
assertEquals(mid, consumer.position(tp))
@@ -620,7 +625,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testPositionAndCommit(): Unit = {
val producer = createProducer()
- sendRecords(producer, numRecords = 5, tp)
+ var startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords = 5, tp, startingTimestamp =
startingTimestamp)
val topicPartition = new TopicPartition(topic, 15)
val consumer = createConsumer()
@@ -634,34 +640,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(0L, consumer.position(tp), "position() on a partition that we
are subscribed to should reset the offset")
consumer.commitSync()
assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset)
-
- consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0)
+
+ consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0, startingTimestamp = startingTimestamp)
assertEquals(5L, consumer.position(tp), "After consuming 5 records,
position should be 5")
consumer.commitSync()
assertEquals(5L, consumer.committed(Set(tp).asJava).get(tp).offset,
"Committed offset should be returned")
- sendRecords(producer, numRecords = 1, tp)
+ startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords = 1, tp, startingTimestamp =
startingTimestamp)
// another consumer in the same group should get the same position
val otherConsumer = createConsumer()
otherConsumer.assign(List(tp).asJava)
- consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1,
startingOffset = 5)
+ consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1,
startingOffset = 5, startingTimestamp = startingTimestamp)
}
@Test
def testPartitionPauseAndResume(): Unit = {
val partitions = List(tp).asJava
val producer = createProducer()
- sendRecords(producer, numRecords = 5, tp)
+ var startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords = 5, tp, startingTimestamp =
startingTimestamp)
val consumer = createConsumer()
consumer.assign(partitions)
- consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0, startingTimestamp = startingTimestamp)
consumer.pause(partitions)
- sendRecords(producer, numRecords = 5, tp)
+ startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords = 5, tp, startingTimestamp =
startingTimestamp)
assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty)
consumer.resume(partitions)
- consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 5)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 5, startingTimestamp = startingTimestamp)
}
@Test
@@ -772,6 +781,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// this behaves a little different than when remaining limit bytes is 0
and it's important to test it
this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
"500")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
"100")
+
+ // Avoid a rebalance while the records are being sent (the default is 6 seconds)
+ this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000.toString)
val consumer = createConsumer()
val topic1 = "topic1"
@@ -794,7 +806,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
awaitAssignment(consumer, partitions.toSet)
val producer = createProducer()
- val producerRecords = partitions.flatMap(sendRecords(producer, numRecords
= 15, _))
+
+ val producerRecords = partitions.flatMap(sendRecords(producer, numRecords
= partitionCount, _))
+
val consumerRecords = consumeRecords(consumer, producerRecords.size)
val expected = producerRecords.map { record =>
@@ -1112,17 +1126,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val numRecords = 50
// Test non-compressed messages
val producer = createProducer()
- sendRecords(producer, numRecords, tp)
+ val startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0, startingKeyAndValueIndex = 0,
- startingTimestamp = 0)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0, startingTimestamp = startingTimestamp)
// Test compressed messages
sendCompressedMessages(numRecords, tp2)
consumer.assign(List(tp2).asJava)
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp =
tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
- startingTimestamp = 0)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp =
tp2, startingOffset = 0)
}
@Test
@@ -1199,8 +1212,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
for (part <- 0 until numParts) {
val tp = new TopicPartition(topic, part)
// In sendRecords(), each message will have key, value and timestamp
equal to the sequence number.
- sendRecords(producer, numRecords = 100, tp)
- timestampsToSearch.put(tp, i * 20)
+ sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0)
+ timestampsToSearch.put(tp, (i * 20).toLong)
i += 1
}
}
@@ -1285,9 +1298,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val consumer = createConsumer()
val producer = createProducer()
- sendRecords(producer, numRecords = 5, tp)
+ val startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords = 5, tp, startingTimestamp =
startingTimestamp)
consumer.subscribe(List(topic).asJava)
- consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = 5,
startingOffset = 0, startingTimestamp = startingTimestamp)
consumer.pause(List(tp).asJava)
// subscribe to a new topic to trigger a rebalance
@@ -1295,7 +1309,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// after rebalance, our position should be reset and our pause state lost,
// so we should be able to consume from the beginning
- consumeAndVerifyRecords(consumer = consumer, numRecords = 0,
startingOffset = 5)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = 0,
startingOffset = 5, startingTimestamp = startingTimestamp)
}
@Test
@@ -1575,12 +1589,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
val numRecords = 1000
val producer = createProducer()
- sendRecords(producer, numRecords, tp)
+ val startingTimestamp = System.currentTimeMillis()
+ sendRecords(producer, numRecords, tp, startingTimestamp =
startingTimestamp)
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
consumer.seek(tp, 0)
- consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0, startingTimestamp = startingTimestamp)
def assertNoMetric(broker: KafkaServer, name: String, quotaType:
QuotaType, clientId: String): Unit = {
val metricName = broker.metrics.metricName("throttle-time",
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index 4136df0..eb2eab4 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -55,33 +55,41 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
var startingOffset = 0
// Test SASL/PLAIN producer and consumer
- sendRecords(plainSaslProducer, numRecords, tp)
+ var startingTimestamp = System.currentTimeMillis()
+ sendRecords(plainSaslProducer, numRecords, tp, startingTimestamp =
startingTimestamp)
plainSaslConsumer.assign(List(tp).asJava)
plainSaslConsumer.seek(tp, 0)
- consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords =
numRecords, startingOffset = startingOffset)
+ consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords =
numRecords, startingOffset = startingOffset,
+ startingTimestamp = startingTimestamp)
sendAndAwaitAsyncCommit(plainSaslConsumer)
startingOffset += numRecords
// Test SASL/GSSAPI producer and consumer
- sendRecords(gssapiSaslProducer, numRecords, tp)
+ startingTimestamp = System.currentTimeMillis()
+ sendRecords(gssapiSaslProducer, numRecords, tp, startingTimestamp =
startingTimestamp)
gssapiSaslConsumer.assign(List(tp).asJava)
gssapiSaslConsumer.seek(tp, startingOffset)
- consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords =
numRecords, startingOffset = startingOffset)
+ consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords =
numRecords, startingOffset = startingOffset,
+ startingTimestamp = startingTimestamp)
sendAndAwaitAsyncCommit(gssapiSaslConsumer)
startingOffset += numRecords
// Test SASL/PLAIN producer and SASL/GSSAPI consumer
- sendRecords(plainSaslProducer, numRecords, tp)
+ startingTimestamp = System.currentTimeMillis()
+ sendRecords(plainSaslProducer, numRecords, tp, startingTimestamp =
startingTimestamp)
gssapiSaslConsumer.assign(List(tp).asJava)
gssapiSaslConsumer.seek(tp, startingOffset)
- consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords =
numRecords, startingOffset = startingOffset)
+ consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords =
numRecords, startingOffset = startingOffset,
+ startingTimestamp = startingTimestamp)
startingOffset += numRecords
// Test SASL/GSSAPI producer and SASL/PLAIN consumer
- sendRecords(gssapiSaslProducer, numRecords, tp)
+ startingTimestamp = System.currentTimeMillis()
+ sendRecords(gssapiSaslProducer, numRecords, tp, startingTimestamp =
startingTimestamp)
plainSaslConsumer.assign(List(tp).asJava)
plainSaslConsumer.seek(tp, startingOffset)
- consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords =
numRecords, startingOffset = startingOffset)
+ consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords =
numRecords, startingOffset = startingOffset,
+ startingTimestamp = startingTimestamp)
}
}