Repository: kafka
Updated Branches:
  refs/heads/trunk e31c0c9bd -> a10d7b1b7


KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward timestamps to internal topics

This fixes an issue in ProcessorTopologyTestDriver where the extracted timestamp
was not forwarded along with records produced to internal topics.

JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4789

The contribution is my original work and I license the work to the project 
under the project's open source license.

guozhangwang dguy

Author: Hamidreza Afzali <[email protected]>

Reviewers: Damian Guy, Guozhang Wang

Closes #2590 from hrafzali/KAFKA-4789_ProcessorTopologyTestDriver_timestamp


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a10d7b1b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a10d7b1b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a10d7b1b

Branch: refs/heads/trunk
Commit: a10d7b1b765a2ac4af3e219b7c344f67a93de487
Parents: e31c0c9
Author: Hamidreza Afzali <[email protected]>
Authored: Tue Feb 28 13:31:32 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Tue Feb 28 13:31:32 2017 -0800

----------------------------------------------------------------------
 .../internals/ProcessorTopologyTest.java        | 46 ++++++++++++++++++++
 .../kafka/test/ProcessorTopologyTestDriver.java | 29 ++++++++----
 2 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a10d7b1b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 2f387cb..322c178 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -46,6 +46,7 @@ import java.io.File;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -246,6 +247,20 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void testDrivingInternalRepartitioningForwardingTimestampTopology() 
{
+        driver = new ProcessorTopologyTestDriver(config, 
createInternalRepartitioningWithValueTimestampTopology());
+        driver.process(INPUT_TOPIC_1, "key1", "value1@1000", 
STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2@2000", 
STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3@3000", 
STRING_SERIALIZER, STRING_SERIALIZER);
+        assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                equalTo(new ProducerRecord<>(OUTPUT_TOPIC_1, null, 1000L, 
"key1", "value1")));
+        assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                equalTo(new ProducerRecord<>(OUTPUT_TOPIC_1, null, 2000L, 
"key2", "value2")));
+        assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                equalTo(new ProducerRecord<>(OUTPUT_TOPIC_1, null, 3000L, 
"key3", "value3")));
+    }
+
+    @Test
     public void shouldCreateStringWithSourceAndTopics() throws Exception {
         builder.addSource("source", "topic1", "topic2");
         final ProcessorTopology topology = builder.build(null);
@@ -356,6 +371,15 @@ public class ProcessorTopologyTest {
             .addSink("sink1", OUTPUT_TOPIC_1, "source1");
     }
 
+    private TopologyBuilder 
createInternalRepartitioningWithValueTimestampTopology() {
+        return builder.addSource("source", INPUT_TOPIC_1)
+                .addInternalTopic(THROUGH_TOPIC_1)
+                .addProcessor("processor", define(new 
ValueTimestampProcessor()), "source")
+                .addSink("sink0", THROUGH_TOPIC_1, "processor")
+                .addSource("source1", THROUGH_TOPIC_1)
+                .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+    }
+
     private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
         return builder.addSource("source-1", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
                 .addProcessor("processor-1", define(new 
ForwardingProcessor()), "source-1")
@@ -383,6 +407,18 @@ public class ProcessorTopologyTest {
     }
 
     /**
+     * A processor that removes custom timestamp information from messages and 
forwards modified messages to each child.
+     * A message contains custom timestamp information if the value is in 
".*@[0-9]+" format.
+     */
+    protected static class ValueTimestampProcessor extends 
AbstractProcessor<String, String> {
+
+        @Override
+        public void process(String key, String value) {
+            context().forward(key, value.split("@")[0]);
+        }
+    }
+
+    /**
      * A processor that forwards slightly-modified messages to each child.
      */
     protected static class MultiplexingProcessor extends 
AbstractProcessor<String, String> {
@@ -487,9 +523,19 @@ public class ProcessorTopologyTest {
         };
     }
 
+    /**
+     * A custom timestamp extractor that extracts the timestamp from the 
record's value if the value is in ".*@[0-9]+"
+     * format. Otherwise, it returns the record's timestamp or the default 
timestamp if the record's timestamp is zero.
+    */
     public static class CustomTimestampExtractor implements TimestampExtractor 
{
         @Override
         public long extract(final ConsumerRecord<Object, Object> record, final 
long previousTimestamp) {
+            if (record.value().toString().matches(".*@[0-9]+"))
+                return Long.parseLong(record.value().toString().split("@")[1]);
+
+            if (record.timestamp() > 0L)
+                return record.timestamp();
+
             return timestamp;
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a10d7b1b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index d2fb1c7..5808e9a 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -233,23 +233,27 @@ public class ProcessorTopologyTestDriver {
     }
 
     /**
-     * Send an input message with the given key and value on the specified 
topic to the topology, and then commit the messages.
+     * Send an input message with the given key, value and timestamp on the 
specified topic to the topology, and then commit the messages.
      *
      * @param topicName the name of the topic on which the message is to be 
sent
      * @param key the raw message key
      * @param value the raw message value
+     * @param timestamp the raw message timestamp
      */
-    public void process(String topicName, byte[] key, byte[] value) {
+    private void process(String topicName, byte[] key, byte[] value, long 
timestamp) {
+
         TopicPartition tp = partitionsByTopic.get(topicName);
         if (tp != null) {
             // Add the record ...
             long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
-            task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), 
tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)));
+            task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), 
tp.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, 
value)));
             producer.clear();
+
             // Process the record ...
             task.process();
-            ((InternalProcessorContext) task.context()).setRecordContext(new 
ProcessorRecordContext(0L, offset, tp.partition(), topicName));
+            ((InternalProcessorContext) task.context()).setRecordContext(new 
ProcessorRecordContext(timestamp, offset, tp.partition(), topicName));
             task.commit();
+
             // Capture all the records sent to the producer ...
             for (ProducerRecord<byte[], byte[]> record : producer.history()) {
                 Queue<ProducerRecord<byte[], byte[]>> outputRecords = 
outputRecordsByTopic.get(record.topic());
@@ -261,7 +265,7 @@ public class ProcessorTopologyTestDriver {
 
                 // Forward back into the topology if the produced record is to 
an internal topic ...
                 if (internalTopics.contains(record.topic())) {
-                    process(record.topic(), record.key(), record.value());
+                    process(record.topic(), record.key(), record.value(), 
record.timestamp());
                 }
             }
         } else {
@@ -270,11 +274,20 @@ public class ProcessorTopologyTestDriver {
                 throw new IllegalArgumentException("Unexpected topic: " + 
topicName);
             }
             final long offset = 
offsetsByTopicPartition.get(global).incrementAndGet();
-            globalStateTask.update(new ConsumerRecord<>(global.topic(), 
global.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, 
value));
+            globalStateTask.update(new ConsumerRecord<>(global.topic(), 
global.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, 
key, value));
             globalStateTask.flushState();
         }
+    }
 
-
+    /**
+     * Send an input message with the given key and value on the specified 
topic to the topology.
+     *
+     * @param topicName the name of the topic on which the message is to be 
sent
+     * @param key the raw message key
+     * @param value the raw message value
+     */
+    public void process(String topicName, byte[] key, byte[] value) {
+        process(topicName, key, value, 0L);
     }
 
     /**
@@ -317,7 +330,7 @@ public class ProcessorTopologyTestDriver {
         if (record == null) return null;
         K key = keyDeserializer.deserialize(record.topic(), record.key());
         V value = valueDeserializer.deserialize(record.topic(), 
record.value());
-        return new ProducerRecord<K, V>(record.topic(), record.partition(), 
key, value);
+        return new ProducerRecord<K, V>(record.topic(), record.partition(), 
record.timestamp(), key, value);
     }
 
     private Iterable<ConsumerRecord<byte[], byte[]>> 
records(ConsumerRecord<byte[], byte[]> record) {

Reply via email to