This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0293cbdbfa3cad3cf1ae662eda3e1f899d3b7dbe
Author: A. Sophie Blee-Goldman <ableegold...@gmail.com>
AuthorDate: Tue Jan 14 17:42:58 2025 -0800

    KAFKA-18026: KIP-1112, clean up StatefulProcessorNode (#18195)
    
    Final cleanup of StatefulProcessorNode after converting all stateful 
operators to add their state stores by implementing the #stores method.
---
 .../internals/CogroupedStreamAggregateBuilder.java | 123 ++++++++-------------
 .../internals/GroupedStreamAggregateBuilder.java   |   9 +-
 .../kstream/internals/KGroupedTableImpl.java       |   7 +-
 .../streams/kstream/internals/KStreamImpl.java     |   7 +-
 .../streams/kstream/internals/KTableImpl.java      |  49 ++++----
 .../graph/ForeignJoinSubscriptionSendNode.java     |   2 +-
 .../internals/graph/GraphGraceSearchUtil.java      |   4 +-
 .../internals/graph/ProcessorGraphNode.java        |   7 --
 .../graph/ProcessorToStateConnectorNode.java       |  74 +++++++++++++
 .../internals/graph/StatefulProcessorNode.java     |  92 ---------------
 .../kstream/internals/graph/TableFilterNode.java   |   4 +-
 .../internals/graph/TableProcessorNode.java        |  76 -------------
 .../kstream/internals/graph/TableSuppressNode.java |  10 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  | 112 +++++++++++++------
 .../internals/graph/TableProcessorNodeTest.java    |  55 ---------
 15 files changed, 249 insertions(+), 382 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 126df7de17b..1ce1e7451a7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -30,8 +30,6 @@ import 
org.apache.kafka.streams.kstream.internals.graph.GraphNode;
 import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 
 import java.util.ArrayList;
@@ -61,24 +59,25 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         processRepartitions(groupPatterns, storeFactory.storeName());
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
-        boolean stateCreated = false;
+        
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, 
Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
             final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
                 new KStreamAggregate<>(storeFactory, initializer, 
kGroupedStream.getValue());
             parentProcessors.add(parentProcessor);
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = 
getStatefulProcessorNode(
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeFactory,
-                parentProcessor);
-            statefulProcessorNode.setOutputVersioned(isOutputVersioned);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
statefulProcessorNode);
+            
+            final String kStreamAggProcessorName = named.suffixWithOrElseGet(
+                "-cogroup-agg-" + counter++,
+                builder,
+                CogroupedKStreamImpl.AGGREGATE_NAME);
+            final ProcessorGraphNode<K, ?> aggProcessorNode =
+                new ProcessorGraphNode<>(
+                    kStreamAggProcessorName,
+                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName)
+                );
+            aggProcessorNode.setOutputVersioned(isOutputVersioned);
+            processors.add(aggProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
aggProcessorNode);
         }
         return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.storeName());
     }
@@ -96,7 +95,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
 
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
-        boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, 
Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
             final KStreamAggProcessorSupplier<K, ?, K, ?>  parentProcessor =
@@ -107,17 +105,18 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                     initializer,
                     kGroupedStream.getValue());
             parentProcessors.add(parentProcessor);
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = 
getStatefulProcessorNode(
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeFactory,
-                parentProcessor);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
statefulProcessorNode);
+            
+            final String kStreamAggProcessorName = named.suffixWithOrElseGet(
+                "-cogroup-agg-" + counter++,
+                builder,
+                CogroupedKStreamImpl.AGGREGATE_NAME);
+            final ProcessorGraphNode<K, ?> aggProcessorNode =
+                new ProcessorGraphNode<>(
+                    kStreamAggProcessorName,
+                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName)
+                );
+            processors.add(aggProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
aggProcessorNode);
         }
         return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.storeName());
     }
@@ -135,7 +134,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         processRepartitions(groupPatterns, storeFactory.storeName());
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
-        boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, 
Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
             final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
@@ -147,17 +145,17 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                     kGroupedStream.getValue(),
                     sessionMerger);
             parentProcessors.add(parentProcessor);
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = 
getStatefulProcessorNode(
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeFactory,
-                parentProcessor);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
statefulProcessorNode);
+            final String kStreamAggProcessorName = named.suffixWithOrElseGet(
+                "-cogroup-agg-" + counter++,
+                builder,
+                CogroupedKStreamImpl.AGGREGATE_NAME);
+            final ProcessorGraphNode<K, ?> aggProcessorNode =
+                new ProcessorGraphNode<>(
+                    kStreamAggProcessorName,
+                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName)
+                );
+            processors.add(aggProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
aggProcessorNode);
         }
         return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.storeName());
     }
@@ -174,7 +172,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
         processRepartitions(groupPatterns, storeFactory.storeName());
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         final Collection<GraphNode> processors = new ArrayList<>();
-        boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, 
Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
             final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
@@ -186,17 +183,17 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                     initializer,
                     kGroupedStream.getValue());
             parentProcessors.add(parentProcessor);
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = 
getStatefulProcessorNode(
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeFactory,
-                parentProcessor);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
statefulProcessorNode);
+            final String kStreamAggProcessorName = named.suffixWithOrElseGet(
+                "-cogroup-agg-" + counter++,
+                builder,
+                CogroupedKStreamImpl.AGGREGATE_NAME);
+            final ProcessorGraphNode<K, ?> aggProcessorNode =
+                new ProcessorGraphNode<>(
+                    kStreamAggProcessorName,
+                    new ProcessorParameters<>(parentProcessor, 
kStreamAggProcessorName)
+                );
+            processors.add(aggProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
aggProcessorNode);
         }
         return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.storeName());
     }
@@ -262,30 +259,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             builder);
     }
 
-    private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String 
processorName,
-                                                                 final boolean 
stateCreated,
-                                                                 final 
StoreFactory storeFactory,
-                                                                 final 
ProcessorSupplier<K, ?, K, ?> kStreamAggregate) {
-        final StatefulProcessorNode<K, ?> statefulProcessorNode;
-        if (!stateCreated) {
-            statefulProcessorNode =
-                new StatefulProcessorNode<>(
-                    processorName,
-                    new ProcessorParameters<>(kStreamAggregate, processorName),
-                    storeFactory
-                );
-        } else {
-            statefulProcessorNode =
-                new StatefulProcessorNode<>(
-                    processorName,
-                    new ProcessorParameters<>(kStreamAggregate, processorName),
-                    new String[]{storeFactory.storeName()}
-                );
-        }
-
-        return statefulProcessorNode;
-    }
-
     @SuppressWarnings("unchecked")
     private <VIn> void createRepartitionSource(final String 
repartitionTopicNamePrefix,
                                                final 
OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index c3360c9c013..bc84e69a1ff 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 
 import java.util.Collections;
@@ -97,11 +97,10 @@ class GroupedStreamAggregateBuilder<K, V> {
             parentNode = repartitionNode;
         }
 
-        final StatefulProcessorNode<K, V> statefulProcessorNode =
-            new StatefulProcessorNode<>(
+        final ProcessorGraphNode<K, V> statefulProcessorNode =
+            new ProcessorGraphNode<>(
                 aggFunctionName,
-                new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
-                new String[] {storeFactory.storeName()}
+                new ProcessorParameters<>(aggregateSupplier, aggFunctionName)
             );
         statefulProcessorNode.setOutputVersioned(isOutputVersioned);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index e500582244b..fbce445e7ee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
 import 
org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode;
+import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
@@ -88,10 +88,9 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
         // the passed in StreamsGraphNode must be the parent of the 
repartition node
         builder.addGraphNode(this.graphNode, repartitionGraphNode);
 
-        final StatefulProcessorNode statefulProcessorNode = new 
StatefulProcessorNode<>(
+        final ProcessorGraphNode statefulProcessorNode = new 
ProcessorGraphNode<>(
             funcName,
-            new ProcessorParameters<>(aggregateSupplier, funcName),
-            new String[]{materialized.storeName()}
+            new ProcessorParameters<>(aggregateSupplier, funcName)
         );
         statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() 
instanceof VersionedBytesStoreSupplier);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0ba9086e450..7deb9468c3f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -48,7 +48,7 @@ import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNo
 import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
+import 
org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
@@ -1225,7 +1225,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         }
 
         final String name = new NamedInternal(named).name();
-        final StatefulProcessorNode<? super K, ? super V> processNode = new 
StatefulProcessorNode<>(
+        final ProcessorToStateConnectorNode<? super K, ? super V> processNode 
= new
+            ProcessorToStateConnectorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
             stateStoreNames);
@@ -1270,7 +1271,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         }
 
         final String name = new NamedInternal(named).name();
-        final StatefulProcessorNode<? super K, ? super V> processNode = new 
StatefulProcessorNode<>(
+        final ProcessorToStateConnectorNode<? super K, ? super V> processNode 
= new ProcessorToStateConnectorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
             stateStoreNames);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 5367b8dede2..06a6043ffd3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -54,11 +54,10 @@ import 
org.apache.kafka.streams.kstream.internals.graph.GraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
-import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
+import 
org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
-import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import 
org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableSuppressNode;
 import 
org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
@@ -69,7 +68,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
+import 
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -312,7 +313,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final ProcessorParameters<K, VR, ?, ?> processorParameters = 
unsafeCastProcessorParametersToCompletelyDifferentType(
             new ProcessorParameters<>(processorSupplier, name)
         );
-        final GraphNode tableNode = new TableProcessorNode<>(
+        final GraphNode tableNode = new ProcessorGraphNode<>(
             name,
             processorParameters
         );
@@ -439,7 +440,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final Serde<K> keySerde;
         final Serde<VR> valueSerde;
         final String queryableStoreName;
-        final StoreFactory storeFactory;
+        final Set<StoreBuilder<?>> storeBuilder;
 
         if (materializedInternal != null) {
             // don't inherit parent value serde, since this operation may 
change the value type, more specifically:
@@ -449,12 +450,13 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
             valueSerde = materializedInternal.valueSerde();
             queryableStoreName = materializedInternal.queryableStoreName();
             // only materialize if materialized is specified and it has 
queryable name
-            storeFactory = queryableStoreName != null ? (new 
KeyValueStoreMaterializer<>(materializedInternal)) : null;
+            final StoreFactory storeFactory = queryableStoreName != null ? 
(new KeyValueStoreMaterializer<>(materializedInternal)) : null;
+            storeBuilder = Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
         } else {
             keySerde = this.keySerde;
             valueSerde = null;
             queryableStoreName = null;
-            storeFactory = null;
+            storeBuilder = null;
         }
 
         final String name = namedInternal.orElseGenerateWithPrefix(builder, 
TRANSFORMVALUES_NAME);
@@ -464,14 +466,18 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
             transformerSupplier,
             queryableStoreName);
 
-        final ProcessorParameters<K, VR, ?, ?> processorParameters = 
unsafeCastProcessorParametersToCompletelyDifferentType(
-            new ProcessorParameters<>(processorSupplier, name)
-        );
+        final ProcessorParameters<K, VR, ?, ?> processorParameters =
+            unsafeCastProcessorParametersToCompletelyDifferentType(
+                new ProcessorParameters<>(
+                    new StoreDelegatingProcessorSupplier<>(
+                        processorSupplier,
+                        storeBuilder),
+                    name
+                ));
 
-        final GraphNode tableNode = new TableProcessorNode<>(
+        final GraphNode tableNode = new ProcessorToStateConnectorNode<>(
             name,
             processorParameters,
-            storeFactory,
             stateStoreNames
         );
         maybeSetOutputVersioned(tableNode, materializedInternal);
@@ -574,9 +580,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
 
         final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
             name,
-            new ProcessorParameters<>(suppressionSupplier, name),
-            new String[]{storeName}
+            new ProcessorParameters<>(suppressionSupplier, name)
         );
+
         node.setOutputVersioned(false);
 
         builder.addGraphNode(graphNode, node);
@@ -1235,26 +1241,24 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
 
         final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
             "-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
-        final StatefulProcessorNode<KO, SubscriptionWrapper<K>> 
subscriptionReceiveNode =
-            new StatefulProcessorNode<>(
+        final ProcessorGraphNode<KO, SubscriptionWrapper<K>> 
subscriptionReceiveNode =
+            new ProcessorGraphNode<>(
                 subscriptionReceiveName,
                 new ProcessorParameters<>(
                     new 
SubscriptionReceiveProcessorSupplier<>(subscriptionStoreFactory, 
combinedKeySchema),
-                    subscriptionReceiveName),
-                new String[]{subscriptionStoreName}
+                    subscriptionReceiveName)
             );
         builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
 
         final KTableValueGetterSupplier<KO, VO> foreignKeyValueGetter = 
((KTableImpl<KO, VO, VO>) foreignKeyTable).valueGetterSupplier();
-        final StatefulProcessorNode<CombinedKey<KO, K>, 
Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
-            new StatefulProcessorNode<>(
+        final ProcessorToStateConnectorNode<CombinedKey<KO, K>, 
Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> subscriptionJoinNode =
+            new ProcessorToStateConnectorNode<>(
                 new ProcessorParameters<>(
                     new SubscriptionJoinProcessorSupplier<>(
                         foreignKeyValueGetter
                     ),
                     renamed.suffixWithOrElseGet("-subscription-join-foreign", 
builder, SUBSCRIPTION_PROCESSOR)
                 ),
-                Collections.emptySet(),
                 Collections.singleton(foreignKeyValueGetter)
             );
         builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);
@@ -1306,7 +1310,7 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
         builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
 
         final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = 
valueGetterSupplier();
-        final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> 
responseJoinNode = new StatefulProcessorNode<>(
+        final ProcessorToStateConnectorNode<K, 
SubscriptionResponseWrapper<VO>> responseJoinNode = new 
ProcessorToStateConnectorNode<>(
             new ProcessorParameters<>(
                 new ResponseJoinProcessorSupplier<>(
                         primaryKeyValueGetter,
@@ -1317,7 +1321,6 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
                 ),
                 renamed.suffixWithOrElseGet("-subscription-response-resolver", 
builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
             ),
-            Collections.emptySet(),
             Collections.singleton(primaryKeyValueGetter)
         );
         builder.addGraphNode(foreignResponseSource, responseJoinNode);
@@ -1339,7 +1342,7 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
 
         final KTableSource<K, VR> resultProcessorSupplier = new 
KTableSource<>(materializedInternal);
 
-        final TableProcessorNode<K, VR> resultNode = new TableProcessorNode<>(
+        final ProcessorGraphNode<K, VR> resultNode = new ProcessorGraphNode<>(
             resultProcessorName,
             new ProcessorParameters<>(
                 resultProcessorSupplier,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java
index 4efbd9b29f1..afd9ee1e64d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignJoinSubscriptionSendNode.java
@@ -22,7 +22,7 @@ import 
org.apache.kafka.streams.processor.api.ProcessorSupplier;
 public class ForeignJoinSubscriptionSendNode<K, V> extends 
ProcessorGraphNode<K, V> implements VersionedSemanticsGraphNode {
 
     public ForeignJoinSubscriptionSendNode(final ProcessorParameters<K, V, ?, 
?> processorParameters) {
-        super(processorParameters);
+        super(processorParameters.processorName(), processorParameters);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
index 66ffdc003ae..d58b3a3b1a9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -72,8 +72,8 @@ public final class GraphGraceSearchUtil {
 
     @SuppressWarnings("rawtypes")
     private static Long extractGracePeriod(final GraphNode node) {
-        if (node instanceof StatefulProcessorNode) {
-            final ProcessorSupplier processorSupplier = 
((StatefulProcessorNode) node).processorParameters().processorSupplier();
+        if (node instanceof ProcessorGraphNode) {
+            final ProcessorSupplier processorSupplier = ((ProcessorGraphNode) 
node).processorParameters().processorSupplier();
             if (processorSupplier instanceof KStreamWindowAggregate) {
                 final KStreamWindowAggregate kStreamWindowAggregate = 
(KStreamWindowAggregate) processorSupplier;
                 final Windows windows = kStreamWindowAggregate.windows();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
index 1c8e8cace2b..514676af1f6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
@@ -28,13 +28,6 @@ public class ProcessorGraphNode<K, V> extends GraphNode {
 
     private final ProcessorParameters<K, V, ?, ?> processorParameters;
 
-    public ProcessorGraphNode(final ProcessorParameters<K, V, ?, ?> 
processorParameters) {
-
-        super(processorParameters.processorName());
-
-        this.processorParameters = processorParameters;
-    }
-
     public ProcessorGraphNode(final String nodeName,
                               final ProcessorParameters<K, V, ?, ?> 
processorParameters) {
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java
new file mode 100644
index 00000000000..b476d6a7731
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.processor.ConnectedStoreProvider;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Used for stateful processors that need to be manually connected to the 
state store(s)
+ * they need to access. This should only be used in cases where the store(s) 
cannot
+ * be connected automatically by implementing the {@link 
ConnectedStoreProvider#stores()} method
+ * and returning the store directly. Generally this will only apply to DSL 
operators that utilize
+ * value getters to access another processor's state store(s), and the 
process/processValues
+ * operator where the user's custom processor supplier doesn't implement the 
#stores method
+ * and has to be connected to the store when compiling the topology.
+ */
+public class ProcessorToStateConnectorNode<K, V> extends ProcessorGraphNode<K, 
V> {
+
+    private final String[] storeNames;
+
+    /**
+     * Create a node representing a stateful processor that uses value getters 
to access stores, and needs to
+     * be connected with those stores
+     */
+    public ProcessorToStateConnectorNode(final ProcessorParameters<K, V, ?, ?> 
processorParameters,
+                                         final 
Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) {
+        super(processorParameters.processorName(), processorParameters);
+        storeNames = valueGetterSuppliers.stream().flatMap(s -> 
Arrays.stream(s.storeNames())).toArray(String[]::new);
+    }
+
+    /**
+     * Create a node representing a stateful processor, which needs to be 
connected to the provided stores
+     */
+    public ProcessorToStateConnectorNode(final String nodeName,
+                                         final ProcessorParameters<K, V, ?, ?> 
processorParameters,
+                                         final String[] storeNames) {
+        super(nodeName, processorParameters);
+        this.storeNames = storeNames;
+    }
+
+    @Override
+    public String toString() {
+        return "ProcessorNode{" +
+            "storeNames=" + Arrays.toString(storeNames) +
+            "} " + super.toString();
+    }
+
+    @Override
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+        processorParameters().addProcessorTo(topologyBuilder, 
parentNodeNames());
+
+        if (storeNames != null && storeNames.length > 0) {
+            
topologyBuilder.connectProcessorAndStateStores(processorParameters().processorName(),
 storeNames);
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
deleted file mode 100644
index ec6c6583b3e..00000000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals.graph;
-
-import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-
-import java.util.Arrays;
-import java.util.Set;
-import java.util.stream.Stream;
-
-public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> {
-
-    private final String[] storeNames;
-    private final StoreFactory storeFactory;
-
-    /**
-     * Create a node representing a stateful processor, where the named stores 
have already been registered.
-     */
-    public StatefulProcessorNode(final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                                 final Set<StoreBuilder<?>> 
preRegisteredStores,
-                                 final Set<KTableValueGetterSupplier<?, ?>> 
valueGetterSuppliers) {
-        super(processorParameters.processorName(), processorParameters);
-        final Stream<String> registeredStoreNames = 
preRegisteredStores.stream().map(StoreBuilder::name);
-        final Stream<String> valueGetterStoreNames = 
valueGetterSuppliers.stream().flatMap(s -> Arrays.stream(s.storeNames()));
-        storeNames = Stream.concat(registeredStoreNames, 
valueGetterStoreNames).toArray(String[]::new);
-        storeFactory = null;
-    }
-
-    /**
-     * Create a node representing a stateful processor, where the named stores 
have already been registered.
-     */
-    public StatefulProcessorNode(final String nodeName,
-                                 final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                                 final String[] storeNames) {
-        super(nodeName, processorParameters);
-
-        this.storeNames = storeNames;
-        this.storeFactory = null;
-    }
-
-
-    /**
-     * Create a node representing a stateful processor,
-     * where the store needs to be built and registered as part of building 
this node.
-     */
-    public StatefulProcessorNode(final String nodeName,
-                                 final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                                 final StoreFactory 
materializedKTableStoreBuilder) {
-        super(nodeName, processorParameters);
-
-        this.storeNames = null;
-        this.storeFactory = materializedKTableStoreBuilder;
-    }
-
-    @Override
-    public String toString() {
-        return "StatefulProcessorNode{" +
-            "storeNames=" + Arrays.toString(storeNames) +
-            ", storeBuilder=" + storeFactory +
-            "} " + super.toString();
-    }
-
-    @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
-        processorParameters().addProcessorTo(topologyBuilder, 
parentNodeNames());
-
-        if (storeNames != null && storeNames.length > 0) {
-            
topologyBuilder.connectProcessorAndStateStores(processorParameters().processorName(),
 storeNames);
-        }
-
-        if (storeFactory != null) {
-            topologyBuilder.addStateStore(storeFactory, 
processorParameters().processorName());
-        }
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
index 6fef05604cf..a921dab0d1a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java
@@ -20,11 +20,11 @@ package org.apache.kafka.streams.kstream.internals.graph;
 import org.apache.kafka.streams.kstream.internals.KTableFilter;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 
-public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements 
VersionedSemanticsGraphNode {
+public class TableFilterNode<K, V> extends ProcessorGraphNode<K, V> implements 
VersionedSemanticsGraphNode {
 
     public TableFilterNode(final String nodeName,
                            final ProcessorParameters<K, V, ?, ?> 
processorParameters) {
-        super(nodeName, processorParameters, null, null);
+        super(nodeName, processorParameters);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
deleted file mode 100644
index af3ab15d490..00000000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals.graph;
-
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-
-import java.util.Arrays;
-
-public class TableProcessorNode<K, V> extends GraphNode {
-
-    private final ProcessorParameters<K, V, ?, ?> processorParameters;
-    private final StoreFactory storeFactory;
-    private final String[] storeNames;
-
-    public TableProcessorNode(final String nodeName,
-                              final ProcessorParameters<K, V, ?, ?> 
processorParameters) {
-        this(nodeName, processorParameters, null, null);
-    }
-
-    public TableProcessorNode(final String nodeName,
-                              final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                              final StoreFactory storeFactory,
-                              final String[] storeNames) {
-        super(nodeName);
-        this.processorParameters = processorParameters;
-        this.storeFactory = storeFactory;
-        this.storeNames = storeNames != null ? storeNames : new String[] {};
-    }
-
-    public ProcessorParameters<K, V, ?, ?> processorParameters() {
-        return processorParameters;
-    }
-
-    @Override
-    public String toString() {
-        return "TableProcessorNode{" +
-            ", processorParameters=" + processorParameters +
-            ", storeFactory=" + (storeFactory == null ? "null" : 
storeFactory.storeName()) +
-            ", storeNames=" + Arrays.toString(storeNames) +
-            "} " + super.toString();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
-        processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
-
-        final String processorName = processorParameters.processorName();
-
-        if (storeNames.length > 0) {
-            // todo(rodesai): remove me once all operators have been moved to 
ProcessorSupplier
-            topologyBuilder.connectProcessorAndStateStores(processorName, 
storeNames);
-        }
-
-        if (storeFactory != null) {
-            // todo(rodesai) remove when KTableImpl#doFilter, 
KTableImpl#doTransformValues moved to ProcessorSupplier
-            topologyBuilder.addStateStore(storeFactory, processorName);
-        }
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
index 595d0266aae..ac4a3f25c37 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals.graph;
 
-public class TableSuppressNode<K, V> extends StatefulProcessorNode<K, V> {
+/**
+ * Marker class to identify suppression nodes since they have some special 
requirements
+ */
+public class TableSuppressNode<K, V> extends ProcessorGraphNode<K, V> {
     public TableSuppressNode(final String nodeName,
-                             final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                             final String[] storeNames) {
-        super(nodeName, processorParameters, storeNames);
+                             final ProcessorParameters<K, V, ?, ?> 
processorParameters) {
+        super(nodeName, processorParameters);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 8552790692c..5db1439835c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -25,7 +25,6 @@ import 
org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
 
 import org.junit.jupiter.api.Test;
 
@@ -50,8 +49,20 @@ public class GraphGraceSearchUtilTest {
     public void shouldFailIfThereIsNoGraceAncestor() {
         // doesn't matter if this ancestor is stateless or stateful. The 
important thing it that there is
         // no grace period defined on any ancestor of the node
-        final StatefulProcessorNode<String, Long> gracelessAncestor = new 
StatefulProcessorNode<>(
-            "stateful",
+        final ProcessorGraphNode<String, Long> gracelessAncestor = new 
ProcessorGraphNode<>(
+            "graceless",
+            new ProcessorParameters<>(
+                () -> new Processor<String, Long, String, Long>() {
+                    @Override
+                    public void process(final Record<String, Long> record) {}
+
+                },
+                "graceless"
+            )
+        );
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
+            "stateless",
             new ProcessorParameters<>(
                 () -> new Processor<String, Long, String, Long>() {
 
@@ -59,26 +70,24 @@ public class GraphGraceSearchUtilTest {
                     public void process(final Record<String, Long> record) {}
 
                 },
-                "dummy"
-            ),
-            (StoreFactory) null
+                "stateless"
+            )
         );
 
-        final ProcessorGraphNode<String, Long> node = new 
ProcessorGraphNode<>("stateless", null);
         gracelessAncestor.addChild(node);
 
         try {
             GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
             fail("should have thrown.");
         } catch (final TopologyException e) {
-            assertThat(e.getMessage(), is("Invalid topology: Window close time 
is only defined for windowed computations. Got [stateful->stateless]."));
+            assertThat(e.getMessage(), is("Invalid topology: Window close time 
is only defined for windowed computations. Got [graceless->stateless]."));
         }
     }
 
     @Test
     public void shouldExtractGraceFromKStreamWindowAggregateNode() {
         final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), 
ofMillis(1234L));
-        final StatefulProcessorNode<String, Long> node = new 
StatefulProcessorNode<>(
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
@@ -89,8 +98,7 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            ),
-            (StoreFactory) null
+            )
         );
 
         final long extracted = 
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -101,7 +109,7 @@ public class GraphGraceSearchUtilTest {
     public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
         final SessionWindows windows = 
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
 
-        final StatefulProcessorNode<String, Long> node = new 
StatefulProcessorNode<>(
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
@@ -113,8 +121,7 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            ),
-            (StoreFactory) null
+            )
         );
 
         final long extracted = 
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -124,15 +131,14 @@ public class GraphGraceSearchUtilTest {
     @Test
     public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
         final SessionWindows windows = 
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
-        final StatefulProcessorNode<String, Long> graceGrandparent = new 
StatefulProcessorNode<>(
+        final ProcessorGraphNode<String, Long> graceGrandparent = new 
ProcessorGraphNode<>(
             "asdf",
             new ProcessorParameters<>(new 
KStreamSessionWindowAggregate<String, Long, Integer>(
                 windows, mockStoreFactory("asdf"), 
EmitStrategy.onWindowUpdate(), null, null, null
-            ), "asdf"),
-            (StoreFactory) null
+            ), "asdf")
         );
 
-        final StatefulProcessorNode<String, Long> statefulParent = new 
StatefulProcessorNode<>(
+        final ProcessorGraphNode<String, Long> statefulParent = new 
ProcessorGraphNode<>(
             "stateful",
             new ProcessorParameters<>(
                 () -> new Processor<String, Long, String, Long>() {
@@ -142,12 +148,22 @@ public class GraphGraceSearchUtilTest {
 
                 },
                 "dummy"
-            ),
-            (StoreFactory) null
+            )
         );
         graceGrandparent.addChild(statefulParent);
 
-        final ProcessorGraphNode<String, Long> node = new 
ProcessorGraphNode<>("stateless", null);
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
+            "stateless",
+            new ProcessorParameters<>(
+                () -> new Processor<String, Long, String, Long>() {
+
+                    @Override
+                    public void process(final Record<String, Long> record) {}
+
+                },
+                "dummyChild-graceless"
+            )
+        );
         statefulParent.addChild(node);
 
         final long extracted = 
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -157,7 +173,7 @@ public class GraphGraceSearchUtilTest {
     @Test
     public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
         final SessionWindows windows = 
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L));
-        final StatefulProcessorNode<String, Long> graceGrandparent = new 
StatefulProcessorNode<>(
+        final ProcessorGraphNode<String, Long> graceGrandparent = new 
ProcessorGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
@@ -169,14 +185,35 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            ),
-            (StoreFactory) null
+            )
         );
 
-        final ProcessorGraphNode<String, Long> statelessParent = new 
ProcessorGraphNode<>("stateless", null);
+        final ProcessorGraphNode<String, Long> statelessParent = new 
ProcessorGraphNode<>(
+            "statelessParent",
+            new ProcessorParameters<>(
+                () -> new Processor<String, Long, String, Long>() {
+
+                    @Override
+                    public void process(final Record<String, Long> record) {}
+
+                },
+                "statelessParent"
+            )
+        );
         graceGrandparent.addChild(statelessParent);
 
-        final ProcessorGraphNode<String, Long> node = new 
ProcessorGraphNode<>("stateless", null);
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
+            "stateless",
+            new ProcessorParameters<>(
+                () -> new Processor<String, Long, String, Long>() {
+
+                    @Override
+                    public void process(final Record<String, Long> record) {}
+
+                },
+                "stateless"
+            )
+        );
         statelessParent.addChild(node);
 
         final long extracted = 
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -185,7 +222,7 @@ public class GraphGraceSearchUtilTest {
 
     @Test
     public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() {
-        final StatefulProcessorNode<String, Long> leftParent = new 
StatefulProcessorNode<>(
+        final ProcessorGraphNode<String, Long> leftParent = new 
ProcessorGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
@@ -197,11 +234,10 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            ),
-            (StoreFactory) null
+            )
         );
 
-        final StatefulProcessorNode<String, Long> rightParent = new 
StatefulProcessorNode<>(
+        final ProcessorGraphNode<String, Long> rightParent = new 
ProcessorGraphNode<>(
             "asdf",
             new ProcessorParameters<>(
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
@@ -212,11 +248,21 @@ public class GraphGraceSearchUtilTest {
                     null
                 ),
                 "asdf"
-            ),
-            (StoreFactory) null
+            )
         );
 
-        final ProcessorGraphNode<String, Long> node = new 
ProcessorGraphNode<>("stateless", null);
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>(
+            "stateless",
+            new ProcessorParameters<>(
+                () -> new Processor<String, Long, String, Long>() {
+
+                    @Override
+                    public void process(final Record<String, Long> record) {}
+
+                },
+                "stateless"
+            )
+        );
         leftParent.addChild(node);
         rightParent.addChild(node);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
deleted file mode 100644
index 36a32d01dc2..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals.graph;
-
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.Record;
-
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TableProcessorNodeTest {
-    private static class TestProcessor implements Processor<String, String, 
String, String> {
-
-        @Override
-        public void process(final Record<String, String> record) {
-        }
-
-    }
-
-    @Test
-    public void shouldConvertToStringWithNullStoreBuilder() {
-        final TableProcessorNode<String, String> node = new 
TableProcessorNode<>(
-            "name",
-            new ProcessorParameters<>(TestProcessor::new, "processor"),
-            null,
-            new String[]{"store1", "store2"}
-        );
-
-        final String asString = node.toString();
-        final String expected = "storeFactory=null";
-        assertTrue(
-                asString.contains(expected),
-                String.format(
-                        "Expected toString to return string with \"%s\", 
received: %s",
-                        expected,
-                        asString)
-        );
-    }
-}
\ No newline at end of file

Reply via email to