This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new 1bcd351 KAFKA-7055: Update InternalTopologyBuilder to throw TopologyException if a processor or sink is added with no upstream node attached (#5215) 1bcd351 is described below commit 1bcd35183d8cd0b009bc1b8d592b5f24a0095890 Author: nixsticks <nikki.th...@gmail.com> AuthorDate: Thu Jun 14 23:26:01 2018 -0400 KAFKA-7055: Update InternalTopologyBuilder to throw TopologyException if a processor or sink is added with no upstream node attached (#5215) Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <guozh...@confluent.io>, Bill Bejeck <b...@confluent.io> --- .../streams/kstream/internals/KStreamImpl.java | 1 - .../internals/InternalTopologyBuilder.java | 28 +++++++-------- .../processor/internals/ProcessorContextImpl.java | 3 +- .../org/apache/kafka/streams/TopologyTest.java | 41 +++++++++++++++++++++- .../internals/InternalTopologyBuilderTest.java | 24 ++++++++++++- .../processor/internals/StreamThreadTest.java | 4 +-- .../kafka/streams/TopologyTestDriverTest.java | 2 +- 7 files changed, 81 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7356aff..e7dabbf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -580,7 +580,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name); builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames()); - builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name); return new KStreamImpl<>(builder, name, allSourceNodes, false); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 36a2edc..5b4b4d73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -442,6 +442,11 @@ public class InternalTopologyBuilder { final String... predecessorNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(topic, "topic must not be null"); + Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); + if (predecessorNames.length == 0) { + throw new TopologyException("Sink " + name + " must have at least one parent"); + } + addSink(name, new StaticTopicNameExtractor<K, V>(topic), keySerializer, valSerializer, partitioner, predecessorNames); nodeToSinkTopic.put(name, topic); } @@ -454,9 +459,13 @@ public class InternalTopologyBuilder { final String... predecessorNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(topicExtractor, "topic extractor must not be null"); + Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); if (nodeFactories.containsKey(name)) { throw new TopologyException("Processor " + name + " is already added."); } + if (predecessorNames.length == 0) { + throw new TopologyException("Sink " + name + " must have at least one parent"); + } for (final String predecessor : predecessorNames) { Objects.requireNonNull(predecessor, "predecessor name can't be null"); @@ -481,9 +490,13 @@ public class InternalTopologyBuilder { final String... predecessorNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(supplier, "supplier must not be null"); + Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); if (nodeFactories.containsKey(name)) { throw new TopologyException("Processor " + name + " is already added."); } + if (predecessorNames.length == 0) { + throw new TopologyException("Processor " + name + " must have at least one parent"); + } for (final String predecessor : predecessorNames) { Objects.requireNonNull(predecessor, "predecessor name must not be null"); @@ -592,21 +605,6 @@ public class InternalTopologyBuilder { storeToSourceChangelogTopic.put(storeBuilder, topic); } - public final void connectProcessors(final String... processorNames) { - if (processorNames.length < 2) { - throw new TopologyException("At least two processors need to participate in the connection."); - } - - for (final String processorName : processorNames) { - Objects.requireNonNull(processorName, "processor name can't be null"); - if (!nodeFactories.containsKey(processorName)) { - throw new TopologyException("Processor " + processorName + " is not added yet."); - } - } - - nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length)); - } - public final void addInternalTopic(final String topicName) { Objects.requireNonNull(topicName, "topicName can't be null"); internalTopicNames.add(topicName); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index a539a1b..f1ee81f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -116,7 +116,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re if (sendTo != null) { final ProcessorNode child = currentNode().getChild(sendTo); if (child == null) { - throw new StreamsException("Unknown processor name: " + sendTo); + throw new StreamsException("Unknown downstream node: " + sendTo + " either does not exist or is not" + + " connected to this processor."); } forward(child, key, value); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 8b47885..ece157c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -174,6 +174,24 @@ public class TopologyTest { } catch (final TopologyException expected) { } } + @Test + public void shouldNotAllowToAddProcessorWithEmptyParents() { + topology.addSource("source", "topic-1"); + try { + topology.addProcessor("processor", new MockProcessorSupplier()); + fail("Should throw TopologyException for processor without at least one parent node"); + } catch (final TopologyException expected) { } + } + + @Test + public void shouldNotAllowToAddProcessorWithNullParents() { + topology.addSource("source", "topic-1"); + try { + topology.addProcessor("processor", new MockProcessorSupplier(), null); + fail("Should throw NullPointerException for processor when null parent names are provided"); + } catch (final NullPointerException expected) { } + } + @Test(expected = TopologyException.class) public void shouldFailOnUnknownSource() { topology.addProcessor("processor", new MockProcessorSupplier(), "source"); @@ -194,6 +212,26 @@ public class TopologyTest { } catch (final TopologyException expected) { } } + @Test + public void shouldNotAllowToAddSinkWithEmptyParents() { + topology.addSource("source", "topic-1"); + topology.addProcessor("processor", new MockProcessorSupplier(), "source"); + try { + topology.addSink("sink", "topic-2"); + fail("Should throw TopologyException for sink without at least one parent node"); + } catch (final TopologyException expected) { } + } + + @Test + public void shouldNotAllowToAddSinkWithNullParents() { + topology.addSource("source", "topic-1"); + topology.addProcessor("processor", new MockProcessorSupplier(), "source"); + try { + topology.addSink("sink", "topic-2", null); + fail("Should throw NullPointerException for sink when null parent names are provided"); + } catch (final NullPointerException expected) { } + } + @Test(expected = TopologyException.class) public void shouldFailWithUnknownParent() { topology.addSink("sink", "topic-2", "source"); @@ -236,7 +274,8 @@ public class TopologyTest { public void shouldNotAllowToAddStateStoreToSink() { mockStoreBuilder(); EasyMock.replay(storeBuilder); - topology.addSink("sink-1", "topic-1"); + topology.addSource("source-1", "topic-1"); + topology.addSink("sink-1", "topic-1", "source-1"); try { topology.addStateStore(storeBuilder, "sink-1"); fail("Should have thrown TopologyException for adding store to sink node"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 1da0425..b0674ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -159,6 +159,16 @@ public class InternalTopologyBuilderTest { builder.addProcessor("processor", new MockProcessorSupplier(), "processor"); } + @Test(expected = TopologyException.class) + public void testAddProcessorWithEmptyParents() { + builder.addProcessor("processor", new MockProcessorSupplier()); + } + + @Test(expected = NullPointerException.class) + public void testAddProcessorWithNullParents() { + builder.addProcessor("processor", new MockProcessorSupplier(), null); + } + @Test public void testAddSinkWithSameName() { builder.addSource(null, "source", null, null, null, "topic-1"); @@ -179,6 +189,17 @@ public class InternalTopologyBuilderTest { builder.addSink("sink", "topic-2", null, null, null, "sink"); } + + @Test(expected = TopologyException.class) + public void testAddSinkWithEmptyParents() { + builder.addSink("sink", "topic", null, null, null); + } + + @Test(expected = NullPointerException.class) + public void testAddSinkWithNullParents() { + builder.addSink("sink", "topic", null, null, null, null); + } + @Test public void testAddSinkConnectedWithParent() { builder.addSource(null, "source", null, null, null, "source-topic"); @@ -275,7 +296,8 @@ public class InternalTopologyBuilderTest { @Test public void testAddStateStoreWithSink() { - builder.addSink("sink-1", "topic-1", null, null, null); + builder.addSource(null, "source-1", null, null, null, "topic-1"); + builder.addSink("sink-1", "topic-1", null, null, null, "source-1"); try { builder.addStateStore(storeBuilder, "sink-1"); fail("Should throw TopologyException with store cannot be added to sink"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 3412c62..513d1c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -597,7 +597,7 @@ public class StreamThreadTest { @Test public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() { internalTopologyBuilder.addSource(null, "name", null, null, null, "topic"); - internalTopologyBuilder.addSink("out", "output", null, null, null); + internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); final StreamThread thread = createStreamThread(clientId, config, false); @@ -690,7 +690,7 @@ public class StreamThreadTest { @Test public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() { internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); - internalTopologyBuilder.addSink("out", "output", null, null, null); + internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 135fb3f..d0d4ed1 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -707,7 +707,7 @@ public class TopologyTestDriverTest { @Test public void shouldReturnAllStores() { final Topology topology = setupSourceSinkTopology(); - topology.addProcessor("processor", () -> null); + topology.addProcessor("processor", () -> null, "source"); topology.addStateStore( new KeyValueStoreBuilder<>( Stores.inMemoryKeyValueStore("store"), -- To stop receiving notification emails like this one, please contact mj...@apache.org.