Repository: kafka Updated Branches: refs/heads/trunk 8ace736f7 -> b982eefd3
KAFKA-4927: Fix KStreamsTestDriver to not throw NPE when KStream.to() sinks are used a KStream.to() sink is also a topic ... so the KStreamTestDriver to fetch it when required Author: Wim Van Leuven <[email protected]> Author: Wim Van Leuven <[email protected]> Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang Closes #2716 from wimvanleuven/KAFKA-4927 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b982eefd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b982eefd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b982eefd Branch: refs/heads/trunk Commit: b982eefd37595b30600b7a979915db5da22271fe Parents: 8ace736 Author: Wim Van Leuven <[email protected]> Authored: Tue May 9 14:31:50 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue May 9 14:31:50 2017 -0700 ---------------------------------------------------------------------- .../streams/kstream/KStreamBuilderTest.java | 53 +++++++++++++++++--- .../apache/kafka/test/KStreamTestDriver.java | 45 ++++++++++------- 2 files changed, 72 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b982eefd/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 6fc6bd4..e7cb669 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -79,13 +79,52 @@ public class KStreamBuilderTest { assertEquals("Y-0000000001", builder.newName("Y-")); assertEquals("Z-0000000002", builder.newName("Z-")); - KStreamBuilder newBuilder = new KStreamBuilder(); + final KStreamBuilder newBuilder = new KStreamBuilder(); assertEquals("X-0000000000", newBuilder.newName("X-")); assertEquals("Y-0000000001", newBuilder.newName("Y-")); assertEquals("Z-0000000002", newBuilder.newName("Z-")); } + + @Test + public void shouldNotTryProcessingFromSinkTopic() { + final KStream<String, String> source = builder.stream("topic-source"); + source.to("topic-sink"); + + final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); + + source.process(processorSupplier); + + driver = new KStreamTestDriver(builder); + driver.setTime(0L); + + driver.process("topic-source", "A", "aa"); + + // no exception was thrown + assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); + } + + @Test + public void shouldTryProcessingFromThoughTopic() { + final KStream<String, String> source = builder.stream("topic-source"); + final KStream<String, String> through = source.through("topic-sink"); + + final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>(); + final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>(); + + source.process(sourceProcessorSupplier); + through.process(throughProcessorSupplier); + + driver = new KStreamTestDriver(builder); + driver.setTime(0L); + + driver.process("topic-source", "A", "aa"); + + assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed); + assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed); + } + @Test public void testNewStoreName() { assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-")); @@ -101,14 +140,14 @@ public class KStreamBuilderTest { @Test public void testMerge() { - String topic1 = "topic-1"; - String topic2 = "topic-2"; + final String topic1 = "topic-1"; + final String topic2 = "topic-2"; - KStream<String, String> source1 = builder.stream(topic1); - KStream<String, String> source2 = builder.stream(topic2); - KStream<String, String> merged = builder.merge(source1, source2); + final KStream<String, String> source1 = builder.stream(topic1); + final KStream<String, String> source2 = builder.stream(topic2); + final KStream<String, String> merged = builder.merge(source1, source2); - MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); + final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); merged.process(processorSupplier); driver = new KStreamTestDriver(builder); http://git-wip-us.apache.org/repos/asf/kafka/blob/b982eefd/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 73c944a..9fb83ba 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -22,12 +22,11 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorNode; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -109,23 +108,27 @@ public class KStreamTestDriver { public void process(final String topicName, final Object key, final Object value) { final ProcessorNode prevNode = context.currentNode(); - ProcessorNode currNode = topology.source(topicName); - if (currNode == null && globalTopology != null) { - currNode = globalTopology.source(topicName); - } + final ProcessorNode currNode = sourceNodeByTopicName(topicName); - // if currNode is null, check if this topic is a changelog topic; - // if yes, skip - if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) { - return; + if (currNode != null) { + context.setRecordContext(createRecordContext(context.timestamp())); + context.setCurrentNode(currNode); + try { + context.forward(key, value); + } finally { + context.setCurrentNode(prevNode); + } } - context.setRecordContext(createRecordContext(context.timestamp())); - context.setCurrentNode(currNode); - try { - context.forward(key, value); - } finally { - context.setCurrentNode(prevNode); + } + + private ProcessorNode sourceNodeByTopicName(final String topicName) { + ProcessorNode topicNode = topology.source(topicName); + + if (topicNode == null && globalTopology != null) { + topicNode = globalTopology.source(topicName); } + + return topicNode; } public void punctuate(final long timestamp) { @@ -224,7 +227,9 @@ public class KStreamTestDriver { final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner) { // The serialization is skipped. - process(topic, key, value); + if (sourceNodeByTopicName(topic) != null) { + process(topic, key, value); + } } @Override @@ -236,7 +241,9 @@ public class KStreamTestDriver { final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { // The serialization is skipped. - process(topic, key, value); + if (sourceNodeByTopicName(topic) != null) { + process(topic, key, value); + } } @Override
