Repository: kafka
Updated Branches:
  refs/heads/0.10.0 8bf9addd2 -> 4e4e2fb50
KAFKA-4073; MirrorMaker should handle messages without timestamp correctly

Author: Ismael Juma <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #1773 from ijuma/kafka-4073-mirror-maker-timestamps

(cherry picked from commit a1e0b2240dba0740135621d959441eefa6fd3124)
Signed-off-by: Jun Rao <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e4e2fb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e4e2fb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e4e2fb5

Branch: refs/heads/0.10.0
Commit: 4e4e2fb5085758ee9ccf6307433ad531a33198d3
Parents: 8bf9add
Author: Ismael Juma <[email protected]>
Authored: Mon Aug 22 21:49:40 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon Aug 22 21:50:25 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 +++-
 .../scala/unit/kafka/tools/MirrorMakerTest.scala | 18 +++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e4e2fb5/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 7d6b5fb..5de2038 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.record.Record
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
@@ -675,7 +676,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
   private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
     override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
-      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, record.timestamp, record.key, record.value))
+      val timestamp: java.lang.Long = if (record.timestamp == Record.NO_TIMESTAMP) null else record.timestamp
+      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, timestamp, record.key, record.value))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4e4e2fb5/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index 39a0ac9..d6a5470 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import kafka.consumer.BaseConsumerRecord
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{Record, TimestampType}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -39,4 +39,20 @@ class MirrorMakerTest {
     assertEquals("key", new String(producerRecord.key))
     assertEquals("value", new String(producerRecord.value))
   }
+
+  @Test
+  def testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage() {
+    val consumerRecord = BaseConsumerRecord("topic", 0, 1L, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME, "key".getBytes, "value".getBytes)
+
+    val result = MirrorMaker.defaultMirrorMakerMessageHandler.handle(consumerRecord)
+    assertEquals(1, result.size)
+
+    val producerRecord = result.get(0)
+    assertNull(producerRecord.timestamp)
+    assertEquals("topic", producerRecord.topic)
+    assertNull(producerRecord.partition)
+    assertEquals("key", new String(producerRecord.key))
+    assertEquals("value", new String(producerRecord.value))
+  }
+
 }
