Repository: kafka
Updated Branches:
  refs/heads/trunk a2bac70a6 -> a1e0b2240


KAFKA-4073; MirrorMaker should handle messages without timestamp correctly

Author: Ismael Juma <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #1773 from ijuma/kafka-4073-mirror-maker-timestamps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1e0b224
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1e0b224
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1e0b224

Branch: refs/heads/trunk
Commit: a1e0b2240dba0740135621d959441eefa6fd3124
Parents: a2bac70
Author: Ismael Juma <[email protected]>
Authored: Mon Aug 22 21:49:40 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon Aug 22 21:49:40 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala |  4 +++-
 .../scala/unit/kafka/tools/MirrorMakerTest.scala  | 18 +++++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1e0b224/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index f800032..4346074 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.record.Record
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
@@ -680,7 +681,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
   private[tools] object defaultMirrorMakerMessageHandler extends 
MirrorMakerMessageHandler {
     override def handle(record: BaseConsumerRecord): 
util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
-      Collections.singletonList(new ProducerRecord[Array[Byte], 
Array[Byte]](record.topic, null, record.timestamp, record.key, record.value))
+      val timestamp: java.lang.Long = if (record.timestamp == 
Record.NO_TIMESTAMP) null else record.timestamp
+      Collections.singletonList(new ProducerRecord[Array[Byte], 
Array[Byte]](record.topic, null, timestamp, record.key, record.value))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1e0b224/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala 
b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index 39a0ac9..d6a5470 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import kafka.consumer.BaseConsumerRecord
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{Record, TimestampType}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -39,4 +39,20 @@ class MirrorMakerTest {
     assertEquals("key", new String(producerRecord.key))
     assertEquals("value", new String(producerRecord.value))
   }
+
+  @Test
+  def testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage() {
+    val consumerRecord = BaseConsumerRecord("topic", 0, 1L, 
Record.NO_TIMESTAMP, TimestampType.CREATE_TIME, "key".getBytes, 
"value".getBytes)
+
+    val result = 
MirrorMaker.defaultMirrorMakerMessageHandler.handle(consumerRecord)
+    assertEquals(1, result.size)
+
+    val producerRecord = result.get(0)
+    assertNull(producerRecord.timestamp)
+    assertEquals("topic", producerRecord.topic)
+    assertNull(producerRecord.partition)
+    assertEquals("key", new String(producerRecord.key))
+    assertEquals("value", new String(producerRecord.value))
+  }
+
 }

Reply via email to