This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 184b64fb416 KAFKA-18026: migrate KStream and KTable aggregates to use 
ProcessorSupplier#stores (#17929)
184b64fb416 is described below

commit 184b64fb41651f95e038f402fa24f58761f4c185
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Dec 3 02:09:43 2024 -0800

    KAFKA-18026: migrate KStream and KTable aggregates to use 
ProcessorSupplier#stores (#17929)
    
    As part of KIP-1112, to maximize the utility of the new ProcessorWrapper, 
we need to migrate the DSL operators to the new method of attaching state 
stores by implementing ProcessorSupplier#stores, which makes these stores 
available for inspection by the user's wrapper.
    
    This PR covers the aggregate operator for both KStream and KTable.
    
    
    Reviewers: Guozhang Wang <[email protected]>, Rohan Desai 
<[email protected]>
---
 .../internals/CogroupedStreamAggregateBuilder.java |  20 +-
 .../kstream/internals/KGroupedStreamImpl.java      |   6 +-
 .../kstream/internals/KGroupedTableImpl.java       |   8 +-
 .../kstream/internals/KStreamAggregate.java        |  28 +-
 .../streams/kstream/internals/KStreamReduce.java   |  20 +-
 .../internals/KStreamSessionWindowAggregate.java   |  16 +-
 .../internals/KStreamSlidingWindowAggregate.java   |  15 +-
 .../kstream/internals/KStreamWindowAggregate.java  |  16 +-
 .../streams/kstream/internals/KTableAggregate.java |  19 +-
 .../streams/kstream/internals/KTableReduce.java    |  21 +-
 .../internals/SessionWindowedKStreamImpl.java      |  18 +-
 .../internals/SlidingWindowedKStreamImpl.java      |  16 +-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  16 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   | 323 ++++++++++++++++++---
 .../org/apache/kafka/streams/TopologyTest.java     |   9 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   7 +-
 .../internals/KStreamWindowAggregateTest.java      |  15 +-
 .../kstream/internals/KTableReduceTest.java        |   3 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |  13 +-
 .../org/apache/kafka/streams/utils/TestUtils.java  | 141 ++++++++-
 20 files changed, 614 insertions(+), 116 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 161c4a85c52..5b294c39d68 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -58,14 +58,14 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final Serde<VOut> valueSerde,
                                 final String queryableName,
                                 final boolean isOutputVersioned) {
-        processRepartitions(groupPatterns, storeFactory);
+        processRepartitions(groupPatterns, storeFactory.name());
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, 
Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
             final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
-                new KStreamAggregate<>(storeFactory.name(), initializer, 
kGroupedStream.getValue());
+                new KStreamAggregate<>(storeFactory, initializer, 
kGroupedStream.getValue());
             parentProcessors.add(parentProcessor);
             final StatefulProcessorNode<K, ?> statefulProcessorNode = 
getStatefulProcessorNode(
                 named.suffixWithOrElseGet(
@@ -92,7 +92,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                                   final Serde<VOut> valueSerde,
                                                   final String queryableName,
                                                   final Windows<W> windows) {
-        processRepartitions(groupPatterns, storeFactory);
+        processRepartitions(groupPatterns, storeFactory.name());
 
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
@@ -102,7 +102,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             final KStreamAggProcessorSupplier<K, ?, K, ?>  parentProcessor =
                 (KStreamAggProcessorSupplier<K, ?, K, ?>) new 
KStreamWindowAggregate<K, K, VOut, W>(
                     windows,
-                    storeFactory.name(),
+                    storeFactory,
                     EmitStrategy.onWindowUpdate(),
                     initializer,
                     kGroupedStream.getValue());
@@ -132,7 +132,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final String queryableName,
                                 final SessionWindows sessionWindows,
                                 final Merger<? super K, VOut> sessionMerger) {
-        processRepartitions(groupPatterns, storeFactory);
+        processRepartitions(groupPatterns, storeFactory.name());
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         boolean stateCreated = false;
@@ -141,7 +141,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
                 (KStreamAggProcessorSupplier<K, ?, K, ?>) new 
KStreamSessionWindowAggregate<K, K, VOut>(
                     sessionWindows,
-                    storeFactory.name(),
+                    storeFactory,
                     EmitStrategy.onWindowUpdate(),
                     initializer,
                     kGroupedStream.getValue(),
@@ -171,7 +171,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final Serde<VOut> valueSerde,
                                 final String queryableName,
                                 final SlidingWindows slidingWindows) {
-        processRepartitions(groupPatterns, storeFactory);
+        processRepartitions(groupPatterns, storeFactory.name());
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         final Collection<GraphNode> processors = new ArrayList<>();
         boolean stateCreated = false;
@@ -180,7 +180,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
                 (KStreamAggProcessorSupplier<K, ?, K, ?>) new 
KStreamSlidingWindowAggregate<K, K, VOut>(
                     slidingWindows,
-                    storeFactory.name(),
+                    storeFactory,
                     // TODO: We do not have other emit policies for co-group 
yet
                     EmitStrategy.onWindowUpdate(),
                     initializer,
@@ -202,7 +202,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
     }
 
     private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, 
Aggregator<? super K, ? super Object, VOut>> groupPatterns,
-                                     final StoreFactory storeFactory) {
+                                     final String storeName) {
         for (final KGroupedStreamImpl<K, ?> repartitionReqs : 
groupPatterns.keySet()) {
 
             if (repartitionReqs.repartitionRequired) {
@@ -210,7 +210,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                 final OptimizableRepartitionNodeBuilder<K, ?> 
repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
 
                 final String repartitionNamePrefix = 
repartitionReqs.userProvidedRepartitionTopicName != null ?
-                    repartitionReqs.userProvidedRepartitionTopicName : 
storeFactory.name();
+                    repartitionReqs.userProvidedRepartitionTopicName : 
storeName;
 
                 createRepartitionSource(repartitionNamePrefix, 
repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 2ed1854d9cc..256153708a0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -98,7 +98,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> 
implements KGroupedS
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
         return doAggregate(
-            new KStreamReduce<>(materializedInternal.storeName(), reducer),
+            new KStreamReduce<>(materializedInternal, reducer),
             name,
             materializedInternal
         );
@@ -130,7 +130,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> 
implements KGroupedS
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal.storeName(), 
initializer, aggregator),
+            new KStreamAggregate<>(materializedInternal, initializer, 
aggregator),
             name,
             materializedInternal
         );
@@ -184,7 +184,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> 
implements KGroupedS
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal.storeName(), 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            new KStreamAggregate<>(materializedInternal, 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
             name,
             materializedInternal);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index d03cb65c021..e500582244b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -91,7 +91,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
         final StatefulProcessorNode statefulProcessorNode = new 
StatefulProcessorNode<>(
             funcName,
             new ProcessorParameters<>(aggregateSupplier, funcName),
-            new KeyValueStoreMaterializer<>(materialized)
+            new String[]{materialized.storeName()}
         );
         statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() 
instanceof VersionedBytesStoreSupplier);
 
@@ -148,7 +148,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
             materializedInternal.withValueSerde(valueSerde);
         }
         final ProcessorSupplier<K, Change<V>, K, Change<V>> aggregateSupplier 
= new KTableReduce<>(
-            materializedInternal.storeName(),
+            materializedInternal,
             adder,
             subtractor);
         return doAggregate(aggregateSupplier, new NamedInternal(named), 
REDUCE_NAME, materializedInternal);
@@ -179,7 +179,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
         }
 
         final ProcessorSupplier<K, Change<V>, K, Change<Long>> 
aggregateSupplier = new KTableAggregate<>(
-            materializedInternal.storeName(),
+            materializedInternal,
             countInitializer,
             countAdder,
             countSubtractor);
@@ -224,7 +224,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
             materializedInternal.withKeySerde(keySerde);
         }
         final ProcessorSupplier<K, Change<V>, K, Change<VAgg>> 
aggregateSupplier = new KTableAggregate<>(
-            materializedInternal.storeName(),
+            materializedInternal,
             initializer,
             adder,
             subtractor);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 3e906813d84..abe5fd2b566 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
@@ -24,13 +25,20 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.Set;
+
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 import static 
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
@@ -41,19 +49,35 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamAggregate.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
 
     private boolean sendOldValues = false;
 
-    KStreamAggregate(final String storeName,
+    KStreamAggregate(final MaterializedInternal<KIn, VAgg, 
KeyValueStore<Bytes, byte[]>> materialized,
+                     final Initializer<VAgg> initializer,
+                     final Aggregator<? super KIn, ? super VIn, VAgg> 
aggregator) {
+        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+        this.storeName = materialized.storeName();
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    KStreamAggregate(final StoreFactory storeFactory,
                      final Initializer<VAgg> initializer,
                      final Aggregator<? super KIn, ? super VIn, VAgg> 
aggregator) {
-        this.storeName = storeName;
+        this.storeFactory = storeFactory;
+        this.storeName = storeFactory.name();
         this.initializer = initializer;
         this.aggregator = aggregator;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
     @Override
     public Processor<KIn, VIn, KIn, Change<VAgg>> get() {
         return new KStreamAggregateProcessor();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 15528f5d150..f337cd9ae44 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,19 +17,27 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.Set;
+
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 import static 
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
@@ -40,15 +48,23 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, V, K,
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamReduce.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final Reducer<V> reducer;
 
     private boolean sendOldValues = false;
 
-    KStreamReduce(final String storeName, final Reducer<V> reducer) {
-        this.storeName = storeName;
+    KStreamReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes, 
byte[]>> materialized, final Reducer<V> reducer) {
+        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+        this.storeName = materialized.storeName();
         this.reducer = reducer;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
+
     @Override
     public Processor<K, V, K, Change<V>> get() {
         return new KStreamReduceProcessor();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 8f2c53c8a9a..4a8040a8d37 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -33,16 +33,21 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
 import static 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
@@ -54,6 +59,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final SessionWindows windows;
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@@ -63,19 +69,25 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
     private boolean sendOldValues = false;
 
     public KStreamSessionWindowAggregate(final SessionWindows windows,
-                                         final String storeName,
+                                         final StoreFactory storeFactory,
                                          final EmitStrategy emitStrategy,
                                          final Initializer<VAgg> initializer,
                                          final Aggregator<? super KIn, ? super 
VIn, VAgg> aggregator,
                                          final Merger<? super KIn, VAgg> 
sessionMerger) {
         this.windows = windows;
-        this.storeName = storeName;
+        this.storeName = storeFactory.name();
+        this.storeFactory = storeFactory;
         this.emitStrategy = emitStrategy;
         this.initializer = initializer;
         this.aggregator = aggregator;
         this.sessionMerger = sessionMerger;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
     @Override
     public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
         return new KStreamSessionWindowAggregateProcessor();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index 4a288bb0e83..894657da48c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -28,7 +28,10 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -36,6 +39,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -46,6 +50,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
     private static final Logger log = 
LoggerFactory.getLogger(KStreamSlidingWindowAggregate.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final SlidingWindows windows;
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@@ -54,17 +59,23 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
     private boolean sendOldValues = false;
 
     public KStreamSlidingWindowAggregate(final SlidingWindows windows,
-                                         final String storeName,
+                                         final StoreFactory storeFactory,
                                          final EmitStrategy emitStrategy,
                                          final Initializer<VAgg> initializer,
                                          final Aggregator<? super KIn, ? super 
VIn, VAgg> aggregator) {
         this.windows = windows;
-        this.storeName = storeName;
+        this.storeName = storeFactory.name();
+        this.storeFactory = storeFactory;
         this.initializer = initializer;
         this.aggregator = aggregator;
         this.emitStrategy = emitStrategy;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
     @Override
     public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
         return new KStreamSlidingWindowAggregateProcessor(storeName, 
emitStrategy, sendOldValues);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 340ce82d856..2e6147627e9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -29,13 +29,18 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
@@ -44,6 +49,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends 
Window> implements
     private static final Logger log = 
LoggerFactory.getLogger(KStreamWindowAggregate.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final Windows<W> windows;
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@@ -52,12 +58,13 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W 
extends Window> implements
     private boolean sendOldValues = false;
 
     public KStreamWindowAggregate(final Windows<W> windows,
-                                  final String storeName,
+                                  final StoreFactory storeFactory,
                                   final EmitStrategy emitStrategy,
                                   final Initializer<VAgg> initializer,
                                   final Aggregator<? super KIn, ? super VIn, 
VAgg> aggregator) {
         this.windows = windows;
-        this.storeName = storeName;
+        this.storeName = storeFactory.name();
+        this.storeFactory = storeFactory;
         this.emitStrategy = emitStrategy;
         this.initializer = initializer;
         this.aggregator = aggregator;
@@ -70,6 +77,11 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W 
extends Window> implements
         }
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
     @Override
     public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
         return new KStreamWindowAggregateProcessor(storeName, emitStrategy, 
sendOldValues);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index f71143ff209..cecb8048634 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -16,15 +16,23 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
+import java.util.Collections;
+import java.util.Set;
+
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 import static 
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
 import static 
org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
@@ -33,17 +41,19 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
     KTableProcessorSupplier<KIn, VIn, KIn, VAgg> {
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> add;
     private final Aggregator<? super KIn, ? super VIn, VAgg> remove;
 
     private boolean sendOldValues = false;
 
-    KTableAggregate(final String storeName,
+    KTableAggregate(final MaterializedInternal<KIn, VAgg, KeyValueStore<Bytes, 
byte[]>> materialized,
                     final Initializer<VAgg> initializer,
                     final Aggregator<? super KIn, ? super VIn, VAgg> add,
                     final Aggregator<? super KIn, ? super VIn, VAgg> remove) {
-        this.storeName = storeName;
+        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+        this.storeName = materialized.storeName();
         this.initializer = initializer;
         this.add = add;
         this.remove = remove;
@@ -56,6 +66,11 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
         return true;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
     @Override
     public Processor<KIn, Change<VIn>, KIn, Change<VAgg>> get() {
         return new KTableAggregateProcessor();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index c577d30d984..d0b35098abe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -16,14 +16,22 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
+import java.util.Collections;
+import java.util.Set;
+
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 import static 
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
 import static 
org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
@@ -31,13 +39,17 @@ import static 
org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_
 public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, K, V> 
{
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private final Reducer<V> addReducer;
     private final Reducer<V> removeReducer;
 
     private boolean sendOldValues = false;
 
-    KTableReduce(final String storeName, final Reducer<V> addReducer, final 
Reducer<V> removeReducer) {
-        this.storeName = storeName;
+    KTableReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes, 
byte[]>> materialized,
+                 final Reducer<V> addReducer,
+                 final Reducer<V> removeReducer) {
+        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+        this.storeName = materialized.storeName();
         this.addReducer = addReducer;
         this.removeReducer = removeReducer;
     }
@@ -49,6 +61,11 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, K, V> {
         return true;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
     @Override
     public Processor<K, Change<V>, K, Change<V>> get() {
         return new KTableReduceProcessor();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index d8f3770b79a..989984d42f8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.SessionStore;
 
 import java.util.Objects;
@@ -108,12 +109,14 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         }
 
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+        final StoreFactory storeFactory = new 
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
+
         return aggregateBuilder.build(
             new NamedInternal(aggregateName),
-            new SessionStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
+            storeFactory,
             new KStreamSessionWindowAggregate<>(
                 windows,
-                materializedInternal.storeName(),
+                storeFactory,
                 emitStrategy,
                 aggregateBuilder.countInitializer,
                 aggregateBuilder.countAggregator,
@@ -158,12 +161,14 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         }
 
         final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+        final StoreFactory storeFactory = new 
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
+
         return aggregateBuilder.build(
             new NamedInternal(reduceName),
-            new SessionStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
+            storeFactory,
             new KStreamSessionWindowAggregate<>(
                 windows,
-                materializedInternal.storeName(),
+                storeFactory,
                 emitStrategy,
                 aggregateBuilder.reduceInitializer,
                 reduceAggregator,
@@ -216,13 +221,14 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         }
 
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+        final StoreFactory storeFactory = new 
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
         return aggregateBuilder.build(
             new NamedInternal(aggregateName),
-            new SessionStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
+            storeFactory,
             new KStreamSessionWindowAggregate<>(
                 windows,
-                materializedInternal.storeName(),
+                storeFactory,
                 emitStrategy,
                 initializer,
                 aggregator,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 3cb7b3f29bd..16b2d0185ae 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.Objects;
@@ -90,11 +91,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         }
 
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+        final StoreFactory storeFactory = new 
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
         return aggregateBuilder.build(
                 new NamedInternal(aggregateName),
-                new SlidingWindowStoreMaterializer<>(materializedInternal, 
windows, emitStrategy),
-                new KStreamSlidingWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+                storeFactory,
+                new KStreamSlidingWindowAggregate<>(windows, storeFactory, 
emitStrategy, aggregateBuilder.countInitializer, 
aggregateBuilder.countAggregator),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
                 materializedInternal.valueSerde(),
@@ -135,11 +137,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
             materializedInternal.withKeySerde(keySerde);
         }
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+        final StoreFactory storeFactory = new 
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
         return aggregateBuilder.build(
                 new NamedInternal(aggregateName),
-                new SlidingWindowStoreMaterializer<>(materializedInternal, 
windows, emitStrategy),
-                new KStreamSlidingWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, initializer, aggregator),
+                storeFactory,
+                new KStreamSlidingWindowAggregate<>(windows, storeFactory, 
emitStrategy, initializer, aggregator),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
                 materializedInternal.valueSerde(),
@@ -181,11 +184,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         }
 
         final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+        final StoreFactory storeFactory = new 
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
         return aggregateBuilder.build(
                 new NamedInternal(reduceName),
-                new SlidingWindowStoreMaterializer<>(materializedInternal, 
windows, emitStrategy),
-                new KStreamSlidingWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, 
aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+                storeFactory,
+                new KStreamSlidingWindowAggregate<>(windows, storeFactory, 
emitStrategy, aggregateBuilder.reduceInitializer, 
aggregatorForReducer(reducer)),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
                 materializedInternal.valueSerde(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index b615e20714b..80a671abdc5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.Objects;
@@ -102,13 +103,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         }
 
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+        final StoreFactory storeFactory = new 
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
         return aggregateBuilder.build(
             new NamedInternal(aggregateName),
-            new WindowStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
+            storeFactory,
             new KStreamWindowAggregate<>(
                 windows,
-                materializedInternal.storeName(),
+                storeFactory,
                 emitStrategy,
                 aggregateBuilder.countInitializer,
                 aggregateBuilder.countAggregator),
@@ -154,13 +156,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         }
 
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+        final StoreFactory storeFactory = new 
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
         return aggregateBuilder.build(
             new NamedInternal(aggregateName),
-            new WindowStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
+            storeFactory,
             new KStreamWindowAggregate<>(
                 windows,
-                materializedInternal.storeName(),
+                storeFactory,
                 emitStrategy,
                 initializer,
                 aggregator),
@@ -205,13 +208,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         }
 
         final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+        final StoreFactory storeFactory = new 
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
 
         return aggregateBuilder.build(
             new NamedInternal(reduceName),
-            new WindowStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
+            storeFactory,
             new KStreamWindowAggregate<>(
                 windows,
-                materializedInternal.storeName(),
+                storeFactory,
                 emitStrategy,
                 aggregateBuilder.reduceInitializer,
                 aggregatorForReducer(reducer)),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 5210dd0b3c6..36559b18e29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -35,7 +36,9 @@ import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -47,6 +50,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
@@ -56,6 +60,7 @@ import org.apache.kafka.streams.state.internals.RocksDBStore;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
+import 
org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockPredicate;
@@ -76,7 +81,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -88,6 +92,8 @@ import static java.util.Arrays.asList;
 import static 
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
+import static org.apache.kafka.streams.state.Stores.inMemoryKeyValueStore;
+import static 
org.apache.kafka.streams.state.Stores.timestampedKeyValueStoreBuilder;
 import static 
org.apache.kafka.streams.utils.TestUtils.PROCESSOR_WRAPPER_COUNTER_CONFIG;
 import static org.apache.kafka.streams.utils.TestUtils.dummyStreamsConfigMap;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -124,7 +130,7 @@ public class StreamsBuilderTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.addGlobalStore(
             Stores.keyValueStoreBuilder(
-                Stores.inMemoryKeyValueStore("store"),
+                inMemoryKeyValueStore("store"),
                 Serdes.String(),
                 Serdes.String()
             ),
@@ -1384,7 +1390,7 @@ public class StreamsBuilderTest {
     @Test
     public void shouldUseSpecifiedNameForGlobalStoreProcessor() {
         builder.addGlobalStore(Stores.keyValueStoreBuilder(
-                        Stores.inMemoryKeyValueStore("store"),
+                        inMemoryKeyValueStore("store"),
                         Serdes.String(),
                         Serdes.String()
                 ),
@@ -1401,7 +1407,7 @@ public class StreamsBuilderTest {
     @Test
     public void shouldUseDefaultNameForGlobalStoreProcessor() {
         builder.addGlobalStore(Stores.keyValueStoreBuilder(
-                        Stores.inMemoryKeyValueStore("store"),
+                        inMemoryKeyValueStore("store"),
                         Serdes.String(),
                         Serdes.String()
                 ),
@@ -1420,8 +1426,8 @@ public class StreamsBuilderTest {
         final Map<Object, Object> props = dummyStreamsConfigMap();
         props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
 
         final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
 
@@ -1430,35 +1436,247 @@ public class StreamsBuilderTest {
         // call to fail
         final Random random = new Random();
 
-        builder.stream("input")
-            .process((ProcessorSupplier<Object, Object, Object, Object>) () -> 
record -> System.out.println("Processing: " + random.nextInt()), 
Named.as("processor1"))
-            .processValues(() -> record -> System.out.println("Processing: " + 
random.nextInt()), Named.as("processor2"))
-            .to("output");
+        final StoreBuilder<?> store = 
timestampedKeyValueStoreBuilder(inMemoryKeyValueStore("store"), 
Serdes.String(), Serdes.String());
+        builder.stream("input", Consumed.as("source"))
+            .process(
+                new ProcessorSupplier<>() {
+                    @Override
+                    public Processor<Object, Object, Object, Object> get() {
+                        return record -> System.out.println("Processing: " + 
random.nextInt());
+                    }
+
+                    @Override
+                    public Set<StoreBuilder<?>> stores() {
+                        return Collections.singleton(store);
+                    }
+                },
+                Named.as("stateful-process-1"))
+            .process(
+                new ProcessorSupplier<>() {
+                    @Override
+                    public Processor<Object, Object, Object, Object> get() {
+                        return record -> System.out.println("Processing: " + 
random.nextInt());
+                    }
+
+                    @Override
+                    public Set<StoreBuilder<?>> stores() {
+                        return Collections.singleton(store);
+                    }
+                },
+                Named.as("stateful-process-2"))
+            .processValues(
+                () -> record -> System.out.println("Processing values: " + 
random.nextInt()),
+                Named.as("stateless-processValues"))
+            .to("output", Produced.as("sink"));
 
         builder.build();
-        assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
-        assertThat(wrappedProcessors, 
Matchers.containsInAnyOrder("processor1", "processor2"));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "stateful-process-1", "stateful-process-2", 
"stateless-processValues"));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
     }
 
     @Test
-    public void shouldWrapProcessorsForAggregationOperators() {
+    public void shouldWrapProcessorsForStreamReduce() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
         props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
 
         final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
 
-        builder.stream("input")
+        builder.stream("input", Consumed.as("source"))
+            .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 1 & 2 
(implicit selectKey & repartition)
+            .reduce((l, r) -> l, Named.as("reduce"), Materialized.as("store")) 
// wrapped 3
+            .toStream(Named.as("toStream"))// wrapped 4
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "groupBy", "groupBy-repartition-filter", "reduce", "toStream"));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForStreamAggregate() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.stream("input", Consumed.as("source"))
+            .groupByKey()
+            .count(Named.as("count")) // wrapped 1
+            .toStream(Named.as("toStream"))// wrapped 2
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder("count", "toStream"));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForTimeWindowStreamAggregate() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.stream("input", Consumed.as("source"))
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
+            .count(Named.as("count")) // wrapped 1
+            .toStream(Named.as("toStream"))// wrapped 2
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder("count", "toStream"));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForSlidingWindowStreamAggregate() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.stream("input", Consumed.as("source"))
+            .groupByKey()
+            
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)))
+            .count(Named.as("count")) // wrapped 1
+            .toStream(Named.as("toStream"))// wrapped 2
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder("count", "toStream"));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForSessionWindowStreamAggregate() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.stream("input", Consumed.as("source"))
             .groupByKey()
+            
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofDays(1)))
             .count(Named.as("count")) // wrapped 1
             .toStream(Named.as("toStream"))// wrapped 2
-            .to("output");
+            .to("output", Produced.as("sink"));
 
         builder.build();
-        assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
-        assertThat(wrappedProcessors, Matchers.containsInAnyOrder("count", 
"toStream"));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder("count", "toStream"));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForCoGroupedStreamAggregate() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream1 = builder.stream("one", 
Consumed.as("source-1"));
+        final KStream<String, String> stream2 = builder.stream("two", 
Consumed.as("source-2"));
+
+        final KGroupedStream<String, String> grouped1 = 
stream1.groupByKey(Grouped.as("groupByKey-1"));
+        final KGroupedStream<String, String> grouped2 = 
stream2.groupByKey(Grouped.as("groupByKey-2"));
+
+        grouped1
+            .cogroup((k, v, a) -> a + v) // wrapped 1
+            .cogroup(grouped2, (k, v, a) -> a + v) // wrapped 2
+            .aggregate(() -> "", Named.as("aggregate"), 
Materialized.as("store")) // wrapped 3, store 1
+            .toStream(Named.as("toStream"))// wrapped 4
+            .to("output", Produced.as("sink"));
+
+        final var top = builder.build();
+        System.out.println(top.describe());
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "aggregate-cogroup-agg-0", "aggregate-cogroup-agg-1", 
"aggregate-cogroup-merge", "toStream"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForTableAggregate() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.table("input", Consumed.as("source-table")) // wrapped 1, 
store 1
+            .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 2 
(implicit selectKey)
+            .count(Named.as("count")) // wrapped 3, store 2
+            .toStream(Named.as("toStream"))// wrapped 4
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "source-table", "groupBy", "count", "toStream"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForTableReduce() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.table("input", Consumed.as("source-table")) // wrapped 1, 
store 1
+            .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 2 
(implicit selectKey)
+            .reduce((l, r) -> "", (l, r) -> "", Named.as("reduce"), 
Materialized.as("store")) // wrapped 3, store 2
+            .toStream(Named.as("toStream"))// wrapped 4
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "source-table", "groupBy", "reduce", "toStream"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
     }
 
     @Test
@@ -1466,49 +1684,76 @@ public class StreamsBuilderTest {
         final Map<Object, Object> props = dummyStreamsConfigMap();
         props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
 
         final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
 
-        builder.stream("input")
-            .filter((k, v) -> true, Named.as("filter")) // wrapped 1
+        builder.stream("input", Consumed.as("source"))
+            .filter((k, v) -> true, Named.as("filter-stream")) // wrapped 1
             .map(KeyValue::new, Named.as("map")) // wrapped 2
             .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
             .peek((k, v) -> { }, Named.as("peek")) // wrapped 4
             .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // 
wrapped 5
-            .toTable(Named.as("toTable")) // wrapped 6 (note named as 
toTable-repartition-filter)
+            .toTable(Named.as("toTable")) // should be wrapped when we do 
StreamToTableNode
+            .filter((k, v) -> true, Named.as("filter-table")) // should be 
wrapped once we do TableProcessorNode
             .toStream(Named.as("toStream")) // wrapped 7
-            .to("output");
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(7));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "filter-stream", "map", "selectKey", "peek", "flatMap",
+            "toTable-repartition-filter", "toStream"
+        ));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForUnmaterializedSourceTable() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.table("input", Consumed.as("source")) // wrapped 1
+            .toStream(Named.as("toStream")) // wrapped 2
+            .to("output", Produced.as("sink"));
 
         builder.build();
-        assertThat(wrappedProcessors.size(), CoreMatchers.is(7));
-        assertThat(wrappedProcessors, Matchers.containsInAnyOrder(
-                "filter", "map", "selectKey", "peek", "flatMap", 
"toTable-repartition-filter",
-                "toStream"
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "source", "toStream"
         ));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
     }
 
     @Test
-    public void shouldWrapProcessorsForTableSource() {
+    public void shouldWrapProcessorsForMaterializedSourceTable() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
         props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
 
         final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
 
-        builder.table("input") // wrapped 1 (named KTABLE_SOURCE-0000000002)
-                .toStream(Named.as("toStream")) // wrapped 2
-                .to("output");
+        builder.table("input", Consumed.as("source"), 
Materialized.as("store")) // wrapped 1
+            .toStream(Named.as("toStream")) // wrapped 2
+            .to("output", Produced.as("sink"));
 
         builder.build();
-        assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
-        assertThat(wrappedProcessors, Matchers.containsInAnyOrder(
-                "KTABLE-SOURCE-0000000002",
-                "toStream"
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "source", "toStream"
         ));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 833eada8d5e..f43747d3cf2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -46,6 +46,7 @@ import 
org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
+import 
org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -2427,8 +2428,8 @@ public class TopologyTest {
         final Map<Object, Object> props = dummyStreamsConfigMap();
         props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
 
-        final Set<String> wrappedProcessors = Collections.synchronizedSet(new 
HashSet<>());
-        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
 
         final Topology topology = new Topology(new TopologyConfig(new 
StreamsConfig(props)));
 
@@ -2453,8 +2454,8 @@ public class TopologyTest {
             () -> (Processor<Object, Object, Object, Object>) record -> 
System.out.println("Processing: " + random.nextInt()),
             "p2"
         );
-        assertThat(wrappedProcessors.size(), is(3));
-        assertThat(wrappedProcessors, Matchers.containsInAnyOrder("p1", "p2", 
"p3"));
+        assertThat(counter.numWrappedProcessors(), is(3));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder("p1", "p2", "p3"));
     }
 
     @SuppressWarnings("deprecation")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index adf7b32c708..f9503fd1564 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -64,6 +64,7 @@ import java.util.stream.Collectors;
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
@@ -126,7 +127,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         sessionAggregator = new KStreamSessionWindowAggregate<>(
             SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)),
-            STORE_NAME,
+            mockStoreFactory(STORE_NAME),
             emitStrategy,
             initializer,
             aggregator,
@@ -484,7 +485,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         setup(inputType, false);
         final Processor<String, String, Windowed<String>, Change<Long>> 
processor = new KStreamSessionWindowAggregate<>(
             SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), 
ofMillis(0L)),
-            STORE_NAME,
+            mockStoreFactory(STORE_NAME),
             EmitStrategy.onWindowUpdate(),
             initializer,
             aggregator,
@@ -551,7 +552,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         setup(inputType, false);
         final Processor<String, String, Windowed<String>, Change<Long>> 
processor = new KStreamSessionWindowAggregate<>(
             SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), 
ofMillis(1L)),
-            STORE_NAME,
+            mockStoreFactory(STORE_NAME),
             EmitStrategy.onWindowUpdate(),
             initializer,
             aggregator,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 14b77a7cc30..c4fdf3ad86a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -46,6 +46,7 @@ import 
org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForwa
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -76,6 +77,7 @@ import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.hasItems;
@@ -90,6 +92,7 @@ public class KStreamWindowAggregateTest {
     private static final String WINDOW_STORE_NAME = "dummy-store-name";
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final StoreFactory storeFactory = 
mockStoreFactory(WINDOW_STORE_NAME);
 
     public StrategyType type;
 
@@ -646,7 +649,7 @@ public class KStreamWindowAggregateTest {
             final MockInternalNewProcessorContext<Windowed<String>, 
Change<String>> context = makeContext(stateDir, windowSize);
             final KStreamWindowAggregate<String, String, String, TimeWindow> 
processorSupplier = new KStreamWindowAggregate<>(
                 windows,
-                WINDOW_STORE_NAME,
+                storeFactory,
                 emitStrategy,
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER
@@ -736,7 +739,7 @@ public class KStreamWindowAggregateTest {
             final MockInternalNewProcessorContext<Windowed<String>, 
Change<String>> context = makeContext(stateDir, windowSize);
             final KStreamWindowAggregate<String, String, String, TimeWindow> 
processorSupplier = new KStreamWindowAggregate<>(
                 windows,
-                WINDOW_STORE_NAME,
+                storeFactory,
                 emitStrategy,
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER
@@ -805,7 +808,7 @@ public class KStreamWindowAggregateTest {
             final MockInternalNewProcessorContext<Windowed<String>, 
Change<String>> context = makeContext(stateDir, windowSize);
             final KStreamWindowAggregate<String, String, String, TimeWindow> 
processorSupplier = new KStreamWindowAggregate<>(
                 windows,
-                WINDOW_STORE_NAME,
+                storeFactory,
                 emitStrategy,
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER
@@ -906,7 +909,7 @@ public class KStreamWindowAggregateTest {
             final MockInternalNewProcessorContext<Windowed<String>, 
Change<String>> context = makeContext(stateDir, windowSize);
             final KStreamWindowAggregate<String, String, String, TimeWindow> 
processorSupplier = new KStreamWindowAggregate<>(
                 windows,
-                WINDOW_STORE_NAME,
+                storeFactory,
                 emitStrategy,
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER
@@ -982,7 +985,7 @@ public class KStreamWindowAggregateTest {
             final IllegalArgumentException e = assertThrows(
                 IllegalArgumentException.class, () -> new 
KStreamWindowAggregate<>(
                     UnlimitedWindows.of(),
-                    WINDOW_STORE_NAME,
+                    storeFactory,
                     emitStrategy,
                     MockInitializer.STRING_INIT,
                     MockAggregator.TOSTRING_ADDER)
@@ -992,7 +995,7 @@ public class KStreamWindowAggregateTest {
         } else {
             new KStreamWindowAggregate<>(
                 UnlimitedWindows.of(),
-                WINDOW_STORE_NAME,
+                storeFactory,
                 emitStrategy,
                 MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index de437ed66a5..5f1c8489dab 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
@@ -42,7 +43,7 @@ public class KTableReduceTest {
 
         final Processor<String, Change<Set<String>>, String, 
Change<Set<String>>> reduceProcessor =
             new KTableReduce<String, Set<String>>(
-                "myStore",
+                new MaterializedInternal<>(Materialized.as("myStore")),
                 this::unionNotNullArgs,
                 this::differenceNotNullArgs
             ).get();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 1541dce30ba..8552790692c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.junit.jupiter.api.Test;
 
 import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -82,7 +83,7 @@ public class GraphGraceSearchUtilTest {
             new ProcessorParameters<>(
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
                     windows,
-                    "asdf",
+                    mockStoreFactory("asdf"),
                     EmitStrategy.onWindowUpdate(),
                     null,
                     null
@@ -105,7 +106,7 @@ public class GraphGraceSearchUtilTest {
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
                     windows,
-                    "asdf",
+                    mockStoreFactory("asdf"),
                     EmitStrategy.onWindowUpdate(),
                     null,
                     null,
@@ -126,7 +127,7 @@ public class GraphGraceSearchUtilTest {
         final StatefulProcessorNode<String, Long> graceGrandparent = new 
StatefulProcessorNode<>(
             "asdf",
             new ProcessorParameters<>(new 
KStreamSessionWindowAggregate<String, Long, Integer>(
-                windows, "asdf", EmitStrategy.onWindowUpdate(), null, null, 
null
+                windows, mockStoreFactory("asdf"), 
EmitStrategy.onWindowUpdate(), null, null, null
             ), "asdf"),
             (StoreFactory) null
         );
@@ -161,7 +162,7 @@ public class GraphGraceSearchUtilTest {
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
                     windows,
-                    "asdf",
+                    mockStoreFactory("asdf"),
                     EmitStrategy.onWindowUpdate(),
                     null,
                     null,
@@ -189,7 +190,7 @@ public class GraphGraceSearchUtilTest {
             new ProcessorParameters<>(
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
                     SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), 
ofMillis(1234L)),
-                    "asdf",
+                    mockStoreFactory("asdf"),
                     EmitStrategy.onWindowUpdate(),
                     null,
                     null,
@@ -205,7 +206,7 @@ public class GraphGraceSearchUtilTest {
             new ProcessorParameters<>(
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
                     TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)),
-                    "asdf",
+                    mockStoreFactory("asdf"),
                     EmitStrategy.onWindowUpdate(),
                     null,
                     null
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java 
b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 24ac2f2306d..6ecc6b7c7ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -18,13 +18,19 @@ package org.apache.kafka.streams.utils;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorWrapper;
 import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.StoreBuilder;
+import 
org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
 
 import org.junit.jupiter.api.TestInfo;
+import org.mockito.Mockito;
 
 import java.lang.reflect.Method;
 import java.time.Duration;
@@ -43,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 public class TestUtils {
 
-    public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG = 
"wrapped.processor.count";
+    public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG = 
"wrapped.counter";
 
     /**
      * Waits for the given {@link KafkaStreams} instances to all be in a 
specific {@link KafkaStreams.State}.
@@ -111,6 +117,12 @@ public class TestUtils {
         return baseConfigs;
     }
 
+    public static StoreFactory mockStoreFactory(final String name) {
+        final StoreFactory storeFactory = Mockito.mock(StoreFactory.class);
+        Mockito.when(storeFactory.name()).thenReturn(name);
+        return storeFactory;
+    }
+
     /**
      * Simple pass-through processor wrapper that counts the number of 
processors
      * it wraps.
@@ -119,29 +131,142 @@ public class TestUtils {
      */
     public static class RecordingProcessorWrapper implements ProcessorWrapper {
 
-        private Set<String> wrappedProcessorNames;
+        private WrapperRecorder recorder;
 
         @Override
         public void configure(final Map<String, ?> configs) {
             if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
-                wrappedProcessorNames = (Set<String>) 
configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
+                recorder = (WrapperRecorder) 
configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
             } else {
-                wrappedProcessorNames = Collections.synchronizedSet(new 
HashSet<>());
+                recorder = new WrapperRecorder();
             }
         }
 
+        public static class WrapperRecorder {
+            private final Set<String> uniqueStores = new HashSet<>();
+            private final Set<String> processorStoresCounted = new HashSet<>();
+            private final Set<String> wrappedProcessorNames = 
Collections.synchronizedSet(new HashSet<>());
+
+            public void wrapProcessorSupplier(final String name) {
+                wrappedProcessorNames.add(name);
+            }
+
+            public void wrapStateStore(final String processorName, final 
String storeName) {
+                if (!uniqueStores.contains(storeName)) {
+                    uniqueStores.add(storeName);
+                }
+
+                final String processorStoreKey = processorName + storeName;
+                if (!processorStoresCounted.contains(processorStoreKey)) {
+                    processorStoresCounted.add(processorStoreKey);
+                }
+            }
+
+            public int numWrappedProcessors() {
+                return wrappedProcessorNames.size();
+            }
+
+            // Number of unique state stores in the topology connected to 
their processors via the
+            // ProcessorSupplier#stores method. State stores connected to more 
than one processor are
+            // counted only once
+            public int numUniqueStateStores() {
+                return uniqueStores.size();
+            }
+
+            // Number of stores connected to a processor via the 
ProcessorSupplier#stores method (ie the size
+            // of the set returned by #stores), summed across all processors 
in the topology.
+            // Equal to the number of unique <processorName>-<storeName>
+            // pairings. Will be greater than or equal to the value of 
#numUniqueStateStores, as this method
+            // will double count any stores connected to more than one 
processor
+            public int numConnectedStateStores() {
+                return processorStoresCounted.size();
+            }
+
+            public Set<String> wrappedProcessorNames() {
+                return wrappedProcessorNames;
+            }
+
+        }
+
         @Override
         public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, 
VOut> wrapProcessorSupplier(final String processorName,
                                                                                
                            final ProcessorSupplier<KIn, VIn, KOut, VOut> 
processorSupplier) {
-            wrappedProcessorNames.add(processorName);
-            return ProcessorWrapper.asWrapped(processorSupplier);
+
+            return new CountingDelegatingProcessorSupplier<>(recorder, 
processorName, processorSupplier);
         }
 
         @Override
         public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, 
VOut> wrapFixedKeyProcessorSupplier(final String processorName,
                                                                                
                                final FixedKeyProcessorSupplier<KIn, VIn, VOut> 
processorSupplier) {
-            wrappedProcessorNames.add(processorName);
-            return ProcessorWrapper.asWrappedFixedKey(processorSupplier);
+            return new CountingDelegatingFixedKeyProcessorSupplier<>(recorder, 
processorName, processorSupplier);
+        }
+    }
+
+    private static class CountingDelegatingProcessorSupplier<KIn, VIn, KOut, 
VOut>
+        implements WrappedProcessorSupplier<KIn, VIn, KOut, VOut> {
+
+        private final WrapperRecorder counter;
+        private final String processorName;
+        private final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate;
+
+        public CountingDelegatingProcessorSupplier(final WrapperRecorder 
counter,
+                                                   final String processorName,
+                                                   final 
ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
+            this.counter = counter;
+            this.processorName = processorName;
+            this.delegate = processorSupplier;
+
+            counter.wrapProcessorSupplier(processorName);
+        }
+
+        @Override
+        public Set<StoreBuilder<?>> stores() {
+            final Set<StoreBuilder<?>> stores = delegate.stores();
+            if (stores != null) {
+                for (final StoreBuilder<?> store : stores) {
+                    counter.wrapStateStore(processorName, store.name());
+                }
+            }
+            return stores;
+        }
+
+        @Override
+        public Processor<KIn, VIn, KOut, VOut> get() {
+            return delegate.get();
+        }
+    }
+
+    private static class CountingDelegatingFixedKeyProcessorSupplier<KIn, VIn, 
VOut>
+        implements WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> {
+
+        private final WrapperRecorder counter;
+        private final String processorName;
+        private final FixedKeyProcessorSupplier<KIn, VIn, VOut> delegate;
+
+        public CountingDelegatingFixedKeyProcessorSupplier(final 
WrapperRecorder counter,
+                                                           final String 
processorName,
+                                                           final 
FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
+            this.counter = counter;
+            this.processorName = processorName;
+            this.delegate = processorSupplier;
+
+            counter.wrapProcessorSupplier(processorName);
+        }
+
+        @Override
+        public Set<StoreBuilder<?>> stores() {
+            final Set<StoreBuilder<?>> stores = delegate.stores();
+            if (stores != null) {
+                for (final StoreBuilder<?> store : stores) {
+                    counter.wrapStateStore(processorName, store.name());
+                }
+            }
+            return stores;
+        }
+
+        @Override
+        public FixedKeyProcessor<KIn, VIn, VOut> get() {
+            return delegate.get();
         }
     }
 }


Reply via email to