This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 31d97bc3c99 KAFKA-18026: KIP-1112, skip re-registering aggregate 
stores in StatefulProcessorNode (#18015)
31d97bc3c99 is described below

commit 31d97bc3c99f543d7a3ca148361e5f346c50fde3
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Dec 3 22:18:55 2024 -0800

    KAFKA-18026: KIP-1112, skip re-registering aggregate stores in 
StatefulProcessorNode (#18015)
    
    Minor followup to #17929 based on this discussion
    
    Also includes some very minor refactoring/renaming on the side. The only 
real change is in the KGroupedStreamImpl class
    
    Reviewers: Guozhang Wang <[email protected]>
---
 .../internals/CogroupedStreamAggregateBuilder.java | 18 ++++++-------
 .../internals/GroupedStreamAggregateBuilder.java   |  6 ++---
 .../kstream/internals/KGroupedStreamImpl.java      | 31 +++++++++++++---------
 .../kstream/internals/KStreamAggregate.java        | 13 +--------
 .../streams/kstream/internals/KStreamImplJoin.java | 18 ++++++-------
 .../streams/kstream/internals/KStreamReduce.java   |  8 +++---
 .../internals/KStreamSessionWindowAggregate.java   |  2 +-
 .../internals/KStreamSlidingWindowAggregate.java   |  2 +-
 .../kstream/internals/KStreamWindowAggregate.java  |  2 +-
 .../internals/KeyValueStoreMaterializer.java       |  5 ++--
 .../internals/MaterializedStoreFactory.java        | 20 +++++++++++++-
 .../internals/OuterStreamJoinStoreFactory.java     |  7 +++--
 .../internals/SessionStoreMaterializer.java        |  5 ++--
 .../internals/SlidingWindowStoreMaterializer.java  |  5 ++--
 .../internals/StreamJoinedStoreFactory.java        |  7 +++--
 .../internals/SubscriptionStoreFactory.java        |  7 +++--
 .../kstream/internals/WindowStoreMaterializer.java |  5 ++--
 .../kstream/internals/graph/StateStoreNode.java    |  4 +--
 .../internals/graph/TableProcessorNode.java        |  2 +-
 .../internals/InternalTopologyBuilder.java         | 31 +++++++++++-----------
 .../processor/internals/StoreBuilderWrapper.java   |  7 +++--
 .../streams/processor/internals/StoreFactory.java  |  8 +++---
 .../internals/InternalTopologyBuilderTest.java     | 14 +++++-----
 .../internals/KeyValueStoreMaterializerTest.java   |  4 +--
 .../org/apache/kafka/streams/utils/TestUtils.java  |  2 +-
 25 files changed, 118 insertions(+), 115 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 5b294c39d68..126df7de17b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -58,7 +58,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final Serde<VOut> valueSerde,
                                 final String queryableName,
                                 final boolean isOutputVersioned) {
-        processRepartitions(groupPatterns, storeFactory.name());
+        processRepartitions(groupPatterns, storeFactory.storeName());
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         boolean stateCreated = false;
@@ -80,7 +80,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             processors.add(statefulProcessorNode);
             builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
statefulProcessorNode);
         }
-        return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.name());
+        return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.storeName());
     }
 
     @SuppressWarnings("unchecked")
@@ -92,7 +92,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                                   final Serde<VOut> valueSerde,
                                                   final String queryableName,
                                                   final Windows<W> windows) {
-        processRepartitions(groupPatterns, storeFactory.name());
+        processRepartitions(groupPatterns, storeFactory.storeName());
 
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
@@ -119,7 +119,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             processors.add(statefulProcessorNode);
             builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
statefulProcessorNode);
         }
-        return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.name());
+        return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.storeName());
     }
 
     @SuppressWarnings("unchecked")
@@ -132,7 +132,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final String queryableName,
                                 final SessionWindows sessionWindows,
                                 final Merger<? super K, VOut> sessionMerger) {
-        processRepartitions(groupPatterns, storeFactory.name());
+        processRepartitions(groupPatterns, storeFactory.storeName());
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         boolean stateCreated = false;
@@ -159,7 +159,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             processors.add(statefulProcessorNode);
             builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
statefulProcessorNode);
         }
-        return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.name());
+        return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.storeName());
     }
 
     @SuppressWarnings("unchecked")
@@ -171,7 +171,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final Serde<VOut> valueSerde,
                                 final String queryableName,
                                 final SlidingWindows slidingWindows) {
-        processRepartitions(groupPatterns, storeFactory.name());
+        processRepartitions(groupPatterns, storeFactory.storeName());
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         final Collection<GraphNode> processors = new ArrayList<>();
         boolean stateCreated = false;
@@ -198,7 +198,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
             processors.add(statefulProcessorNode);
             builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), 
statefulProcessorNode);
         }
-        return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.name());
+        return createTable(processors, parentProcessors, named, keySerde, 
valueSerde, queryableName, storeFactory.storeName());
     }
 
     private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, 
Aggregator<? super K, ? super Object, VOut>> groupPatterns,
@@ -279,7 +279,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                 new StatefulProcessorNode<>(
                     processorName,
                     new ProcessorParameters<>(kStreamAggregate, processorName),
-                    new String[]{storeFactory.name()}
+                    new String[]{storeFactory.storeName()}
                 );
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 8217bd025bb..c3360c9c013 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -73,7 +73,7 @@ class GroupedStreamAggregateBuilder<K, V> {
                                   final Serde<KR> keySerde,
                                   final Serde<VR> valueSerde,
                                   final boolean isOutputVersioned) {
-        assert queryableStoreName == null || 
queryableStoreName.equals(storeFactory.name());
+        assert queryableStoreName == null || 
queryableStoreName.equals(storeFactory.storeName());
 
         final String aggFunctionName = functionName.name();
 
@@ -82,7 +82,7 @@ class GroupedStreamAggregateBuilder<K, V> {
 
         if (repartitionRequired) {
             final OptimizableRepartitionNodeBuilder<K, V> 
repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
-            final String repartitionTopicPrefix = 
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : 
storeFactory.name();
+            final String repartitionTopicPrefix = 
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : 
storeFactory.storeName();
             sourceName = createRepartitionSource(repartitionTopicPrefix, 
repartitionNodeBuilder);
 
             // First time through we need to create a repartition node.
@@ -101,7 +101,7 @@ class GroupedStreamAggregateBuilder<K, V> {
             new StatefulProcessorNode<>(
                 aggFunctionName,
                 new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
-                storeFactory
+                new String[] {storeFactory.storeName()}
             );
         statefulProcessorNode.setOutputVersioned(isOutputVersioned);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 256153708a0..cc335e1383d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -97,10 +97,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> 
implements KGroupedS
         }
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+        final KeyValueStoreMaterializer<K, V> storeFactory = new 
KeyValueStoreMaterializer<>(materializedInternal);
+
         return doAggregate(
-            new KStreamReduce<>(materializedInternal, reducer),
+            new KStreamReduce<>(storeFactory, reducer),
             name,
-            materializedInternal
+            storeFactory
         );
     }
 
@@ -129,10 +131,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, 
V> implements KGroupedS
         }
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+        final KeyValueStoreMaterializer<K, VR> storeFactory = new 
KeyValueStoreMaterializer<>(materializedInternal);
+
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal, initializer, 
aggregator),
+            new KStreamAggregate<>(storeFactory, initializer, aggregator),
             name,
-            materializedInternal
+            storeFactory
         );
     }
 
@@ -183,10 +187,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, 
V> implements KGroupedS
         }
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+        final KeyValueStoreMaterializer<K, Long> storeFactory = new 
KeyValueStoreMaterializer<>(materializedInternal);
+
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal, 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            new KStreamAggregate<>(storeFactory, 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
             name,
-            materializedInternal);
+            storeFactory);
     }
 
     @Override
@@ -236,15 +242,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, 
V> implements KGroupedS
 
     private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, 
V, K, T> aggregateSupplier,
                                          final String functionName,
-                                         final MaterializedInternal<K, T, 
KeyValueStore<Bytes, byte[]>> materializedInternal) {
+                                         final KeyValueStoreMaterializer<K, T> 
storeFactory) {
+
         return aggregateBuilder.build(
             new NamedInternal(functionName),
-            new KeyValueStoreMaterializer<>(materializedInternal),
+            storeFactory,
             aggregateSupplier,
-            materializedInternal.queryableStoreName(),
-            materializedInternal.keySerde(),
-            materializedInternal.valueSerde(),
-            materializedInternal.storeSupplier() instanceof 
VersionedBytesStoreSupplier);
+            storeFactory.queryableStoreName(),
+            storeFactory.keySerde(),
+            storeFactory.valueSerde(),
+            storeFactory.storeSupplier() instanceof 
VersionedBytesStoreSupplier);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index abe5fd2b566..bfbd16ffae8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
@@ -28,7 +27,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
@@ -55,20 +53,11 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
 
     private boolean sendOldValues = false;
 
-    KStreamAggregate(final MaterializedInternal<KIn, VAgg, 
KeyValueStore<Bytes, byte[]>> materialized,
-                     final Initializer<VAgg> initializer,
-                     final Aggregator<? super KIn, ? super VIn, VAgg> 
aggregator) {
-        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
-        this.storeName = materialized.storeName();
-        this.initializer = initializer;
-        this.aggregator = aggregator;
-    }
-
     KStreamAggregate(final StoreFactory storeFactory,
                      final Initializer<VAgg> initializer,
                      final Aggregator<? super KIn, ? super VIn, VAgg> 
aggregator) {
         this.storeFactory = storeFactory;
-        this.storeName = storeFactory.name();
+        this.storeName = storeFactory.storeName();
         this.initializer = initializer;
         this.aggregator = aggregator;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 12bb6c19db8..d9008e81c8d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -146,16 +146,16 @@ class KStreamImplJoin {
             otherWindowStore = 
joinWindowStoreBuilderFromSupplier(otherStoreSupplier, 
streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
         }
 
-        final KStreamJoinWindow<K, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindowStore.name());
+        final KStreamJoinWindow<K, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindowStore.storeName());
 
         final ProcessorParameters<K, V1, ?, ?> thisWindowStreamProcessorParams 
= new ProcessorParameters<>(thisWindowedStream, thisWindowStreamProcessorName);
-        final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new 
WindowedStreamProcessorNode<>(thisWindowStore.name(), 
thisWindowStreamProcessorParams);
+        final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new 
WindowedStreamProcessorNode<>(thisWindowStore.storeName(), 
thisWindowStreamProcessorParams);
         builder.addGraphNode(thisGraphNode, thisWindowedStreamsNode);
 
-        final KStreamJoinWindow<K, V2> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindowStore.name());
+        final KStreamJoinWindow<K, V2> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindowStore.storeName());
 
         final ProcessorParameters<K, V2, ?, ?> 
otherWindowStreamProcessorParams = new 
ProcessorParameters<>(otherWindowedStream, otherWindowStreamProcessorName);
-        final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new 
WindowedStreamProcessorNode<>(otherWindowStore.name(), 
otherWindowStreamProcessorParams);
+        final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new 
WindowedStreamProcessorNode<>(otherWindowStore.storeName(), 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
         Optional<StoreFactory> outerJoinWindowStore = Optional.empty();
@@ -173,25 +173,25 @@ class KStreamImplJoin {
 
         final JoinWindowsInternal internalWindows = new 
JoinWindowsInternal(windows);
         final KStreamKStreamJoinLeftSide<K, V1, V2, VOut> joinThis = new 
KStreamKStreamJoinLeftSide<>(
-            otherWindowStore.name(),
+            otherWindowStore.storeName(),
             internalWindows,
             joiner,
             leftOuter,
-            outerJoinWindowStore.map(StoreFactory::name),
+            outerJoinWindowStore.map(StoreFactory::storeName),
             sharedTimeTrackerSupplier
         );
 
         final KStreamKStreamJoinRightSide<K, V1, V2, VOut> joinOther = new 
KStreamKStreamJoinRightSide<>(
-            thisWindowStore.name(),
+            thisWindowStore.storeName(),
             internalWindows,
             AbstractStream.reverseJoinerWithKey(joiner),
             rightOuter,
-            outerJoinWindowStore.map(StoreFactory::name),
+            outerJoinWindowStore.map(StoreFactory::storeName),
             sharedTimeTrackerSupplier
         );
 
         final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new 
KStreamKStreamSelfJoin<>(
-            thisWindowStore.name(),
+            thisWindowStore.storeName(),
             internalWindows,
             joiner,
             windows.size() + windows.gracePeriodMs()
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index f337cd9ae44..2f04a8ea65e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
@@ -27,7 +26,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
@@ -53,9 +51,9 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, V, K,
 
     private boolean sendOldValues = false;
 
-    KStreamReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes, 
byte[]>> materialized, final Reducer<V> reducer) {
-        this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
-        this.storeName = materialized.storeName();
+    KStreamReduce(final StoreFactory storeFactory, final Reducer<V> reducer) {
+        this.storeFactory = storeFactory;
+        this.storeName = storeFactory.storeName();
         this.reducer = reducer;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 4a8040a8d37..f3ca9e6740a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -75,7 +75,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
                                          final Aggregator<? super KIn, ? super 
VIn, VAgg> aggregator,
                                          final Merger<? super KIn, VAgg> 
sessionMerger) {
         this.windows = windows;
-        this.storeName = storeFactory.name();
+        this.storeName = storeFactory.storeName();
         this.storeFactory = storeFactory;
         this.emitStrategy = emitStrategy;
         this.initializer = initializer;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index 894657da48c..93935cbc1f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -64,7 +64,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
                                          final Initializer<VAgg> initializer,
                                          final Aggregator<? super KIn, ? super 
VIn, VAgg> aggregator) {
         this.windows = windows;
-        this.storeName = storeFactory.name();
+        this.storeName = storeFactory.storeName();
         this.storeFactory = storeFactory;
         this.initializer = initializer;
         this.aggregator = aggregator;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 2e6147627e9..adb174c4ccd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -63,7 +63,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends 
Window> implements
                                   final Initializer<VAgg> initializer,
                                   final Aggregator<? super KIn, ? super VIn, 
VAgg> aggregator) {
         this.windows = windows;
-        this.storeName = storeFactory.name();
+        this.storeName = storeFactory.storeName();
         this.storeFactory = storeFactory;
         this.emitStrategy = emitStrategy;
         this.initializer = initializer;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 3927e95c25b..d59d34e0e90 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.DslKeyValueParams;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -44,7 +43,7 @@ public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
     }
 
     @Override
-    public StateStore build() {
+    public StoreBuilder<?> builder() {
         final KeyValueBytesStoreSupplier supplier = 
materialized.storeSupplier() == null
                 ? dslStoreSuppliers().keyValueStore(new 
DslKeyValueParams(materialized.storeName(), true))
                 : (KeyValueBytesStoreSupplier) materialized.storeSupplier();
@@ -77,7 +76,7 @@ public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
         }
 
 
-        return builder.build();
+        return builder;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
index 99bd2e848b7..83cb6606790 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.StoreSupplier;
 
 import java.util.Map;
 
@@ -39,10 +41,26 @@ public abstract class MaterializedStoreFactory<K, V, S 
extends StateStore> exten
     }
 
     @Override
-    public String name() {
+    public String storeName() {
         return materialized.storeName();
     }
 
+    public String queryableStoreName() {
+        return materialized.queryableStoreName();
+    }
+
+    public Serde<K> keySerde() {
+        return materialized.keySerde();
+    }
+
+    public Serde<V> valueSerde() {
+        return materialized.valueSerde();
+    }
+
+    public StoreSupplier<S> storeSupplier() {
+        return materialized.storeSupplier();
+    }
+
     @Override
     public Map<String, String> logConfig() {
         return materialized.logConfig();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index d864698408b..645858d1a65 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.DslKeyValueParams;
 import org.apache.kafka.streams.state.DslStoreSuppliers;
@@ -73,7 +72,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends 
AbstractConfigurable
     }
 
     @Override
-    public StateStore build() {
+    public StoreBuilder<?> builder() {
         final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
         final Duration windowSize = Duration.ofMillis(windows.size());
         final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
@@ -135,7 +134,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends 
AbstractConfigurable
             builder.withLoggingDisabled();
         }
 
-        return builder.build();
+        return builder;
     }
 
     @Override
@@ -155,7 +154,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends 
AbstractConfigurable
     }
 
     @Override
-    public String name() {
+    public String storeName() {
         return name;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
index 9f63b3fc279..a5317f48880 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.SessionWindows;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.DslSessionParams;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
@@ -58,7 +57,7 @@ public class SessionStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
     }
 
     @Override
-    public StateStore build() {
+    public StoreBuilder<?> builder() {
         final SessionBytesStoreSupplier supplier = 
materialized.storeSupplier() == null
                 ? dslStoreSuppliers().sessionStore(new DslSessionParams(
                         materialized.storeName(),
@@ -85,7 +84,7 @@ public class SessionStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
             builder.withCachingDisabled();
         }
 
-        return builder.build();
+        return builder;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
index cea18f96d37..0aca2643be7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.SlidingWindows;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.DslWindowParams;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -58,7 +57,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends 
MaterializedStoreFacto
     }
 
     @Override
-    public StateStore build() {
+    public StoreBuilder<?> builder() {
         final WindowBytesStoreSupplier supplier = materialized.storeSupplier() 
== null
                 ? dslStoreSuppliers().windowStore(new DslWindowParams(
                         materialized.storeName(),
@@ -91,7 +90,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends 
MaterializedStoreFacto
             builder.withCachingDisabled();
         }
 
-        return builder.build();
+        return builder;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
index b6e969572c8..4da99a71d61 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.DslWindowParams;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -81,7 +80,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends 
AbstractConfigurableSto
     }
 
     @Override
-    public StateStore build() {
+    public StoreBuilder<?> builder() {
         final WindowBytesStoreSupplier supplier = storeSupplier == null
                 ? dslStoreSuppliers().windowStore(new DslWindowParams(
                         this.name,
@@ -106,7 +105,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends 
AbstractConfigurableSto
             builder.withLoggingDisabled();
         }
 
-        return builder.build();
+        return builder;
     }
 
     @Override
@@ -126,7 +125,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends 
AbstractConfigurableSto
     }
 
     @Override
-    public String name() {
+    public String storeName() {
         return name;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
index f3c424efb3e..10c8a5e110c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.DslKeyValueParams;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -45,7 +44,7 @@ public class SubscriptionStoreFactory<K> extends 
AbstractConfigurableStoreFactor
     }
 
     @Override
-    public StateStore build() {
+    public StoreBuilder<?> builder() {
         StoreBuilder<?> builder;
         builder = Stores.timestampedKeyValueStoreBuilder(
             dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, 
true)),
@@ -58,7 +57,7 @@ public class SubscriptionStoreFactory<K> extends 
AbstractConfigurableStoreFactor
             builder = builder.withLoggingDisabled();
         }
         builder = builder.withCachingDisabled();
-        return builder.build();
+        return builder;
     }
 
     @Override
@@ -78,7 +77,7 @@ public class SubscriptionStoreFactory<K> extends 
AbstractConfigurableStoreFactor
     }
 
     @Override
-    public String name() {
+    public String storeName() {
         return name;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
index eabce874f70..2b9f3d33814 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.DslWindowParams;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -56,7 +55,7 @@ public class WindowStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V
     }
 
     @Override
-    public StateStore build() {
+    public StoreBuilder<?> builder() {
         final WindowBytesStoreSupplier supplier = materialized.storeSupplier() 
== null
                 ? dslStoreSuppliers().windowStore(new DslWindowParams(
                         materialized.storeName(),
@@ -85,7 +84,7 @@ public class WindowStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V
             builder.withCachingEnabled();
         }
 
-        return builder.build();
+        return builder;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
index fb3cec2dde4..05375d35efe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
@@ -25,7 +25,7 @@ public class StateStoreNode<S extends StateStore> extends 
GraphNode {
     protected final StoreFactory storeBuilder;
 
     public StateStoreNode(final StoreFactory storeBuilder) {
-        super(storeBuilder.name());
+        super(storeBuilder.storeName());
 
         this.storeBuilder = storeBuilder;
     }
@@ -38,7 +38,7 @@ public class StateStoreNode<S extends StateStore> extends 
GraphNode {
     @Override
     public String toString() {
         return "StateStoreNode{" +
-               " name='" + storeBuilder.name() +  '\'' +
+               " name='" + storeBuilder.storeName() +  '\'' +
                ", logConfig=" + storeBuilder.logConfig() +
                ", loggingEnabled='" + storeBuilder.loggingEnabled() + '\'' +
                "} ";
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index ccd87855a07..b47252068e6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -54,7 +54,7 @@ public class TableProcessorNode<K, V> extends GraphNode {
     public String toString() {
         return "TableProcessorNode{" +
             ", processorParameters=" + processorParameters +
-            ", storeFactory=" + (storeFactory == null ? "null" : 
storeFactory.name()) +
+            ", storeFactory=" + (storeFactory == null ? "null" : 
storeFactory.storeName()) +
             ", storeNames=" + Arrays.toString(storeNames) +
             "} " + super.toString();
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 9f65a415d95..eeb076fc0cf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyConfig;
-import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.ApiUtils;
 import org.apache.kafka.streams.processor.StateStore;
@@ -433,7 +432,7 @@ public class InternalTopologyBuilder {
         // build global state stores
         for (final StoreFactory storeFactory : globalStateBuilders.values()) {
             storeFactory.configure(config);
-            globalStateStores.put(storeFactory.name(), storeFactory.build());
+            globalStateStores.put(storeFactory.storeName(), 
storeFactory.builder().build());
         }
 
         return this;
@@ -620,20 +619,20 @@ public class InternalTopologyBuilder {
                                     final boolean allowOverride,
                                     final String... processorNames) {
         Objects.requireNonNull(storeFactory, "stateStoreFactory can't be 
null");
-        final StoreFactory stateFactory = 
stateFactories.get(storeFactory.name());
+        final StoreFactory stateFactory = 
stateFactories.get(storeFactory.storeName());
         if (!allowOverride && stateFactory != null && 
!stateFactory.isCompatibleWith(storeFactory)) {
-            throw new TopologyException("A different StateStore has already 
been added with the name " + storeFactory.name());
+            throw new TopologyException("A different StateStore has already 
been added with the name " + storeFactory.storeName());
         }
-        if (globalStateBuilders.containsKey(storeFactory.name())) {
-            throw new TopologyException("A different GlobalStateStore has 
already been added with the name " + storeFactory.name());
+        if (globalStateBuilders.containsKey(storeFactory.storeName())) {
+            throw new TopologyException("A different GlobalStateStore has 
already been added with the name " + storeFactory.storeName());
         }
 
-        stateFactories.put(storeFactory.name(), storeFactory);
+        stateFactories.put(storeFactory.storeName(), storeFactory);
 
         if (processorNames != null) {
             for (final String processorName : processorNames) {
                 Objects.requireNonNull(processorName, "processor name must not 
be null");
-                connectProcessorAndStateStore(processorName, 
storeFactory.name());
+                connectProcessorAndStateStore(processorName, 
storeFactory.storeName());
             }
         }
         nodeGroups = null;
@@ -660,7 +659,7 @@ public class InternalTopologyBuilder {
                                      topic,
                                      processorName,
                                      stateUpdateSupplier,
-                                     storeFactory.name(),
+                                     storeFactory.storeName(),
                                      storeFactory.loggingEnabled());
         validateTopicNotAlreadyRegistered(topic);
 
@@ -682,18 +681,18 @@ public class InternalTopologyBuilder {
             keyDeserializer,
             valueDeserializer)
         );
-        storeNameToReprocessOnRestore.put(storeFactory.name(),
+        storeNameToReprocessOnRestore.put(storeFactory.storeName(),
             reprocessOnRestore ?
                 Optional.of(new ReprocessFactory<>(stateUpdateSupplier, 
keyDeserializer, valueDeserializer))
                 : Optional.empty());
         nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
         nodeGrouper.add(sourceName);
-        nodeFactory.addStateStore(storeFactory.name());
+        nodeFactory.addStateStore(storeFactory.storeName());
         nodeFactories.put(processorName, nodeFactory);
         nodeGrouper.add(processorName);
         nodeGrouper.unite(processorName, predecessors);
-        globalStateBuilders.put(storeFactory.name(), storeFactory);
-        connectSourceStoreAndTopic(storeFactory.name(), topic);
+        globalStateBuilders.put(storeFactory.storeName(), storeFactory);
+        connectSourceStoreAndTopic(storeFactory.storeName(), topic);
         nodeGroups = null;
     }
 
@@ -1158,7 +1157,7 @@ public class InternalTopologyBuilder {
                     if (topologyConfigs != null) {
                         
storeFactory.configure(topologyConfigs.applicationConfigs);
                     }
-                    store = storeFactory.build();
+                    store = storeFactory.builder().build();
                     stateStoreMap.put(stateStoreName, store);
                 } else {
                     store = globalStateStores.get(stateStoreName);
@@ -1258,8 +1257,8 @@ public class InternalTopologyBuilder {
                 // if the node is connected to a state store whose changelog 
topics are not predefined,
                 // add to the changelog topics
                 for (final StoreFactory stateFactory : 
stateFactories.values()) {
-                    if (stateFactory.connectedProcessorNames().contains(node) 
&& storeToChangelogTopic.containsKey(stateFactory.name())) {
-                        final String topicName = 
storeToChangelogTopic.get(stateFactory.name());
+                    if (stateFactory.connectedProcessorNames().contains(node) 
&& storeToChangelogTopic.containsKey(stateFactory.storeName())) {
+                        final String topicName = 
storeToChangelogTopic.get(stateFactory.storeName());
                         if (!stateChangelogTopics.containsKey(topicName)) {
                             final InternalTopicConfig internalTopicConfig =
                                 createChangelogTopicConfig(stateFactory, 
topicName);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
index 4648533af1d..61345d7da9b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
@@ -51,8 +50,8 @@ public class StoreBuilderWrapper implements StoreFactory {
     }
 
     @Override
-    public StateStore build() {
-        return builder.build();
+    public StoreBuilder<?> builder() {
+        return builder;
     }
 
     @Override
@@ -90,7 +89,7 @@ public class StoreBuilderWrapper implements StoreFactory {
     }
 
     @Override
-    public String name() {
+    public String storeName() {
         return builder.name();
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
index 7542f4c5bd8..ef6df04c0d6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
@@ -52,7 +52,7 @@ public interface StoreFactory {
         // do nothing
     }
 
-    StateStore build();
+    StoreBuilder<?> builder();
 
     long retentionPeriod();
 
@@ -62,7 +62,7 @@ public interface StoreFactory {
 
     boolean loggingEnabled();
 
-    String name();
+    String storeName();
 
     boolean isWindowStore();
 
@@ -132,7 +132,7 @@ public interface StoreFactory {
         @SuppressWarnings("unchecked")
         @Override
         public T build() {
-            return (T) storeFactory.build();
+            return (T) storeFactory.builder().build();
         }
 
         @Override
@@ -147,7 +147,7 @@ public interface StoreFactory {
 
         @Override
         public String name() {
-            return storeFactory.name();
+            return storeFactory.storeName();
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index e3add9755ae..b0afe364985 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -572,11 +572,11 @@ public class InternalTopologyBuilderTest {
 
         assertEquals(0, builder.buildTopology().stateStores().size());
 
-        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.name());
+        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.storeName());
 
         final List<StateStore> suppliers = 
builder.buildTopology().stateStores();
         assertEquals(1, suppliers.size());
-        assertEquals(storeFactory.name(), suppliers.get(0).name());
+        assertEquals(storeFactory.storeName(), suppliers.get(0).name());
     }
 
     @Test
@@ -586,14 +586,14 @@ public class InternalTopologyBuilderTest {
 
         builder.addSource(null, "source-1", null, null, null, "topic-1");
         builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), 
"source-1");
-        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.name());
+        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.storeName());
 
         builder.addSource(null, "source-2", null, null, null, "topic-2");
         builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), 
"source-2");
 
         builder.buildTopology();
         final Set<String> stateStoreNames = 
builder.stateStoreNamesForSubtopology(0);
-        assertThat(stateStoreNames, equalTo(Set.of(storeFactory.name())));
+        assertThat(stateStoreNames, equalTo(Set.of(storeFactory.storeName())));
 
         final Set<String> emptyStoreNames = 
builder.stateStoreNamesForSubtopology(1);
         assertThat(emptyStoreNames, equalTo(Set.of()));
@@ -609,11 +609,11 @@ public class InternalTopologyBuilderTest {
 
         builder.addStateStore(storeFactory);
         builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), 
"source-1");
-        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.name());
+        builder.connectProcessorAndStateStores("processor-1", 
storeFactory.storeName());
 
         builder.addStateStore(storeFactory);
         builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), 
"source-1");
-        builder.connectProcessorAndStateStores("processor-2", 
storeFactory.name());
+        builder.connectProcessorAndStateStores("processor-2", 
storeFactory.storeName());
 
         assertEquals(1, builder.buildTopology().stateStores().size());
     }
@@ -1196,7 +1196,7 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("test-app");
 
         final Map<String, List<String>> stateStoreAndTopics = 
builder.stateStoreNameToFullSourceTopicNames();
-        final List<String> topics = 
stateStoreAndTopics.get(storeFactory.name());
+        final List<String> topics = 
stateStoreAndTopics.get(storeFactory.storeName());
 
         assertEquals(2, topics.size(), "Expected to contain two topics");
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 85aa8b5e21f..7228496dd36 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -263,7 +263,7 @@ public class KeyValueStoreMaterializerTest {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized) {
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         materializer.configure(streamsConfig);
-        return (TimestampedKeyValueStore<String, String>) ((StoreFactory) 
materializer).build();
+        return (TimestampedKeyValueStore<String, String>) 
materializer.builder().build();
     }
 
     @SuppressWarnings("unchecked")
@@ -271,6 +271,6 @@ public class KeyValueStoreMaterializerTest {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized) {
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         materializer.configure(streamsConfig);
-        return (VersionedKeyValueStore<String, String>) ((StoreFactory) 
materializer).build();
+        return (VersionedKeyValueStore<String, String>) 
materializer.builder().build();
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java 
b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 6ecc6b7c7ac..89a229f7ddc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -119,7 +119,7 @@ public class TestUtils {
 
     public static StoreFactory mockStoreFactory(final String name) {
         final StoreFactory storeFactory = Mockito.mock(StoreFactory.class);
-        Mockito.when(storeFactory.name()).thenReturn(name);
+        Mockito.when(storeFactory.storeName()).thenReturn(name);
         return storeFactory;
     }
 

Reply via email to