Repository: kafka Updated Branches: refs/heads/trunk e31c0c9bd -> a10d7b1b7
KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward timestamps to internal topics This resolves the issue in the ProcessorTopologyTestDriver that the extracted timestamp is not forwarded with the produced record to the internal topics. JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4789 The contribution is my original work and I license the work to the project under the project's open source license. guozhangwang dguy Author: Hamidreza Afzali <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #2590 from hrafzali/KAFKA-4789_ProcessorTopologyTestDriver_timestamp Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a10d7b1b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a10d7b1b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a10d7b1b Branch: refs/heads/trunk Commit: a10d7b1b765a2ac4af3e219b7c344f67a93de487 Parents: e31c0c9 Author: Hamidreza Afzali <[email protected]> Authored: Tue Feb 28 13:31:32 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Feb 28 13:31:32 2017 -0800 ---------------------------------------------------------------------- .../internals/ProcessorTopologyTest.java | 46 ++++++++++++++++++++ .../kafka/test/ProcessorTopologyTestDriver.java | 29 ++++++++---- 2 files changed, 67 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a10d7b1b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 2f387cb..322c178 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -46,6 +46,7 @@ import java.io.File; import java.util.Properties; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -246,6 +247,20 @@ public class ProcessorTopologyTest { } @Test + public void testDrivingInternalRepartitioningForwardingTimestampTopology() { + driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology()); + driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key3", "value3@3000", STRING_SERIALIZER, STRING_SERIALIZER); + assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER), + equalTo(new ProducerRecord<>(OUTPUT_TOPIC_1, null, 1000L, "key1", "value1"))); + assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER), + equalTo(new ProducerRecord<>(OUTPUT_TOPIC_1, null, 2000L, "key2", "value2"))); + assertThat(driver.readOutput(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER), + equalTo(new ProducerRecord<>(OUTPUT_TOPIC_1, null, 3000L, "key3", "value3"))); + } + + @Test public void shouldCreateStringWithSourceAndTopics() throws Exception { builder.addSource("source", "topic1", "topic2"); final ProcessorTopology topology = builder.build(null); @@ -356,6 +371,15 @@ public class ProcessorTopologyTest { .addSink("sink1", OUTPUT_TOPIC_1, "source1"); } + private TopologyBuilder createInternalRepartitioningWithValueTimestampTopology() { + return builder.addSource("source", INPUT_TOPIC_1) + .addInternalTopic(THROUGH_TOPIC_1) + .addProcessor("processor", define(new ValueTimestampProcessor()), "source") + .addSink("sink0", THROUGH_TOPIC_1, "processor") + .addSource("source1", THROUGH_TOPIC_1) + .addSink("sink1", OUTPUT_TOPIC_1, "source1"); + } + private TopologyBuilder createSimpleMultiSourceTopology(int partition) { return builder.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1") @@ -383,6 +407,18 @@ public class ProcessorTopologyTest { } /** + * A processor that removes custom timestamp information from messages and forwards modified messages to each child. + * A message contains custom timestamp information if the value is in ".*@[0-9]+" format. + */ + protected static class ValueTimestampProcessor extends AbstractProcessor<String, String> { + + @Override + public void process(String key, String value) { + context().forward(key, value.split("@")[0]); + } + } + + /** * A processor that forwards slightly-modified messages to each child. */ protected static class MultiplexingProcessor extends AbstractProcessor<String, String> { @@ -487,9 +523,19 @@ public class ProcessorTopologyTest { }; } + /** + * A custom timestamp extractor that extracts the timestamp from the record's value if the value is in ".*@[0-9]+" + * format. Otherwise, it returns the record's timestamp or the default timestamp if the record's timestamp is zero. + */ public static class CustomTimestampExtractor implements TimestampExtractor { @Override public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { + if (record.value().toString().matches(".*@[0-9]+")) + return Long.parseLong(record.value().toString().split("@")[1]); + + if (record.timestamp() > 0L) + return record.timestamp(); + return timestamp; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a10d7b1b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index d2fb1c7..5808e9a 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -233,23 +233,27 @@ public class ProcessorTopologyTestDriver { } /** - * Send an input message with the given key and value on the specified topic to the topology, and then commit the messages. + * Send an input message with the given key, value and timestamp on the specified topic to the topology, and then commit the messages. * * @param topicName the name of the topic on which the message is to be sent * @param key the raw message key * @param value the raw message value + * @param timestamp the raw message timestamp */ - public void process(String topicName, byte[] key, byte[] value) { + private void process(String topicName, byte[] key, byte[] value, long timestamp) { + TopicPartition tp = partitionsByTopic.get(topicName); if (tp != null) { // Add the record ... long offset = offsetsByTopicPartition.get(tp).incrementAndGet(); - task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value))); + task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value))); producer.clear(); + // Process the record ... task.process(); - ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(0L, offset, tp.partition(), topicName)); + ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(timestamp, offset, tp.partition(), topicName)); task.commit(); + // Capture all the records sent to the producer ... for (ProducerRecord<byte[], byte[]> record : producer.history()) { Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic()); @@ -261,7 +265,7 @@ public class ProcessorTopologyTestDriver { // Forward back into the topology if the produced record is to an internal topic ... if (internalTopics.contains(record.topic())) { - process(record.topic(), record.key(), record.value()); + process(record.topic(), record.key(), record.value(), record.timestamp()); } } } else { @@ -270,11 +274,20 @@ public class ProcessorTopologyTestDriver { throw new IllegalArgumentException("Unexpected topic: " + topicName); } final long offset = offsetsByTopicPartition.get(global).incrementAndGet(); - globalStateTask.update(new ConsumerRecord<>(global.topic(), global.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)); + globalStateTask.update(new ConsumerRecord<>(global.topic(), global.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)); globalStateTask.flushState(); } + } - + /** + * Send an input message with the given key and value on the specified topic to the topology. + * + * @param topicName the name of the topic on which the message is to be sent + * @param key the raw message key + * @param value the raw message value + */ + public void process(String topicName, byte[] key, byte[] value) { + process(topicName, key, value, 0L); } /** @@ -317,7 +330,7 @@ public class ProcessorTopologyTestDriver { if (record == null) return null; K key = keyDeserializer.deserialize(record.topic(), record.key()); V value = valueDeserializer.deserialize(record.topic(), record.value()); - return new ProducerRecord<K, V>(record.topic(), record.partition(), key, value); + return new ProducerRecord<K, V>(record.topic(), record.partition(), record.timestamp(), key, value); } private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> record) {
