This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new ed12616 KAFKA-8705: Remove parent node after leaving loop to prevent NPE (#7117)
ed12616 is described below
commit ed12616c58970920fcb4917045070a0ad163b9e2
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Dec 12 16:24:01 2019 -0500
KAFKA-8705: Remove parent node after leaving loop to prevent NPE (#7117)
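Fixes a case where merging multiple children of the same key-changing node
caused an NPE. The key-changing parent was removed from
keyChangingOperationsToOptimizableRepartitionNodes while the first merge node
was being processed. A later merge node sharing that parent then looked it up,
got null, and passed null to addAll(). The parents are now collected and
removed only after the loop finishes.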
Reviewers: Matthias J. Sax <[email protected]>, Boyang Chen
<[email protected]>
---
.../kstream/internals/InternalStreamsBuilder.java | 9 +++-
.../kstream/internals/graph/StreamsGraphTest.java | 49 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index ca43c56..9509431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -392,6 +393,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
private void maybeUpdateKeyChangingRepartitionNodeMap() {
final Map<StreamsGraphNode, Set<StreamsGraphNode>>
mergeNodesToKeyChangers = new HashMap<>();
+ final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new
HashSet<>();
for (final StreamsGraphNode mergeNode : mergeNodes) {
mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
final Collection<StreamsGraphNode> keys =
keyChangingOperationsToOptimizableRepartitionNodes.keySet();
@@ -409,11 +411,14 @@ public class InternalStreamsBuilder implements InternalNameProvider {
final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes =
new LinkedHashSet<>();
for (final StreamsGraphNode keyChangingParent :
keyChangingParents) {
repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
-
keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
+ mergeNodeKeyChangingParentsToRemove.add(keyChangingParent);
}
-
keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey,
repartitionNodes);
}
+
+ for (final StreamsGraphNode mergeNodeKeyChangingParent :
mergeNodeKeyChangingParentsToRemove) {
+
keyChangingOperationsToOptimizableRepartitionNodes.remove(mergeNodeKeyChangingParent);
+ }
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 0fecaa2..e2006e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@@ -124,6 +125,28 @@ public class StreamsGraphTest {
}
+ @Test
+ public void shouldOptimizeSeveralMergeNodesWithCommonKeyChangingParent() {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ final KStream<Integer, Integer> parentStream =
streamsBuilder.stream("input_topic", Consumed.with(Serdes.Integer(),
Serdes.Integer()))
+ .selectKey(Integer::sum);
+
+ final KStream<Integer, Integer> childStream1 =
parentStream.mapValues(v -> v + 1);
+ final KStream<Integer, Integer> childStream2 =
parentStream.mapValues(v -> v + 2);
+ final KStream<Integer, Integer> childStream3 =
parentStream.mapValues(v -> v + 3);
+
+ childStream1
+ .merge(childStream2)
+ .merge(childStream3)
+ .to("output_topic");
+
+ final Properties properties = new Properties();
+ properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION,
StreamsConfig.OPTIMIZE);
+ final Topology topology = streamsBuilder.build(properties);
+
+ assertEquals(expectedMergeOptimizedTopology,
topology.describe().toString());
+ }
+
private Topology getTopologyWithChangingValuesAfterChangingKey(final
String optimizeConfig) {
final StreamsBuilder builder = new StreamsBuilder();
@@ -242,4 +265,30 @@ public class StreamsGraphTest {
+ " Sink: KSTREAM-SINK-0000000009
(topic: output-topic)\n"
+ " <--
KSTREAM-MAPVALUES-0000000008\n\n";
+
+ private String expectedMergeOptimizedTopology = "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n" +
+ " --> KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" +
+ " --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003,
KSTREAM-MAPVALUES-0000000004\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" +
+ " --> KSTREAM-MERGE-0000000005\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n" +
+ " --> KSTREAM-MERGE-0000000005\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])\n" +
+ " --> KSTREAM-MERGE-0000000006\n" +
+ " <-- KSTREAM-KEY-SELECT-0000000001\n" +
+ " Processor: KSTREAM-MERGE-0000000005 (stores: [])\n" +
+ " --> KSTREAM-MERGE-0000000006\n" +
+ " <-- KSTREAM-MAPVALUES-0000000002,
KSTREAM-MAPVALUES-0000000003\n" +
+ " Processor: KSTREAM-MERGE-0000000006 (stores: [])\n" +
+ " --> KSTREAM-SINK-0000000007\n" +
+ " <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004\n" +
+ " Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" +
+ " <-- KSTREAM-MERGE-0000000006\n\n";
+
}