This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new faef80a2e2a KAFKA-18026: KIP-1112, clean up StatefulProcessorNode (#18195) faef80a2e2a is described below commit faef80a2e2aaf04d2791348885efbfbb4d46c004 Author: A. Sophie Blee-Goldman <ableegold...@gmail.com> AuthorDate: Tue Jan 14 17:42:58 2025 -0800 KAFKA-18026: KIP-1112, clean up StatefulProcessorNode (#18195) Final cleanup of StatefulProcessorNode after converting all stateful operators to adding state stores via implementing the #stores method. --- .../internals/CogroupedStreamAggregateBuilder.java | 123 ++++++++------------- .../internals/GroupedStreamAggregateBuilder.java | 9 +- .../kstream/internals/KGroupedTableImpl.java | 7 +- .../streams/kstream/internals/KStreamImpl.java | 7 +- .../streams/kstream/internals/KTableImpl.java | 49 ++++---- .../graph/ForeignJoinSubscriptionSendNode.java | 2 +- .../internals/graph/GraphGraceSearchUtil.java | 4 +- .../internals/graph/ProcessorGraphNode.java | 7 -- .../graph/ProcessorToStateConnectorNode.java | 74 +++++++++++++ .../internals/graph/StatefulProcessorNode.java | 92 --------------- .../kstream/internals/graph/TableFilterNode.java | 4 +- .../internals/graph/TableProcessorNode.java | 76 ------------- .../kstream/internals/graph/TableSuppressNode.java | 10 +- .../internals/graph/GraphGraceSearchUtilTest.java | 112 +++++++++++++------ .../internals/graph/TableProcessorNodeTest.java | 55 --------- 15 files changed, 249 insertions(+), 382 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 126df7de17b..1ce1e7451a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -30,8 +30,6 @@ import 
org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.StoreFactory; import java.util.ArrayList; @@ -61,24 +59,25 @@ class CogroupedStreamAggregateBuilder<K, VOut> { processRepartitions(groupPatterns, storeFactory.storeName()); final Collection<GraphNode> processors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); - boolean stateCreated = false; + int counter = 0; for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = new KStreamAggregate<>(storeFactory, initializer, kGroupedStream.getValue()); parentProcessors.add(parentProcessor); - final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode( - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeFactory, - parentProcessor); - statefulProcessorNode.setOutputVersioned(isOutputVersioned); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + + final String kStreamAggProcessorName = named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME); + final ProcessorGraphNode<K, ?> aggProcessorNode = + new ProcessorGraphNode<>( + kStreamAggProcessorName, + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + 
); + aggProcessorNode.setOutputVersioned(isOutputVersioned); + processors.add(aggProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); } return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @@ -96,7 +95,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> { final Collection<GraphNode> processors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); - boolean stateCreated = false; int counter = 0; for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = @@ -107,17 +105,18 @@ class CogroupedStreamAggregateBuilder<K, VOut> { initializer, kGroupedStream.getValue()); parentProcessors.add(parentProcessor); - final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode( - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeFactory, - parentProcessor); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + + final String kStreamAggProcessorName = named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME); + final ProcessorGraphNode<K, ?> aggProcessorNode = + new ProcessorGraphNode<>( + kStreamAggProcessorName, + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + ); + processors.add(aggProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); } return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @@ -135,7 +134,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> { processRepartitions(groupPatterns, 
storeFactory.storeName()); final Collection<GraphNode> processors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); - boolean stateCreated = false; int counter = 0; for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = @@ -147,17 +145,17 @@ class CogroupedStreamAggregateBuilder<K, VOut> { kGroupedStream.getValue(), sessionMerger); parentProcessors.add(parentProcessor); - final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode( - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeFactory, - parentProcessor); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + final String kStreamAggProcessorName = named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME); + final ProcessorGraphNode<K, ?> aggProcessorNode = + new ProcessorGraphNode<>( + kStreamAggProcessorName, + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + ); + processors.add(aggProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); } return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @@ -174,7 +172,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> { processRepartitions(groupPatterns, storeFactory.storeName()); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>(); - boolean stateCreated = false; int counter = 0; for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? 
super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) { final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor = @@ -186,17 +183,17 @@ class CogroupedStreamAggregateBuilder<K, VOut> { initializer, kGroupedStream.getValue()); parentProcessors.add(parentProcessor); - final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode( - named.suffixWithOrElseGet( - "-cogroup-agg-" + counter++, - builder, - CogroupedKStreamImpl.AGGREGATE_NAME), - stateCreated, - storeFactory, - parentProcessor); - stateCreated = true; - processors.add(statefulProcessorNode); - builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); + final String kStreamAggProcessorName = named.suffixWithOrElseGet( + "-cogroup-agg-" + counter++, + builder, + CogroupedKStreamImpl.AGGREGATE_NAME); + final ProcessorGraphNode<K, ?> aggProcessorNode = + new ProcessorGraphNode<>( + kStreamAggProcessorName, + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + ); + processors.add(aggProcessorNode); + builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); } return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName()); } @@ -262,30 +259,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> { builder); } - private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName, - final boolean stateCreated, - final StoreFactory storeFactory, - final ProcessorSupplier<K, ?, K, ?> kStreamAggregate) { - final StatefulProcessorNode<K, ?> statefulProcessorNode; - if (!stateCreated) { - statefulProcessorNode = - new StatefulProcessorNode<>( - processorName, - new ProcessorParameters<>(kStreamAggregate, processorName), - storeFactory - ); - } else { - statefulProcessorNode = - new StatefulProcessorNode<>( - processorName, - new ProcessorParameters<>(kStreamAggregate, processorName), - new String[]{storeFactory.storeName()} - ); - } - - 
return statefulProcessorNode; - } - @SuppressWarnings("unchecked") private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix, final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index c3360c9c013..bc84e69a1ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; import org.apache.kafka.streams.processor.internals.StoreFactory; import java.util.Collections; @@ -97,11 +97,10 @@ class GroupedStreamAggregateBuilder<K, V> { parentNode = repartitionNode; } - final StatefulProcessorNode<K, V> statefulProcessorNode = - new StatefulProcessorNode<>( + final ProcessorGraphNode<K, V> statefulProcessorNode = + new ProcessorGraphNode<>( aggFunctionName, - new ProcessorParameters<>(aggregateSupplier, aggFunctionName), - new String[] {storeFactory.storeName()} + new ProcessorParameters<>(aggregateSupplier, aggFunctionName) ); statefulProcessorNode.setOutputVersioned(isOutputVersioned); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 
e500582244b..fbce445e7ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -27,8 +27,8 @@ import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; @@ -88,10 +88,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr // the passed in StreamsGraphNode must be the parent of the repartition node builder.addGraphNode(this.graphNode, repartitionGraphNode); - final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>( + final ProcessorGraphNode statefulProcessorNode = new ProcessorGraphNode<>( funcName, - new ProcessorParameters<>(aggregateSupplier, funcName), - new String[]{materialized.storeName()} + new ProcessorParameters<>(aggregateSupplier, funcName) ); statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0ba9086e450..7deb9468c3f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -48,7 +48,7 
@@ import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNo import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode; import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode; import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode; @@ -1225,7 +1225,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } final String name = new NamedInternal(named).name(); - final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>( + final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new + ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), stateStoreNames); @@ -1270,7 +1271,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } final String name = new NamedInternal(named).name(); - final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>( + final ProcessorToStateConnectorNode<? super K, ? 
super V> processNode = new ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), stateStoreNames); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 551b910be60..817365c787f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -54,11 +54,10 @@ import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; +import org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode; import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode; import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode; -import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode; import org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode; import org.apache.kafka.streams.kstream.internals.graph.TableSuppressNode; import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; @@ -69,7 +68,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; +import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import 
org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -312,7 +313,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final ProcessorParameters<K, VR, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( new ProcessorParameters<>(processorSupplier, name) ); - final GraphNode tableNode = new TableProcessorNode<>( + final GraphNode tableNode = new ProcessorGraphNode<>( name, processorParameters ); @@ -439,7 +440,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final Serde<K> keySerde; final Serde<VR> valueSerde; final String queryableStoreName; - final StoreFactory storeFactory; + final Set<StoreBuilder<?>> storeBuilder; if (materializedInternal != null) { // don't inherit parent value serde, since this operation may change the value type, more specifically: @@ -449,12 +450,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< valueSerde = materializedInternal.valueSerde(); queryableStoreName = materializedInternal.queryableStoreName(); // only materialize if materialized is specified and it has queryable name - storeFactory = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)) : null; + final StoreFactory storeFactory = queryableStoreName != null ? 
(new KeyValueStoreMaterializer<>(materializedInternal)) : null; + storeBuilder = Collections.singleton(new FactoryWrappingStoreBuilder<>(storeFactory)); } else { keySerde = this.keySerde; valueSerde = null; queryableStoreName = null; - storeFactory = null; + storeBuilder = null; } final String name = namedInternal.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME); @@ -464,14 +466,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< transformerSupplier, queryableStoreName); - final ProcessorParameters<K, VR, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( - new ProcessorParameters<>(processorSupplier, name) - ); + final ProcessorParameters<K, VR, ?, ?> processorParameters = + unsafeCastProcessorParametersToCompletelyDifferentType( + new ProcessorParameters<>( + new StoreDelegatingProcessorSupplier<>( + processorSupplier, + storeBuilder), + name + )); - final GraphNode tableNode = new TableProcessorNode<>( + final GraphNode tableNode = new ProcessorToStateConnectorNode<>( name, processorParameters, - storeFactory, stateStoreNames ); maybeSetOutputVersioned(tableNode, materializedInternal); @@ -574,9 +580,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>( name, - new ProcessorParameters<>(suppressionSupplier, name), - new String[]{storeName} + new ProcessorParameters<>(suppressionSupplier, name) ); + node.setOutputVersioned(false); builder.addGraphNode(graphNode, node); @@ -1235,26 +1241,24 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final String subscriptionReceiveName = renamed.suffixWithOrElseGet( "-subscription-receive", builder, SUBSCRIPTION_PROCESSOR); - final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode = - new StatefulProcessorNode<>( + final ProcessorGraphNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode = 
+ new ProcessorGraphNode<>( subscriptionReceiveName, new ProcessorParameters<>( new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, combinedKeySchema), - subscriptionReceiveName), - new String[]{subscriptionStoreName} + subscriptionReceiveName) ); builder.addGraphNode(subscriptionSource, subscriptionReceiveNode); final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = ((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier(); - final StatefulProcessorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode = - new StatefulProcessorNode<>( + final ProcessorToStateConnectorNode<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode = + new ProcessorToStateConnectorNode<>( new ProcessorParameters<>( new SubscriptionJoinProcessorSupplier<>( foreignKeyValueGetter ), renamed.suffixWithOrElseGet("-subscription-join-foreign", builder, SUBSCRIPTION_PROCESSOR) ), - Collections.emptySet(), Collections.singleton(foreignKeyValueGetter) ); builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode); @@ -1306,7 +1310,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< builder.internalTopologyBuilder.copartitionSources(resultSourceNodes); final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier(); - final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new StatefulProcessorNode<>( + final ProcessorToStateConnectorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new ProcessorToStateConnectorNode<>( new ProcessorParameters<>( new ResponseJoinProcessorSupplier<>( primaryKeyValueGetter, @@ -1317,7 +1321,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< ), renamed.suffixWithOrElseGet("-subscription-response-resolver", builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR) ), - Collections.emptySet(), 
Collections.singleton(primaryKeyValueGetter) ); builder.addGraphNode(foreignResponseSource, responseJoinNode); @@ -1339,7 +1342,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal); - final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>( + final ProcessorGraphNode<K, VR> resultNode = new ProcessorGraphNode<>( resultProcessorName, new ProcessorParameters<>( resultProcessorSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java index 4efbd9b29f1..afd9ee1e64d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; public class ForeignJoinSubscriptionSendNode<K, V> extends ProcessorGraphNode<K, V> implements VersionedSemanticsGraphNode { public ForeignJoinSubscriptionSendNode(final ProcessorParameters<K, V, ?, ?> processorParameters) { - super(processorParameters); + super(processorParameters.processorName(), processorParameters); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java index 66ffdc003ae..d58b3a3b1a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java @@ -72,8 +72,8 @@ public final class GraphGraceSearchUtil { 
@SuppressWarnings("rawtypes") private static Long extractGracePeriod(final GraphNode node) { - if (node instanceof StatefulProcessorNode) { - final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier(); + if (node instanceof ProcessorGraphNode) { + final ProcessorSupplier processorSupplier = ((ProcessorGraphNode) node).processorParameters().processorSupplier(); if (processorSupplier instanceof KStreamWindowAggregate) { final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier; final Windows windows = kStreamWindowAggregate.windows(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java index 1c8e8cace2b..514676af1f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java @@ -28,13 +28,6 @@ public class ProcessorGraphNode<K, V> extends GraphNode { private final ProcessorParameters<K, V, ?, ?> processorParameters; - public ProcessorGraphNode(final ProcessorParameters<K, V, ?, ?> processorParameters) { - - super(processorParameters.processorName()); - - this.processorParameters = processorParameters; - } - public ProcessorGraphNode(final String nodeName, final ProcessorParameters<K, V, ?, ?> processorParameters) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java new file mode 100644 index 00000000000..b476d6a7731 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.graph; + +import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; +import org.apache.kafka.streams.processor.ConnectedStoreProvider; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; + +import java.util.Arrays; +import java.util.Set; + +/** + * Used for stateful processors that need to be manually connected to the state store(s) + * they need to access. This should only be used in cases where the store(s) cannot + * be connected automatically by implementing the {@link ConnectedStoreProvider#stores()} method + * and returning the store directly. Generally this will only apply to DSL operators that utilize + * value getters to access another processor's state store(s), and the process/processValues + * operator where the user's custom processor supplier doesn't implement the #stores method + * and has to be connected to the store when compiling the topology. 
+ */ +public class ProcessorToStateConnectorNode<K, V> extends ProcessorGraphNode<K, V> { + + private final String[] storeNames; + + /** + * Create a node representing a stateful processor that uses value getters to access stores, and needs to + * be connected with those stores + */ + public ProcessorToStateConnectorNode(final ProcessorParameters<K, V, ?, ?> processorParameters, + final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) { + super(processorParameters.processorName(), processorParameters); + storeNames = valueGetterSuppliers.stream().flatMap(s -> Arrays.stream(s.storeNames())).toArray(String[]::new); + } + + /** + * Create a node representing a stateful processor, which needs to be connected to the provided stores + */ + public ProcessorToStateConnectorNode(final String nodeName, + final ProcessorParameters<K, V, ?, ?> processorParameters, + final String[] storeNames) { + super(nodeName, processorParameters); + this.storeNames = storeNames; + } + + @Override + public String toString() { + return "ProcessorNode{" + + "storeNames=" + Arrays.toString(storeNames) + + "} " + super.toString(); + } + + @Override + public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { + processorParameters().addProcessorTo(topologyBuilder, parentNodeNames()); + + if (storeNames != null && storeNames.length > 0) { + topologyBuilder.connectProcessorAndStateStores(processorParameters().processorName(), storeNames); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java deleted file mode 100644 index ec6c6583b3e..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals.graph; - -import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.StoreFactory; -import org.apache.kafka.streams.state.StoreBuilder; - -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Stream; - -public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> { - - private final String[] storeNames; - private final StoreFactory storeFactory; - - /** - * Create a node representing a stateful processor, where the named stores have already been registered. 
- */ - public StatefulProcessorNode(final ProcessorParameters<K, V, ?, ?> processorParameters, - final Set<StoreBuilder<?>> preRegisteredStores, - final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) { - super(processorParameters.processorName(), processorParameters); - final Stream<String> registeredStoreNames = preRegisteredStores.stream().map(StoreBuilder::name); - final Stream<String> valueGetterStoreNames = valueGetterSuppliers.stream().flatMap(s -> Arrays.stream(s.storeNames())); - storeNames = Stream.concat(registeredStoreNames, valueGetterStoreNames).toArray(String[]::new); - storeFactory = null; - } - - /** - * Create a node representing a stateful processor, where the named stores have already been registered. - */ - public StatefulProcessorNode(final String nodeName, - final ProcessorParameters<K, V, ?, ?> processorParameters, - final String[] storeNames) { - super(nodeName, processorParameters); - - this.storeNames = storeNames; - this.storeFactory = null; - } - - - /** - * Create a node representing a stateful processor, - * where the store needs to be built and registered as part of building this node. 
- */ - public StatefulProcessorNode(final String nodeName, - final ProcessorParameters<K, V, ?, ?> processorParameters, - final StoreFactory materializedKTableStoreBuilder) { - super(nodeName, processorParameters); - - this.storeNames = null; - this.storeFactory = materializedKTableStoreBuilder; - } - - @Override - public String toString() { - return "StatefulProcessorNode{" + - "storeNames=" + Arrays.toString(storeNames) + - ", storeBuilder=" + storeFactory + - "} " + super.toString(); - } - - @Override - public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { - processorParameters().addProcessorTo(topologyBuilder, parentNodeNames()); - - if (storeNames != null && storeNames.length > 0) { - topologyBuilder.connectProcessorAndStateStores(processorParameters().processorName(), storeNames); - } - - if (storeFactory != null) { - topologyBuilder.addStateStore(storeFactory, processorParameters().processorName()); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java index 6fef05604cf..a921dab0d1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java @@ -20,11 +20,11 @@ package org.apache.kafka.streams.kstream.internals.graph; import org.apache.kafka.streams.kstream.internals.KTableFilter; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements VersionedSemanticsGraphNode { +public class TableFilterNode<K, V> extends ProcessorGraphNode<K, V> implements VersionedSemanticsGraphNode { public TableFilterNode(final String nodeName, final ProcessorParameters<K, V, ?, ?> processorParameters) { - super(nodeName, processorParameters, null, null); + super(nodeName, 
processorParameters); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java deleted file mode 100644 index af3ab15d490..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.kstream.internals.graph; - -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.StoreFactory; - -import java.util.Arrays; - -public class TableProcessorNode<K, V> extends GraphNode { - - private final ProcessorParameters<K, V, ?, ?> processorParameters; - private final StoreFactory storeFactory; - private final String[] storeNames; - - public TableProcessorNode(final String nodeName, - final ProcessorParameters<K, V, ?, ?> processorParameters) { - this(nodeName, processorParameters, null, null); - } - - public TableProcessorNode(final String nodeName, - final ProcessorParameters<K, V, ?, ?> processorParameters, - final StoreFactory storeFactory, - final String[] storeNames) { - super(nodeName); - this.processorParameters = processorParameters; - this.storeFactory = storeFactory; - this.storeNames = storeNames != null ? storeNames : new String[] {}; - } - - public ProcessorParameters<K, V, ?, ?> processorParameters() { - return processorParameters; - } - - @Override - public String toString() { - return "TableProcessorNode{" + - ", processorParameters=" + processorParameters + - ", storeFactory=" + (storeFactory == null ? 
"null" : storeFactory.storeName()) + - ", storeNames=" + Arrays.toString(storeNames) + - "} " + super.toString(); - } - - @SuppressWarnings("unchecked") - @Override - public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { - processorParameters.addProcessorTo(topologyBuilder, parentNodeNames()); - - final String processorName = processorParameters.processorName(); - - if (storeNames.length > 0) { - // todo(rodesai): remove me once all operators have been moved to ProcessorSupplier - topologyBuilder.connectProcessorAndStateStores(processorName, storeNames); - } - - if (storeFactory != null) { - // todo(rodesai) remove when KTableImpl#doFilter, KTableImpl#doTransformValues moved to ProcessorSupplier - topologyBuilder.addStateStore(storeFactory, processorName); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java index 595d0266aae..ac4a3f25c37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.streams.kstream.internals.graph; -public class TableSuppressNode<K, V> extends StatefulProcessorNode<K, V> { +/** + * Marker class to identify suppression nodes since they have some special requirements + */ +public class TableSuppressNode<K, V> extends ProcessorGraphNode<K, V> { public TableSuppressNode(final String nodeName, - final ProcessorParameters<K, V, ?, ?> processorParameters, - final String[] storeNames) { - super(nodeName, processorParameters, storeNames); + final ProcessorParameters<K, V, ?, ?> processorParameters) { + super(nodeName, processorParameters); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 8552790692c..5db1439835c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.StoreFactory; import org.junit.jupiter.api.Test; @@ -50,8 +49,20 @@ public class GraphGraceSearchUtilTest { public void shouldFailIfThereIsNoGraceAncestor() { // doesn't matter if this ancestor is stateless or stateful. The important thing is that there is // no grace period defined on any ancestor of the node - final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>( - "stateful", + final ProcessorGraphNode<String, Long> gracelessAncestor = new ProcessorGraphNode<>( + "graceless", + new ProcessorParameters<>( + () -> new Processor<String, Long, String, Long>() { + @Override + public void process(final Record<String, Long> record) {} + + }, + "graceless" + ) + ); + + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>( + "stateless", new ProcessorParameters<>( () -> new Processor<String, Long, String, Long>() { @@ -59,26 +70,24 @@ public class GraphGraceSearchUtilTest { public void process(final Record<String, Long> record) {} }, - "dummy" - ), - (StoreFactory) null + "stateless" + ) ); - final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); gracelessAncestor.addChild(node); try { GraphGraceSearchUtil.findAndVerifyWindowGrace(node); fail("should have thrown."); } catch (final TopologyException e) { - 
assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [stateful->stateless].")); + assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [graceless->stateless].")); } } @Test public void shouldExtractGraceFromKStreamWindowAggregateNode() { final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(1234L)); - final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>( + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( @@ -89,8 +98,7 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ), - (StoreFactory) null + ) ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -101,7 +109,7 @@ public class GraphGraceSearchUtilTest { public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>( + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate<String, Long, Integer>( @@ -113,8 +121,7 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ), - (StoreFactory) null + ) ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -124,15 +131,14 @@ public class GraphGraceSearchUtilTest { @Test public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>( + final ProcessorGraphNode<String, Long> graceGrandparent = new ProcessorGraphNode<>( "asdf", new 
ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>( windows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, null - ), "asdf"), - (StoreFactory) null + ), "asdf") ); - final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>( + final ProcessorGraphNode<String, Long> statefulParent = new ProcessorGraphNode<>( "stateful", new ProcessorParameters<>( () -> new Processor<String, Long, String, Long>() { @@ -142,12 +148,22 @@ public class GraphGraceSearchUtilTest { }, "dummy" - ), - (StoreFactory) null + ) ); graceGrandparent.addChild(statefulParent); - final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>( + "stateless", + new ProcessorParameters<>( + () -> new Processor<String, Long, String, Long>() { + + @Override + public void process(final Record<String, Long> record) {} + + }, + "dummyChild-graceless" + ) + ); statefulParent.addChild(node); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -157,7 +173,7 @@ public class GraphGraceSearchUtilTest { @Test public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>( + final ProcessorGraphNode<String, Long> graceGrandparent = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate<String, Long, Integer>( @@ -169,14 +185,35 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ), - (StoreFactory) null + ) ); - final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null); + final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>( + "statelessParent", + new ProcessorParameters<>( + () -> new 
Processor<String, Long, String, Long>() { + + @Override + public void process(final Record<String, Long> record) {} + + }, + "statelessParent" + ) + ); graceGrandparent.addChild(statelessParent); - final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>( + "stateless", + new ProcessorParameters<>( + () -> new Processor<String, Long, String, Long>() { + + @Override + public void process(final Record<String, Long> record) {} + + }, + "stateless" + ) + ); statelessParent.addChild(node); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); @@ -185,7 +222,7 @@ public class GraphGraceSearchUtilTest { @Test public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { - final StatefulProcessorNode<String, Long> leftParent = new StatefulProcessorNode<>( + final ProcessorGraphNode<String, Long> leftParent = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate<String, Long, Integer>( @@ -197,11 +234,10 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ), - (StoreFactory) null + ) ); - final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>( + final ProcessorGraphNode<String, Long> rightParent = new ProcessorGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( @@ -212,11 +248,21 @@ public class GraphGraceSearchUtilTest { null ), "asdf" - ), - (StoreFactory) null + ) ); - final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>( + "stateless", + new ProcessorParameters<>( + () -> new Processor<String, Long, String, Long>() { + + @Override + public void process(final Record<String, Long> record) {} + + }, + "stateless" + ) + ); leftParent.addChild(node); rightParent.addChild(node); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java deleted file mode 100644 index 36a32d01dc2..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.kstream.internals.graph; - -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.Record; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TableProcessorNodeTest { - private static class TestProcessor implements Processor<String, String, String, String> { - - @Override - public void process(final Record<String, String> record) { - } - - } - - @Test - public void shouldConvertToStringWithNullStoreBuilder() { - final TableProcessorNode<String, String> node = new TableProcessorNode<>( - "name", - new ProcessorParameters<>(TestProcessor::new, "processor"), - null, - new String[]{"store1", "store2"} - ); - - final String asString = node.toString(); - final String expected = "storeFactory=null"; - assertTrue( - asString.contains(expected), - String.format( - "Expected toString to return string with \"%s\", received: %s", - expected, - asString) - ); - } -} \ No newline at end of file