This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 31d97bc3c99 KAFKA-18026: KIP-1112, skip re-registering aggregate
stores in StatefulProcessorNode (#18015)
31d97bc3c99 is described below
commit 31d97bc3c99f543d7a3ca148361e5f346c50fde3
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Dec 3 22:18:55 2024 -0800
KAFKA-18026: KIP-1112, skip re-registering aggregate stores in
StatefulProcessorNode (#18015)
Minor followup to #17929 based on this discussion.
Also includes some very minor refactoring/renaming on the side. The only
real change is in the KGroupedStreamImpl class.
Reviewers: Guozhang Wang <[email protected]>
---
.../internals/CogroupedStreamAggregateBuilder.java | 18 ++++++-------
.../internals/GroupedStreamAggregateBuilder.java | 6 ++---
.../kstream/internals/KGroupedStreamImpl.java | 31 +++++++++++++---------
.../kstream/internals/KStreamAggregate.java | 13 +--------
.../streams/kstream/internals/KStreamImplJoin.java | 18 ++++++-------
.../streams/kstream/internals/KStreamReduce.java | 8 +++---
.../internals/KStreamSessionWindowAggregate.java | 2 +-
.../internals/KStreamSlidingWindowAggregate.java | 2 +-
.../kstream/internals/KStreamWindowAggregate.java | 2 +-
.../internals/KeyValueStoreMaterializer.java | 5 ++--
.../internals/MaterializedStoreFactory.java | 20 +++++++++++++-
.../internals/OuterStreamJoinStoreFactory.java | 7 +++--
.../internals/SessionStoreMaterializer.java | 5 ++--
.../internals/SlidingWindowStoreMaterializer.java | 5 ++--
.../internals/StreamJoinedStoreFactory.java | 7 +++--
.../internals/SubscriptionStoreFactory.java | 7 +++--
.../kstream/internals/WindowStoreMaterializer.java | 5 ++--
.../kstream/internals/graph/StateStoreNode.java | 4 +--
.../internals/graph/TableProcessorNode.java | 2 +-
.../internals/InternalTopologyBuilder.java | 31 +++++++++++-----------
.../processor/internals/StoreBuilderWrapper.java | 7 +++--
.../streams/processor/internals/StoreFactory.java | 8 +++---
.../internals/InternalTopologyBuilderTest.java | 14 +++++-----
.../internals/KeyValueStoreMaterializerTest.java | 4 +--
.../org/apache/kafka/streams/utils/TestUtils.java | 2 +-
25 files changed, 118 insertions(+), 115 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 5b294c39d68..126df7de17b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -58,7 +58,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final boolean isOutputVersioned) {
- processRepartitions(groupPatterns, storeFactory.name());
+ processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
boolean stateCreated = false;
@@ -80,7 +80,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()),
statefulProcessorNode);
}
- return createTable(processors, parentProcessors, named, keySerde,
valueSerde, queryableName, storeFactory.name());
+ return createTable(processors, parentProcessors, named, keySerde,
valueSerde, queryableName, storeFactory.storeName());
}
@SuppressWarnings("unchecked")
@@ -92,7 +92,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
- processRepartitions(groupPatterns, storeFactory.name());
+ processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
@@ -119,7 +119,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()),
statefulProcessorNode);
}
- return createTable(processors, parentProcessors, named, keySerde,
valueSerde, queryableName, storeFactory.name());
+ return createTable(processors, parentProcessors, named, keySerde,
valueSerde, queryableName, storeFactory.storeName());
}
@SuppressWarnings("unchecked")
@@ -132,7 +132,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
- processRepartitions(groupPatterns, storeFactory.name());
+ processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
boolean stateCreated = false;
@@ -159,7 +159,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()),
statefulProcessorNode);
}
- return createTable(processors, parentProcessors, named, keySerde,
valueSerde, queryableName, storeFactory.name());
+ return createTable(processors, parentProcessors, named, keySerde,
valueSerde, queryableName, storeFactory.storeName());
}
@SuppressWarnings("unchecked")
@@ -171,7 +171,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final SlidingWindows slidingWindows) {
- processRepartitions(groupPatterns, storeFactory.name());
+ processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
@@ -198,7 +198,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()),
statefulProcessorNode);
}
- return createTable(processors, parentProcessors, named, keySerde,
valueSerde, queryableName, storeFactory.name());
+ return createTable(processors, parentProcessors, named, keySerde,
valueSerde, queryableName, storeFactory.storeName());
}
private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>,
Aggregator<? super K, ? super Object, VOut>> groupPatterns,
@@ -279,7 +279,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
- new String[]{storeFactory.name()}
+ new String[]{storeFactory.storeName()}
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 8217bd025bb..c3360c9c013 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -73,7 +73,7 @@ class GroupedStreamAggregateBuilder<K, V> {
final Serde<KR> keySerde,
final Serde<VR> valueSerde,
final boolean isOutputVersioned) {
- assert queryableStoreName == null ||
queryableStoreName.equals(storeFactory.name());
+ assert queryableStoreName == null ||
queryableStoreName.equals(storeFactory.storeName());
final String aggFunctionName = functionName.name();
@@ -82,7 +82,7 @@ class GroupedStreamAggregateBuilder<K, V> {
if (repartitionRequired) {
final OptimizableRepartitionNodeBuilder<K, V>
repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
- final String repartitionTopicPrefix =
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName :
storeFactory.name();
+ final String repartitionTopicPrefix =
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName :
storeFactory.storeName();
sourceName = createRepartitionSource(repartitionTopicPrefix,
repartitionNodeBuilder);
// First time through we need to create a repartition node.
@@ -101,7 +101,7 @@ class GroupedStreamAggregateBuilder<K, V> {
new StatefulProcessorNode<>(
aggFunctionName,
new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
- storeFactory
+ new String[] {storeFactory.storeName()}
);
statefulProcessorNode.setOutputVersioned(isOutputVersioned);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 256153708a0..cc335e1383d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -97,10 +97,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V>
implements KGroupedS
}
final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+ final KeyValueStoreMaterializer<K, V> storeFactory = new
KeyValueStoreMaterializer<>(materializedInternal);
+
return doAggregate(
- new KStreamReduce<>(materializedInternal, reducer),
+ new KStreamReduce<>(storeFactory, reducer),
name,
- materializedInternal
+ storeFactory
);
}
@@ -129,10 +131,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K,
V> implements KGroupedS
}
final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+ final KeyValueStoreMaterializer<K, VR> storeFactory = new
KeyValueStoreMaterializer<>(materializedInternal);
+
return doAggregate(
- new KStreamAggregate<>(materializedInternal, initializer,
aggregator),
+ new KStreamAggregate<>(storeFactory, initializer, aggregator),
name,
- materializedInternal
+ storeFactory
);
}
@@ -183,10 +187,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K,
V> implements KGroupedS
}
final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+ final KeyValueStoreMaterializer<K, Long> storeFactory = new
KeyValueStoreMaterializer<>(materializedInternal);
+
return doAggregate(
- new KStreamAggregate<>(materializedInternal,
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+ new KStreamAggregate<>(storeFactory,
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
name,
- materializedInternal);
+ storeFactory);
}
@Override
@@ -236,15 +242,16 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K,
V> implements KGroupedS
private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K,
V, K, T> aggregateSupplier,
final String functionName,
- final MaterializedInternal<K, T,
KeyValueStore<Bytes, byte[]>> materializedInternal) {
+ final KeyValueStoreMaterializer<K, T>
storeFactory) {
+
return aggregateBuilder.build(
new NamedInternal(functionName),
- new KeyValueStoreMaterializer<>(materializedInternal),
+ storeFactory,
aggregateSupplier,
- materializedInternal.queryableStoreName(),
- materializedInternal.keySerde(),
- materializedInternal.valueSerde(),
- materializedInternal.storeSupplier() instanceof
VersionedBytesStoreSupplier);
+ storeFactory.queryableStoreName(),
+ storeFactory.keySerde(),
+ storeFactory.valueSerde(),
+ storeFactory.storeSupplier() instanceof
VersionedBytesStoreSupplier);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index abe5fd2b566..bfbd16ffae8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
@@ -28,7 +27,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
@@ -55,20 +53,11 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements
KStreamAggProcessorSupp
private boolean sendOldValues = false;
- KStreamAggregate(final MaterializedInternal<KIn, VAgg,
KeyValueStore<Bytes, byte[]>> materialized,
- final Initializer<VAgg> initializer,
- final Aggregator<? super KIn, ? super VIn, VAgg>
aggregator) {
- this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
- this.storeName = materialized.storeName();
- this.initializer = initializer;
- this.aggregator = aggregator;
- }
-
KStreamAggregate(final StoreFactory storeFactory,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg>
aggregator) {
this.storeFactory = storeFactory;
- this.storeName = storeFactory.name();
+ this.storeName = storeFactory.storeName();
this.initializer = initializer;
this.aggregator = aggregator;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 12bb6c19db8..d9008e81c8d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -146,16 +146,16 @@ class KStreamImplJoin {
otherWindowStore =
joinWindowStoreBuilderFromSupplier(otherStoreSupplier,
streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
}
- final KStreamJoinWindow<K, V1> thisWindowedStream = new
KStreamJoinWindow<>(thisWindowStore.name());
+ final KStreamJoinWindow<K, V1> thisWindowedStream = new
KStreamJoinWindow<>(thisWindowStore.storeName());
final ProcessorParameters<K, V1, ?, ?> thisWindowStreamProcessorParams
= new ProcessorParameters<>(thisWindowedStream, thisWindowStreamProcessorName);
- final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new
WindowedStreamProcessorNode<>(thisWindowStore.name(),
thisWindowStreamProcessorParams);
+ final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new
WindowedStreamProcessorNode<>(thisWindowStore.storeName(),
thisWindowStreamProcessorParams);
builder.addGraphNode(thisGraphNode, thisWindowedStreamsNode);
- final KStreamJoinWindow<K, V2> otherWindowedStream = new
KStreamJoinWindow<>(otherWindowStore.name());
+ final KStreamJoinWindow<K, V2> otherWindowedStream = new
KStreamJoinWindow<>(otherWindowStore.storeName());
final ProcessorParameters<K, V2, ?, ?>
otherWindowStreamProcessorParams = new
ProcessorParameters<>(otherWindowedStream, otherWindowStreamProcessorName);
- final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new
WindowedStreamProcessorNode<>(otherWindowStore.name(),
otherWindowStreamProcessorParams);
+ final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new
WindowedStreamProcessorNode<>(otherWindowStore.storeName(),
otherWindowStreamProcessorParams);
builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
Optional<StoreFactory> outerJoinWindowStore = Optional.empty();
@@ -173,25 +173,25 @@ class KStreamImplJoin {
final JoinWindowsInternal internalWindows = new
JoinWindowsInternal(windows);
final KStreamKStreamJoinLeftSide<K, V1, V2, VOut> joinThis = new
KStreamKStreamJoinLeftSide<>(
- otherWindowStore.name(),
+ otherWindowStore.storeName(),
internalWindows,
joiner,
leftOuter,
- outerJoinWindowStore.map(StoreFactory::name),
+ outerJoinWindowStore.map(StoreFactory::storeName),
sharedTimeTrackerSupplier
);
final KStreamKStreamJoinRightSide<K, V1, V2, VOut> joinOther = new
KStreamKStreamJoinRightSide<>(
- thisWindowStore.name(),
+ thisWindowStore.storeName(),
internalWindows,
AbstractStream.reverseJoinerWithKey(joiner),
rightOuter,
- outerJoinWindowStore.map(StoreFactory::name),
+ outerJoinWindowStore.map(StoreFactory::storeName),
sharedTimeTrackerSupplier
);
final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new
KStreamKStreamSelfJoin<>(
- thisWindowStore.name(),
+ thisWindowStore.storeName(),
internalWindows,
joiner,
windows.size() + windows.gracePeriodMs()
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index f337cd9ae44..2f04a8ea65e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
@@ -27,7 +26,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
@@ -53,9 +51,9 @@ public class KStreamReduce<K, V> implements
KStreamAggProcessorSupplier<K, V, K,
private boolean sendOldValues = false;
- KStreamReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes,
byte[]>> materialized, final Reducer<V> reducer) {
- this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
- this.storeName = materialized.storeName();
+ KStreamReduce(final StoreFactory storeFactory, final Reducer<V> reducer) {
+ this.storeFactory = storeFactory;
+ this.storeName = storeFactory.storeName();
this.reducer = reducer;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 4a8040a8d37..f3ca9e6740a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -75,7 +75,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
final Aggregator<? super KIn, ? super
VIn, VAgg> aggregator,
final Merger<? super KIn, VAgg>
sessionMerger) {
this.windows = windows;
- this.storeName = storeFactory.name();
+ this.storeName = storeFactory.storeName();
this.storeFactory = storeFactory;
this.emitStrategy = emitStrategy;
this.initializer = initializer;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index 894657da48c..93935cbc1f0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -64,7 +64,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super
VIn, VAgg> aggregator) {
this.windows = windows;
- this.storeName = storeFactory.name();
+ this.storeName = storeFactory.storeName();
this.storeFactory = storeFactory;
this.initializer = initializer;
this.aggregator = aggregator;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 2e6147627e9..adb174c4ccd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -63,7 +63,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends
Window> implements
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn,
VAgg> aggregator) {
this.windows = windows;
- this.storeName = storeFactory.name();
+ this.storeName = storeFactory.storeName();
this.storeFactory = storeFactory;
this.emitStrategy = emitStrategy;
this.initializer = initializer;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 3927e95c25b..d59d34e0e90 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -44,7 +43,7 @@ public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
}
@Override
- public StateStore build() {
+ public StoreBuilder<?> builder() {
final KeyValueBytesStoreSupplier supplier =
materialized.storeSupplier() == null
? dslStoreSuppliers().keyValueStore(new
DslKeyValueParams(materialized.storeName(), true))
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
@@ -77,7 +76,7 @@ public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
}
- return builder.build();
+ return builder;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
index 99bd2e848b7..83cb6606790 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.StoreSupplier;
import java.util.Map;
@@ -39,10 +41,26 @@ public abstract class MaterializedStoreFactory<K, V, S
extends StateStore> exten
}
@Override
- public String name() {
+ public String storeName() {
return materialized.storeName();
}
+ public String queryableStoreName() {
+ return materialized.queryableStoreName();
+ }
+
+ public Serde<K> keySerde() {
+ return materialized.keySerde();
+ }
+
+ public Serde<V> valueSerde() {
+ return materialized.valueSerde();
+ }
+
+ public StoreSupplier<S> storeSupplier() {
+ return materialized.storeSupplier();
+ }
+
@Override
public Map<String, String> logConfig() {
return materialized.logConfig();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index d864698408b..645858d1a65 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.DslStoreSuppliers;
@@ -73,7 +72,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends
AbstractConfigurable
}
@Override
- public StateStore build() {
+ public StoreBuilder<?> builder() {
final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
final Duration windowSize = Duration.ofMillis(windows.size());
final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
@@ -135,7 +134,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends
AbstractConfigurable
builder.withLoggingDisabled();
}
- return builder.build();
+ return builder;
}
@Override
@@ -155,7 +154,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends
AbstractConfigurable
}
@Override
- public String name() {
+ public String storeName() {
return name;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
index 9f63b3fc279..a5317f48880 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SessionWindows;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslSessionParams;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
@@ -58,7 +57,7 @@ public class SessionStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
}
@Override
- public StateStore build() {
+ public StoreBuilder<?> builder() {
final SessionBytesStoreSupplier supplier =
materialized.storeSupplier() == null
? dslStoreSuppliers().sessionStore(new DslSessionParams(
materialized.storeName(),
@@ -85,7 +84,7 @@ public class SessionStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
builder.withCachingDisabled();
}
- return builder.build();
+ return builder;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
index cea18f96d37..0aca2643be7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.SlidingWindows;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -58,7 +57,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends
MaterializedStoreFacto
}
@Override
- public StateStore build() {
+ public StoreBuilder<?> builder() {
final WindowBytesStoreSupplier supplier = materialized.storeSupplier()
== null
? dslStoreSuppliers().windowStore(new DslWindowParams(
materialized.storeName(),
@@ -91,7 +90,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends
MaterializedStoreFacto
builder.withCachingDisabled();
}
- return builder.build();
+ return builder;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
index b6e969572c8..4da99a71d61 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -81,7 +80,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends
AbstractConfigurableSto
}
@Override
- public StateStore build() {
+ public StoreBuilder<?> builder() {
final WindowBytesStoreSupplier supplier = storeSupplier == null
? dslStoreSuppliers().windowStore(new DslWindowParams(
this.name,
@@ -106,7 +105,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends
AbstractConfigurableSto
builder.withLoggingDisabled();
}
- return builder.build();
+ return builder;
}
@Override
@@ -126,7 +125,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends
AbstractConfigurableSto
}
@Override
- public String name() {
+ public String storeName() {
return name;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
index f3c424efb3e..10c8a5e110c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -45,7 +44,7 @@ public class SubscriptionStoreFactory<K> extends
AbstractConfigurableStoreFactor
}
@Override
- public StateStore build() {
+ public StoreBuilder<?> builder() {
StoreBuilder<?> builder;
builder = Stores.timestampedKeyValueStoreBuilder(
dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name,
true)),
@@ -58,7 +57,7 @@ public class SubscriptionStoreFactory<K> extends
AbstractConfigurableStoreFactor
builder = builder.withLoggingDisabled();
}
builder = builder.withCachingDisabled();
- return builder.build();
+ return builder;
}
@Override
@@ -78,7 +77,7 @@ public class SubscriptionStoreFactory<K> extends
AbstractConfigurableStoreFactor
}
@Override
- public String name() {
+ public String storeName() {
return name;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
index eabce874f70..2b9f3d33814 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -56,7 +55,7 @@ public class WindowStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V
}
@Override
- public StateStore build() {
+ public StoreBuilder<?> builder() {
final WindowBytesStoreSupplier supplier = materialized.storeSupplier()
== null
? dslStoreSuppliers().windowStore(new DslWindowParams(
materialized.storeName(),
@@ -85,7 +84,7 @@ public class WindowStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V
builder.withCachingEnabled();
}
- return builder.build();
+ return builder;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
index fb3cec2dde4..05375d35efe 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
@@ -25,7 +25,7 @@ public class StateStoreNode<S extends StateStore> extends
GraphNode {
protected final StoreFactory storeBuilder;
public StateStoreNode(final StoreFactory storeBuilder) {
- super(storeBuilder.name());
+ super(storeBuilder.storeName());
this.storeBuilder = storeBuilder;
}
@@ -38,7 +38,7 @@ public class StateStoreNode<S extends StateStore> extends
GraphNode {
@Override
public String toString() {
return "StateStoreNode{" +
- " name='" + storeBuilder.name() + '\'' +
+ " name='" + storeBuilder.storeName() + '\'' +
", logConfig=" + storeBuilder.logConfig() +
", loggingEnabled='" + storeBuilder.loggingEnabled() + '\'' +
"} ";
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index ccd87855a07..b47252068e6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -54,7 +54,7 @@ public class TableProcessorNode<K, V> extends GraphNode {
public String toString() {
return "TableProcessorNode{" +
", processorParameters=" + processorParameters +
- ", storeFactory=" + (storeFactory == null ? "null" :
storeFactory.name()) +
+ ", storeFactory=" + (storeFactory == null ? "null" :
storeFactory.storeName()) +
", storeNames=" + Arrays.toString(storeNames) +
"} " + super.toString();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 9f65a415d95..eeb076fc0cf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
-import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
@@ -433,7 +432,7 @@ public class InternalTopologyBuilder {
// build global state stores
for (final StoreFactory storeFactory : globalStateBuilders.values()) {
storeFactory.configure(config);
- globalStateStores.put(storeFactory.name(), storeFactory.build());
+ globalStateStores.put(storeFactory.storeName(),
storeFactory.builder().build());
}
return this;
@@ -620,20 +619,20 @@ public class InternalTopologyBuilder {
final boolean allowOverride,
final String... processorNames) {
Objects.requireNonNull(storeFactory, "stateStoreFactory can't be
null");
- final StoreFactory stateFactory =
stateFactories.get(storeFactory.name());
+ final StoreFactory stateFactory =
stateFactories.get(storeFactory.storeName());
if (!allowOverride && stateFactory != null &&
!stateFactory.isCompatibleWith(storeFactory)) {
- throw new TopologyException("A different StateStore has already
been added with the name " + storeFactory.name());
+ throw new TopologyException("A different StateStore has already
been added with the name " + storeFactory.storeName());
}
- if (globalStateBuilders.containsKey(storeFactory.name())) {
- throw new TopologyException("A different GlobalStateStore has
already been added with the name " + storeFactory.name());
+ if (globalStateBuilders.containsKey(storeFactory.storeName())) {
+ throw new TopologyException("A different GlobalStateStore has
already been added with the name " + storeFactory.storeName());
}
- stateFactories.put(storeFactory.name(), storeFactory);
+ stateFactories.put(storeFactory.storeName(), storeFactory);
if (processorNames != null) {
for (final String processorName : processorNames) {
Objects.requireNonNull(processorName, "processor name must not
be null");
- connectProcessorAndStateStore(processorName,
storeFactory.name());
+ connectProcessorAndStateStore(processorName,
storeFactory.storeName());
}
}
nodeGroups = null;
@@ -660,7 +659,7 @@ public class InternalTopologyBuilder {
topic,
processorName,
stateUpdateSupplier,
- storeFactory.name(),
+ storeFactory.storeName(),
storeFactory.loggingEnabled());
validateTopicNotAlreadyRegistered(topic);
@@ -682,18 +681,18 @@ public class InternalTopologyBuilder {
keyDeserializer,
valueDeserializer)
);
- storeNameToReprocessOnRestore.put(storeFactory.name(),
+ storeNameToReprocessOnRestore.put(storeFactory.storeName(),
reprocessOnRestore ?
Optional.of(new ReprocessFactory<>(stateUpdateSupplier,
keyDeserializer, valueDeserializer))
: Optional.empty());
nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
nodeGrouper.add(sourceName);
- nodeFactory.addStateStore(storeFactory.name());
+ nodeFactory.addStateStore(storeFactory.storeName());
nodeFactories.put(processorName, nodeFactory);
nodeGrouper.add(processorName);
nodeGrouper.unite(processorName, predecessors);
- globalStateBuilders.put(storeFactory.name(), storeFactory);
- connectSourceStoreAndTopic(storeFactory.name(), topic);
+ globalStateBuilders.put(storeFactory.storeName(), storeFactory);
+ connectSourceStoreAndTopic(storeFactory.storeName(), topic);
nodeGroups = null;
}
@@ -1158,7 +1157,7 @@ public class InternalTopologyBuilder {
if (topologyConfigs != null) {
storeFactory.configure(topologyConfigs.applicationConfigs);
}
- store = storeFactory.build();
+ store = storeFactory.builder().build();
stateStoreMap.put(stateStoreName, store);
} else {
store = globalStateStores.get(stateStoreName);
@@ -1258,8 +1257,8 @@ public class InternalTopologyBuilder {
// if the node is connected to a state store whose changelog
topics are not predefined,
// add to the changelog topics
for (final StoreFactory stateFactory :
stateFactories.values()) {
- if (stateFactory.connectedProcessorNames().contains(node)
&& storeToChangelogTopic.containsKey(stateFactory.name())) {
- final String topicName =
storeToChangelogTopic.get(stateFactory.name());
+ if (stateFactory.connectedProcessorNames().contains(node)
&& storeToChangelogTopic.containsKey(stateFactory.storeName())) {
+ final String topicName =
storeToChangelogTopic.get(stateFactory.storeName());
if (!stateChangelogTopics.containsKey(topicName)) {
final InternalTopicConfig internalTopicConfig =
createChangelogTopicConfig(stateFactory,
topicName);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
index 4648533af1d..61345d7da9b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
@@ -51,8 +50,8 @@ public class StoreBuilderWrapper implements StoreFactory {
}
@Override
- public StateStore build() {
- return builder.build();
+ public StoreBuilder<?> builder() {
+ return builder;
}
@Override
@@ -90,7 +89,7 @@ public class StoreBuilderWrapper implements StoreFactory {
}
@Override
- public String name() {
+ public String storeName() {
return builder.name();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
index 7542f4c5bd8..ef6df04c0d6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
@@ -52,7 +52,7 @@ public interface StoreFactory {
// do nothing
}
- StateStore build();
+ StoreBuilder<?> builder();
long retentionPeriod();
@@ -62,7 +62,7 @@ public interface StoreFactory {
boolean loggingEnabled();
- String name();
+ String storeName();
boolean isWindowStore();
@@ -132,7 +132,7 @@ public interface StoreFactory {
@SuppressWarnings("unchecked")
@Override
public T build() {
- return (T) storeFactory.build();
+ return (T) storeFactory.builder().build();
}
@Override
@@ -147,7 +147,7 @@ public interface StoreFactory {
@Override
public String name() {
- return storeFactory.name();
+ return storeFactory.storeName();
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index e3add9755ae..b0afe364985 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -572,11 +572,11 @@ public class InternalTopologyBuilderTest {
assertEquals(0, builder.buildTopology().stateStores().size());
- builder.connectProcessorAndStateStores("processor-1",
storeFactory.name());
+ builder.connectProcessorAndStateStores("processor-1",
storeFactory.storeName());
final List<StateStore> suppliers =
builder.buildTopology().stateStores();
assertEquals(1, suppliers.size());
- assertEquals(storeFactory.name(), suppliers.get(0).name());
+ assertEquals(storeFactory.storeName(), suppliers.get(0).name());
}
@Test
@@ -586,14 +586,14 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(),
"source-1");
- builder.connectProcessorAndStateStores("processor-1",
storeFactory.name());
+ builder.connectProcessorAndStateStores("processor-1",
storeFactory.storeName());
builder.addSource(null, "source-2", null, null, null, "topic-2");
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(),
"source-2");
builder.buildTopology();
final Set<String> stateStoreNames =
builder.stateStoreNamesForSubtopology(0);
- assertThat(stateStoreNames, equalTo(Set.of(storeFactory.name())));
+ assertThat(stateStoreNames, equalTo(Set.of(storeFactory.storeName())));
final Set<String> emptyStoreNames =
builder.stateStoreNamesForSubtopology(1);
assertThat(emptyStoreNames, equalTo(Set.of()));
@@ -609,11 +609,11 @@ public class InternalTopologyBuilderTest {
builder.addStateStore(storeFactory);
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(),
"source-1");
- builder.connectProcessorAndStateStores("processor-1",
storeFactory.name());
+ builder.connectProcessorAndStateStores("processor-1",
storeFactory.storeName());
builder.addStateStore(storeFactory);
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(),
"source-1");
- builder.connectProcessorAndStateStores("processor-2",
storeFactory.name());
+ builder.connectProcessorAndStateStores("processor-2",
storeFactory.storeName());
assertEquals(1, builder.buildTopology().stateStores().size());
}
@@ -1196,7 +1196,7 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId("test-app");
final Map<String, List<String>> stateStoreAndTopics =
builder.stateStoreNameToFullSourceTopicNames();
- final List<String> topics =
stateStoreAndTopics.get(storeFactory.name());
+ final List<String> topics =
stateStoreAndTopics.get(storeFactory.storeName());
assertEquals(2, topics.size(), "Expected to contain two topics");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 85aa8b5e21f..7228496dd36 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -263,7 +263,7 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized) {
final KeyValueStoreMaterializer<String, String> materializer = new
KeyValueStoreMaterializer<>(materialized);
materializer.configure(streamsConfig);
- return (TimestampedKeyValueStore<String, String>) ((StoreFactory)
materializer).build();
+ return (TimestampedKeyValueStore<String, String>)
materializer.builder().build();
}
@SuppressWarnings("unchecked")
@@ -271,6 +271,6 @@ public class KeyValueStoreMaterializerTest {
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized) {
final KeyValueStoreMaterializer<String, String> materializer = new
KeyValueStoreMaterializer<>(materialized);
materializer.configure(streamsConfig);
- return (VersionedKeyValueStore<String, String>) ((StoreFactory)
materializer).build();
+ return (VersionedKeyValueStore<String, String>)
materializer.builder().build();
}
}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 6ecc6b7c7ac..89a229f7ddc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -119,7 +119,7 @@ public class TestUtils {
public static StoreFactory mockStoreFactory(final String name) {
final StoreFactory storeFactory = Mockito.mock(StoreFactory.class);
- Mockito.when(storeFactory.name()).thenReturn(name);
+ Mockito.when(storeFactory.storeName()).thenReturn(name);
return storeFactory;
}