This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new bfc7a7f KAFKA-9739: 2.3 null child node fix (#8419)
bfc7a7f is described below
commit bfc7a7f5309b519ebd69fafb051a37c06249f563
Author: Bill Bejeck <[email protected]>
AuthorDate: Sat Apr 4 11:45:55 2020 -0400
KAFKA-9739: 2.3 null child node fix (#8419)
A port of #8400 for 2.3. The way source and sink nodes are sorted changed
in 2.4, so the PR cannot be cherry-picked directly: the expected topology in
the test has to be updated to match what the 2.3 version produces.
Reviewers: John Roesler <[email protected]>, Andrew Choi
<[email protected]>
---
.../kstream/internals/InternalStreamsBuilder.java | 17 +-
.../kstream/internals/graph/StreamsGraphTest.java | 186 +++++++++++++++++++++
2 files changed, 198 insertions(+), 5 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index dc50770..b7c30f2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -370,11 +370,13 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new
HashSet<>();
for (final StreamsGraphNode mergeNode : mergeNodes) {
mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
- final Collection<StreamsGraphNode> keys =
keyChangingOperationsToOptimizableRepartitionNodes.keySet();
- for (final StreamsGraphNode key : keys) {
- final StreamsGraphNode maybeParentKey =
findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
- if (maybeParentKey != null) {
- mergeNodesToKeyChangers.get(mergeNode).add(key);
+ final Set<Map.Entry<StreamsGraphNode,
LinkedHashSet<OptimizableRepartitionNode>>> entrySet =
keyChangingOperationsToOptimizableRepartitionNodes.entrySet();
+ for (final Map.Entry<StreamsGraphNode,
LinkedHashSet<OptimizableRepartitionNode>> entry : entrySet) {
+ if (mergeNodeHasRepartitionChildren(mergeNode,
entry.getValue())) {
+ final StreamsGraphNode maybeParentKey =
findParentNodeMatching(mergeNode, node ->
node.parentNodes().contains(entry.getKey()));
+ if (maybeParentKey != null) {
+
mergeNodesToKeyChangers.get(mergeNode).add(entry.getKey());
+ }
}
}
}
@@ -395,6 +397,11 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
}
}
+ private boolean mergeNodeHasRepartitionChildren(final StreamsGraphNode
mergeNode,
+ final
LinkedHashSet<OptimizableRepartitionNode> repartitionNodes) {
+ return repartitionNodes.stream().allMatch(n ->
findParentNodeMatching(n, gn -> gn.parentNodes().contains(mergeNode)) != null);
+ }
+
@SuppressWarnings("unchecked")
private OptimizableRepartitionNode createRepartitionNode(final String
repartitionTopicName,
final Serde
keySerde,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index e2006e6..84578ea 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -22,14 +22,22 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.Test;
import java.time.Duration;
@@ -47,6 +55,8 @@ import static org.junit.Assert.assertEquals;
public class StreamsGraphTest {
private final Pattern repartitionTopicPattern = Pattern.compile("Sink:
.*-repartition");
+ private Initializer<String> initializer;
+ private Aggregator<String, String, String> aggregator;
// Test builds topology in succesive manner but only graph node not yet
processed written to topology
@@ -102,6 +112,76 @@ public class StreamsGraphTest {
}
@Test
+ @SuppressWarnings("unchecked")
+ public void shouldNotThrowNPEWithMergeNodes() {
+ final Properties properties = new Properties();
+ properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
"test-application");
+ properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
+ properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION,
StreamsConfig.OPTIMIZE);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ initializer = () -> "";
+ aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
+ final TransformerSupplier<String, String, KeyValue<String, String>>
transformSupplier = () -> new Transformer<String, String, KeyValue<String,
String>>() {
+ @Override
+ public void init(final ProcessorContext context) {
+
+ }
+
+ @Override
+ public KeyValue<String, String> transform(final String key, final
String value) {
+ return KeyValue.pair(key, value);
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+
+ final KStream<String, String> retryStream =
builder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String()))
+ .transform(transformSupplier)
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .aggregate(initializer,
+ aggregator,
+ Materialized.with(Serdes.String(), Serdes.String()))
+ .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(500),
Suppressed.BufferConfig.maxBytes(64_000_000)))
+ .toStream()
+ .flatMap((k, v) -> new ArrayList<>());
+
+ final KTable<String, String> idTable =
builder.stream("id-table-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .flatMap((k, v) -> new ArrayList<KeyValue<String, String>>())
+ .peek((subscriptionId, recipientId) ->
System.out.println("data " + subscriptionId + " " + recipientId))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .aggregate(initializer,
+ aggregator,
+ Materialized.with(Serdes.String(), Serdes.String()));
+
+ final KStream<String, String> joinStream =
builder.stream("internal-topic-command", Consumed.with(Serdes.String(),
Serdes.String()))
+ .peek((subscriptionId, command) ->
System.out.println("stdoutput"))
+ .mapValues((k, v) -> v)
+ .merge(retryStream)
+ .leftJoin(idTable, (v1, v2) -> v1 + v2,
+ Joined.with(Serdes.String(), Serdes.String(),
Serdes.String()));
+
+ final KStream<String, String>[] branches = joinStream.branch((k, v) ->
v.equals("some-value"), (k, v) -> true);
+
+ branches[0].map(KeyValue::pair)
+ .peek((recipientId, command) -> System.out.println("printing
out"))
+ .to("external-command", Produced.with(Serdes.String(),
Serdes.String()));
+
+ branches[1].filter((k, v) -> v != null)
+ .peek((subscriptionId, wrapper) ->
System.out.println("Printing output"))
+ .mapValues((k, v) -> v)
+ .to("dlq-topic", Produced.with(Serdes.String(),
Serdes.String()));
+
+ branches[1].map(KeyValue::pair).to("retryTopic",
Produced.with(Serdes.String(), Serdes.String()));
+
+ final Topology topology = builder.build(properties);
+ assertEquals(expectedComplexMergeOptimizeTopology,
topology.describe().toString());
+ }
+
+ @Test
public void
shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
final Topology attemptedOptimize =
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);
@@ -291,4 +371,110 @@ public class StreamsGraphTest {
" Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" +
" <-- KSTREAM-MERGE-0000000006\n\n";
+
+ private final String expectedComplexMergeOptimizeTopology = "Topologies:\n"
+ + " Sub-topology: 0\n"
+ + " Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n"
+ + " --> KSTREAM-TRANSFORM-0000000001\n"
+ + " Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n"
+ + " --> KSTREAM-FILTER-0000000040\n"
+ + " <-- KSTREAM-SOURCE-0000000000\n"
+ + " Processor: KSTREAM-FILTER-0000000040 (stores: [])\n"
+ + " --> KSTREAM-SINK-0000000039\n"
+ + " <-- KSTREAM-TRANSFORM-0000000001\n"
+ + " Sink: KSTREAM-SINK-0000000039 (topic:
KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n"
+ + " <-- KSTREAM-FILTER-0000000040\n"
+ + "\n"
+ + " Sub-topology: 1\n"
+ + " Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n"
+ + " --> KSTREAM-FLATMAP-0000000012\n"
+ + " Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n"
+ + " --> KSTREAM-FILTER-0000000043\n"
+ + " <-- KSTREAM-SOURCE-0000000011\n"
+ + " Processor: KSTREAM-FILTER-0000000043 (stores: [])\n"
+ + " --> KSTREAM-SINK-0000000042\n"
+ + " <-- KSTREAM-FLATMAP-0000000012\n"
+ + " Sink: KSTREAM-SINK-0000000042 (topic:
KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n"
+ + " <-- KSTREAM-FILTER-0000000043\n"
+ + "\n"
+ + " Sub-topology: 2\n"
+ + " Source: KSTREAM-SOURCE-0000000041 (topics:
[KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n"
+ + " --> KSTREAM-AGGREGATE-0000000003\n"
+ + " Processor: KSTREAM-AGGREGATE-0000000003 (stores:
[KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n"
+ + " --> KTABLE-SUPPRESS-0000000007\n"
+ + " <-- KSTREAM-SOURCE-0000000041\n"
+ + " Source: KSTREAM-SOURCE-0000000019 (topics:
[internal-topic-command])\n"
+ + " --> KSTREAM-PEEK-0000000020\n"
+ + " Processor: KTABLE-SUPPRESS-0000000007 (stores:
[KTABLE-SUPPRESS-STATE-STORE-0000000008])\n"
+ + " --> KTABLE-TOSTREAM-0000000009\n"
+ + " <-- KSTREAM-AGGREGATE-0000000003\n"
+ + " Processor: KSTREAM-PEEK-0000000020 (stores: [])\n"
+ + " --> KSTREAM-MAPVALUES-0000000021\n"
+ + " <-- KSTREAM-SOURCE-0000000019\n"
+ + " Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n"
+ + " --> KSTREAM-FLATMAP-0000000010\n"
+ + " <-- KTABLE-SUPPRESS-0000000007\n"
+ + " Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n"
+ + " --> KSTREAM-MERGE-0000000022\n"
+ + " <-- KTABLE-TOSTREAM-0000000009\n"
+ + " Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n"
+ + " --> KSTREAM-MERGE-0000000022\n"
+ + " <-- KSTREAM-PEEK-0000000020\n"
+ + " Processor: KSTREAM-MERGE-0000000022 (stores: [])\n"
+ + " --> KSTREAM-FILTER-0000000024\n"
+ + " <-- KSTREAM-MAPVALUES-0000000021,
KSTREAM-FLATMAP-0000000010\n"
+ + " Processor: KSTREAM-FILTER-0000000024 (stores: [])\n"
+ + " --> KSTREAM-SINK-0000000023\n"
+ + " <-- KSTREAM-MERGE-0000000022\n"
+ + " Sink: KSTREAM-SINK-0000000023 (topic:
KSTREAM-MERGE-0000000022-repartition)\n"
+ + " <-- KSTREAM-FILTER-0000000024\n"
+ + "\n"
+ + " Sub-topology: 3\n"
+ + " Source: KSTREAM-SOURCE-0000000025 (topics:
[KSTREAM-MERGE-0000000022-repartition])\n"
+ + " --> KSTREAM-LEFTJOIN-0000000026\n"
+ + " Processor: KSTREAM-LEFTJOIN-0000000026 (stores:
[KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n"
+ + " --> KSTREAM-BRANCH-0000000027\n"
+ + " <-- KSTREAM-SOURCE-0000000025\n"
+ + " Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n"
+ + " --> KSTREAM-BRANCHCHILD-0000000029,
KSTREAM-BRANCHCHILD-0000000028\n"
+ + " <-- KSTREAM-LEFTJOIN-0000000026\n"
+ + " Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n"
+ + " --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n"
+ + " <-- KSTREAM-BRANCH-0000000027\n"
+ + " Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n"
+ + " --> KSTREAM-MAP-0000000030\n"
+ + " <-- KSTREAM-BRANCH-0000000027\n"
+ + " Processor: KSTREAM-FILTER-0000000033 (stores: [])\n"
+ + " --> KSTREAM-PEEK-0000000034\n"
+ + " <-- KSTREAM-BRANCHCHILD-0000000029\n"
+ + " Processor: KSTREAM-MAP-0000000030 (stores: [])\n"
+ + " --> KSTREAM-PEEK-0000000031\n"
+ + " <-- KSTREAM-BRANCHCHILD-0000000028\n"
+ + " Processor: KSTREAM-PEEK-0000000034 (stores: [])\n"
+ + " --> KSTREAM-MAPVALUES-0000000035\n"
+ + " <-- KSTREAM-FILTER-0000000033\n"
+ + " Source: KSTREAM-SOURCE-0000000044 (topics:
[KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n"
+ + " --> KSTREAM-PEEK-0000000013\n"
+ + " Processor: KSTREAM-MAP-0000000037 (stores: [])\n"
+ + " --> KSTREAM-SINK-0000000038\n"
+ + " <-- KSTREAM-BRANCHCHILD-0000000029\n"
+ + " Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n"
+ + " --> KSTREAM-SINK-0000000036\n"
+ + " <-- KSTREAM-PEEK-0000000034\n"
+ + " Processor: KSTREAM-PEEK-0000000013 (stores: [])\n"
+ + " --> KSTREAM-AGGREGATE-0000000015\n"
+ + " <-- KSTREAM-SOURCE-0000000044\n"
+ + " Processor: KSTREAM-PEEK-0000000031 (stores: [])\n"
+ + " --> KSTREAM-SINK-0000000032\n"
+ + " <-- KSTREAM-MAP-0000000030\n"
+ + " Processor: KSTREAM-AGGREGATE-0000000015 (stores:
[KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n"
+ + " --> none\n"
+ + " <-- KSTREAM-PEEK-0000000013\n"
+ + " Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n"
+ + " <-- KSTREAM-PEEK-0000000031\n"
+ + " Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n"
+ + " <-- KSTREAM-MAPVALUES-0000000035\n"
+ + " Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n"
+ + " <-- KSTREAM-MAP-0000000037\n\n";
+
}