Repository: kafka Updated Branches: refs/heads/trunk a2bac70a6 -> a1e0b2240
KAFKA-4073; MirrorMaker should handle messages without timestamp correctly Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #1773 from ijuma/kafka-4073-mirror-maker-timestamps Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1e0b224 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1e0b224 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1e0b224 Branch: refs/heads/trunk Commit: a1e0b2240dba0740135621d959441eefa6fd3124 Parents: a2bac70 Author: Ismael Juma <[email protected]> Authored: Mon Aug 22 21:49:40 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Mon Aug 22 21:49:40 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 +++- .../scala/unit/kafka/tools/MirrorMakerTest.scala | 18 +++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a1e0b224/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index f800032..4346074 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -38,6 +38,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.errors.WakeupException +import org.apache.kafka.common.record.Record import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap @@ -680,7 +681,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = { - Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, record.timestamp, record.key, record.value)) + val timestamp: java.lang.Long = if (record.timestamp == Record.NO_TIMESTAMP) null else record.timestamp + Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, timestamp, record.key, record.value)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a1e0b224/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala index 39a0ac9..d6a5470 100644 --- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala @@ -18,7 +18,7 @@ package kafka.tools import kafka.consumer.BaseConsumerRecord -import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.record.{Record, TimestampType} import org.junit.Assert._ import org.junit.Test @@ -39,4 +39,20 @@ class MirrorMakerTest { assertEquals("key", new String(producerRecord.key)) assertEquals("value", new String(producerRecord.value)) } + + @Test + def testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage() { + val consumerRecord = BaseConsumerRecord("topic", 0, 1L, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME, "key".getBytes, "value".getBytes) + + val result = MirrorMaker.defaultMirrorMakerMessageHandler.handle(consumerRecord) + assertEquals(1, result.size) + + val producerRecord = result.get(0) + assertNull(producerRecord.timestamp) + assertEquals("topic", producerRecord.topic) + assertNull(producerRecord.partition) + assertEquals("key", new String(producerRecord.key)) + assertEquals("value", new String(producerRecord.value)) + } + }
