This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5243fb9a7d7 KAFKA-18026: migrate KTableSource to use
ProcessorSupplier#stores (#17903)
5243fb9a7d7 is described below
commit 5243fb9a7d717521e64fa57bcdf895825df37e85
Author: Almog Gavra <[email protected]>
AuthorDate: Wed Nov 27 14:04:27 2024 -0800
KAFKA-18026: migrate KTableSource to use ProcessorSupplier#stores (#17903)
This PR is part of the implementation for KIP-1112 (KAFKA-18026). In order
to have DSL operators be properly wrapped by the interface suggestion in 1112,
we need to make sure they all use the ConnectedStoreProvider#stores method to
connect stores instead of manually calling addStateStore.
This is a refactor only; there are no new behaviors.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../org/apache/kafka/streams/StreamsBuilder.java | 8 +-
.../java/org/apache/kafka/streams/Topology.java | 8 +-
.../kstream/internals/InternalStreamsBuilder.java | 9 +-
.../streams/kstream/internals/KStreamImpl.java | 7 +-
.../streams/kstream/internals/KStreamImplJoin.java | 2 +-
.../streams/kstream/internals/KTableImpl.java | 7 +-
.../streams/kstream/internals/KTableSource.java | 24 ++++-
.../kstream/internals/MaterializedInternal.java | 11 ++-
.../kstream/internals/graph/GlobalStoreNode.java | 12 ++-
.../kstream/internals/graph/TableSourceNode.java | 34 ++-----
.../internals/InternalTopologyBuilder.java | 41 +++++----
.../processor/internals/StoreBuilderWrapper.java | 10 +-
.../StoreDelegatingProcessorSupplier.java | 47 ++++++++++
.../streams/processor/internals/StoreFactory.java | 76 ++++++++++++++++
.../apache/kafka/streams/StreamsBuilderTest.java | 79 +++++++++++-----
.../apache/kafka/streams/StreamsConfigTest.java | 6 +-
.../org/apache/kafka/streams/TopologyTest.java | 13 +--
.../internals/graph/TableSourceNodeTest.java | 17 +++-
.../internals/GlobalStreamThreadTest.java | 7 +-
.../internals/InternalTopologyBuilderTest.java | 101 +++++++++++----------
.../org/apache/kafka/streams/utils/TestUtils.java | 16 ++--
.../kafka/test/MockKeyValueStoreBuilder.java | 2 +-
22 files changed, 361 insertions(+), 176 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 2879436e500..b9cc75b9fde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -384,7 +384,9 @@ public class StreamsBuilder {
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>
materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(),
consumedInternal.valueSerde()),
- internalStreamsBuilder, topic + "-");
+ internalStreamsBuilder,
+ topic + "-",
+ true /* force materializing global tables */);
return internalStreamsBuilder.globalTable(topic, consumedInternal,
materializedInternal);
}
@@ -517,7 +519,7 @@ public class StreamsBuilder {
*/
public synchronized StreamsBuilder addStateStore(final StoreBuilder<?>
builder) {
Objects.requireNonNull(builder, "builder can't be null");
- internalStreamsBuilder.addStateStore(new StoreBuilderWrapper(builder));
+
internalStreamsBuilder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(builder));
return this;
}
@@ -556,7 +558,7 @@ public class StreamsBuilder {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(
- new StoreBuilderWrapper(storeBuilder),
+ StoreBuilderWrapper.wrapStoreBuilder(storeBuilder),
topic,
new ConsumedInternal<>(consumed),
stateUpdateSupplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 8fd34a47327..35fe13faa38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -34,7 +34,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
+import
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
@@ -853,14 +853,13 @@ public class Topology {
final String
processorName,
final
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
- new StoreBuilderWrapper(storeBuilder),
sourceName,
null,
keyDeserializer,
valueDeserializer,
topic,
processorName,
- stateUpdateSupplier,
+ new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier,
Set.of(storeBuilder)),
true
);
return this;
@@ -899,14 +898,13 @@ public class Topology {
final String
processorName,
final
ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
- new StoreBuilderWrapper(storeBuilder),
sourceName,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topic,
processorName,
- stateUpdateSupplier,
+ new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier,
Set.of(storeBuilder)),
true
);
return this;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 92dde06e9c0..1e148ac047c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -140,7 +140,7 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
final String tableSourceName = named
.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);
- final KTableSource<K, V> tableSource = new
KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
+ final KTableSource<K, V> tableSource = new
KTableSource<>(materialized);
final ProcessorParameters<K, V, ?, ?> processorParameters = new
ProcessorParameters<>(tableSource, tableSourceName);
final TableSourceNode<K, V> tableSourceNode = TableSourceNode.<K,
V>tableSourceNodeBuilder()
@@ -148,7 +148,6 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
.withSourceName(sourceName)
.withNodeName(tableSourceName)
.withConsumedInternal(consumed)
- .withMaterializedInternal(materialized)
.withProcessorParameters(processorParameters)
.build();
tableSourceNode.setOutputVersioned(materialized.storeSupplier()
instanceof VersionedBytesStoreSupplier);
@@ -186,9 +185,7 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
final String processorName = named
.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);
- // enforce store name as queryable name to always materialize global
table stores
- final String storeName = materialized.storeName();
- final KTableSource<K, V> tableSource = new KTableSource<>(storeName,
storeName);
+ final KTableSource<K, V> tableSource = new
KTableSource<>(materialized);
final ProcessorParameters<K, V, ?, ?> processorParameters = new
ProcessorParameters<>(tableSource, processorName);
@@ -197,12 +194,12 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
.isGlobalKTable(true)
.withSourceName(sourceName)
.withConsumedInternal(consumed)
- .withMaterializedInternal(materialized)
.withProcessorParameters(processorParameters)
.build();
addGraphNode(root, tableSourceNode);
+ final String storeName = materialized.storeName();
return new GlobalKTableImpl<>(new
KTableSourceValueGetterSupplier<>(storeName),
materialized.queryableStoreName());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ec2fd211efb..b650724055b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -658,10 +658,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
subTopologySourceNodes = this.subTopologySourceNodes;
}
- final KTableSource<K, V> tableSource = new KTableSource<>(
- materializedInternal.storeName(),
- materializedInternal.queryableStoreName()
- );
+ final KTableSource<K, V> tableSource = new
KTableSource<>(materializedInternal);
final ProcessorParameters<K, V, ?, ?> processorParameters = new
ProcessorParameters<>(tableSource, name);
final GraphNode tableNode = new StreamToTableNode<>(
name,
@@ -1171,7 +1168,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
bufferStoreName = Optional.of(name + "-Buffer");
final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object>
storeBuilder =
new
RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(),
joinedInternal.gracePeriod(), name);
- builder.addStateStore(new StoreBuilderWrapper(storeBuilder));
+
builder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder));
}
final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new
KStreamKTableJoin<>(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index 394c1300588..12bb6c19db8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -262,7 +262,7 @@ class KStreamImplJoin {
private static <K, V> StoreFactory
joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
- return new StoreBuilderWrapper(Stores.windowStoreBuilder(
+ return StoreBuilderWrapper.wrapStoreBuilder(Stores.windowStoreBuilder(
storeSupplier,
keySerde,
valueSerde
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 09efdb78006..2c75167f019 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -590,7 +590,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
- new StoreBuilderWrapper(storeBuilder)
+ StoreBuilderWrapper.wrapStoreBuilder(storeBuilder)
);
node.setOutputVersioned(false);
@@ -1227,10 +1227,7 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
materializedInternal.withKeySerde(keySerde);
}
- final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(
- materializedInternal.storeName(),
- materializedInternal.queryableStoreName()
- );
+ final KTableSource<K, VR> resultProcessorSupplier = new
KTableSource<>(materializedInternal);
final StoreFactory resultStore =
new KeyValueStoreMaterializer<>(materializedInternal);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index b29a4fa51f1..e41f2bf06dd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -17,12 +17,16 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
@@ -30,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
+import java.util.Set;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
@@ -40,15 +45,17 @@ public class KTableSource<KIn, VIn> implements
ProcessorSupplier<KIn, VIn, KIn,
private static final Logger LOG =
LoggerFactory.getLogger(KTableSource.class);
private final String storeName;
+ private final StoreFactory storeFactory;
private String queryableName;
private boolean sendOldValues;
- public KTableSource(final String storeName, final String queryableName) {
+ public KTableSource(
+ final MaterializedInternal<KIn, VIn, KeyValueStore<Bytes, byte[]>>
materialized) {
+ this.storeName = materialized.storeName();
Objects.requireNonNull(storeName, "storeName can't be null");
-
- this.storeName = storeName;
- this.queryableName = queryableName;
+ this.queryableName = materialized.queryableStoreName();
this.sendOldValues = false;
+ this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
}
public String queryableName() {
@@ -60,6 +67,15 @@ public class KTableSource<KIn, VIn> implements
ProcessorSupplier<KIn, VIn, KIn,
return new KTableSourceProcessor();
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ if (materialized()) {
+ return Set.of(new
StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
+ } else {
+ return null;
+ }
+ }
+
// when source ktable requires sending old values, we just
// need to set the queryable name as the store name to enforce
materialization
public void enableSendingOldValues() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index cf6ce76f8d5..d6cd130ba6d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -39,12 +39,19 @@ public final class MaterializedInternal<K, V, S extends
StateStore> extends Mate
public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix) {
+ this(materialized, nameProvider, generatedStorePrefix, false);
+ }
+
+ public MaterializedInternal(final Materialized<K, V, S> materialized,
+ final InternalNameProvider nameProvider,
+ final String generatedStorePrefix,
+ final boolean forceQueryable) {
super(materialized);
// if storeName is not provided, the corresponding KTable would never
be queryable;
// but we still need to provide an internal name for it in case we
materialize.
- queryable = storeName() != null;
- if (!queryable && nameProvider != null) {
+ queryable = forceQueryable || storeName() != null;
+ if (storeName() == null && nameProvider != null) {
storeName = nameProvider.newStoreName(generatedStorePrefix);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
index df6e7c263e6..a9093ad4770 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
@@ -20,8 +20,11 @@ import
org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;
+import java.util.Set;
+
public class GlobalStoreNode<KIn, VIn, S extends StateStore> extends
StateStoreNode<S> {
private final String sourceName;
@@ -52,15 +55,16 @@ public class GlobalStoreNode<KIn, VIn, S extends
StateStore> extends StateStoreN
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
storeBuilder.withLoggingDisabled();
- topologyBuilder.addGlobalStore(storeBuilder,
- sourceName,
+ topologyBuilder.addGlobalStore(sourceName,
consumed.timestampExtractor(),
consumed.keyDeserializer(),
consumed.valueDeserializer(),
topic,
processorName,
- stateUpdateSupplier,
- reprocessOnRestore);
+ new StoreDelegatingProcessorSupplier<>(
+ stateUpdateSupplier,
+ Set.of(new
StoreFactory.FactoryWrappingStoreBuilder<>(storeBuilder))
+ ), reprocessOnRestore);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index f0f8e0dcb4a..5e776a5c733 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -17,15 +17,10 @@
package org.apache.kafka.streams.kstream.internals.graph;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
-import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
import java.util.Iterator;
@@ -36,7 +31,6 @@ import java.util.Iterator;
*/
public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {
- private final MaterializedInternal<K, V, ?> materializedInternal;
private final ProcessorParameters<K, V, ?, ?> processorParameters;
private final String sourceName;
private final boolean isGlobalKTable;
@@ -46,7 +40,6 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K,
V> {
final String sourceName,
final String topic,
final ConsumedInternal<K, V> consumedInternal,
- final MaterializedInternal<K, V, ?>
materializedInternal,
final ProcessorParameters<K, V, ?, ?>
processorParameters,
final boolean isGlobalKTable) {
@@ -57,7 +50,6 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K,
V> {
this.sourceName = sourceName;
this.isGlobalKTable = isGlobalKTable;
this.processorParameters = processorParameters;
- this.materializedInternal = materializedInternal;
}
@@ -68,7 +60,6 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K,
V> {
@Override
public String toString() {
return "TableSourceNode{" +
- "materializedInternal=" + materializedInternal +
", processorParameters=" + processorParameters +
", sourceName='" + sourceName + '\'' +
", isGlobalKTable=" + isGlobalKTable +
@@ -93,12 +84,8 @@ public class TableSourceNode<K, V> extends
SourceGraphNode<K, V> {
throw new IllegalStateException("A table source node must have a
single topic as input");
}
- final StoreFactory storeFactory =
- new KeyValueStoreMaterializer<>((MaterializedInternal<K, V,
KeyValueStore<Bytes, byte[]>>) materializedInternal);
-
if (isGlobalKTable) {
topologyBuilder.addGlobalStore(
- storeFactory,
sourceName,
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
@@ -116,16 +103,16 @@ public class TableSourceNode<K, V> extends
SourceGraphNode<K, V> {
consumedInternal().valueDeserializer(),
topicName);
- topologyBuilder.addProcessor(processorParameters.processorName(),
processorParameters.processorSupplier(), sourceName);
+ processorParameters.addProcessorTo(topologyBuilder, new String[]
{sourceName});
- // only add state store if the source KTable should be materialized
+ // if the KTableSource should not be materialized, stores will be
null or empty
final KTableSource<K, V> tableSource = (KTableSource<K, V>)
processorParameters.processorSupplier();
- if (tableSource.materialized()) {
- topologyBuilder.addStateStore(storeFactory, nodeName());
-
+ if (tableSource.stores() != null) {
if (shouldReuseSourceTopicForChangelog) {
- storeFactory.withLoggingDisabled();
-
topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
+ tableSource.stores().forEach(store -> {
+ store.withLoggingDisabled();
+
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
+ });
}
}
}
@@ -138,7 +125,6 @@ public class TableSourceNode<K, V> extends
SourceGraphNode<K, V> {
private String sourceName;
private String topic;
private ConsumedInternal<K, V> consumedInternal;
- private MaterializedInternal<K, V, ?> materializedInternal;
private ProcessorParameters<K, V, ?, ?> processorParameters;
private boolean isGlobalKTable = false;
@@ -155,11 +141,6 @@ public class TableSourceNode<K, V> extends
SourceGraphNode<K, V> {
return this;
}
- public TableSourceNodeBuilder<K, V> withMaterializedInternal(final
MaterializedInternal<K, V, ?> materializedInternal) {
- this.materializedInternal = materializedInternal;
- return this;
- }
-
public TableSourceNodeBuilder<K, V> withConsumedInternal(final
ConsumedInternal<K, V> consumedInternal) {
this.consumedInternal = consumedInternal;
return this;
@@ -185,7 +166,6 @@ public class TableSourceNode<K, V> extends
SourceGraphNode<K, V> {
sourceName,
topic,
consumedInternal,
- materializedInternal,
processorParameters,
isGlobalKTable);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 2c5e798b62d..9f65a415d95 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.StateStore;
@@ -607,7 +608,7 @@ public class InternalTopologyBuilder {
public final void addStateStore(final StoreBuilder<?> storeBuilder,
final String... processorNames) {
- addStateStore(new StoreBuilderWrapper(storeBuilder), false,
processorNames);
+ addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder),
false, processorNames);
}
public final void addStateStore(final StoreFactory storeFactory,
@@ -638,8 +639,7 @@ public class InternalTopologyBuilder {
nodeGroups = null;
}
- public final <KIn, VIn> void addGlobalStore(final StoreFactory
storeFactory,
- final String sourceName,
+ public final <KIn, VIn> void addGlobalStore(final String sourceName,
final TimestampExtractor
timestampExtractor,
final Deserializer<KIn>
keyDeserializer,
final Deserializer<VIn>
valueDeserializer,
@@ -647,8 +647,15 @@ public class InternalTopologyBuilder {
final String processorName,
final ProcessorSupplier<KIn,
VIn, Void, Void> stateUpdateSupplier,
final boolean
reprocessOnRestore) {
- Objects.requireNonNull(storeFactory, "store builder must not be null");
ApiUtils.checkSupplier(stateUpdateSupplier);
+ final Set<StoreBuilder<?>> stores = stateUpdateSupplier.stores();
+ if (stores == null || stores.size() != 1) {
+ throw new IllegalArgumentException(
+ "Global stores must pass in suppliers with exactly one
store but got " +
+ (stores != null ? stores.size() : 0));
+ }
+ final StoreFactory storeFactory =
+ StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next());
validateGlobalStoreArguments(sourceName,
topic,
processorName,
@@ -2105,8 +2112,8 @@ public class InternalTopologyBuilder {
private static final SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new
SubtopologyComparator();
public static final class TopologyDescription implements
org.apache.kafka.streams.TopologyDescription {
- private final TreeSet<TopologyDescription.Subtopology> subtopologies =
new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
- private final TreeSet<TopologyDescription.GlobalStore> globalStores =
new TreeSet<>(GLOBALSTORE_COMPARATOR);
+ private final TreeSet<Subtopology> subtopologies = new
TreeSet<>(SUBTOPOLOGY_COMPARATOR);
+ private final TreeSet<GlobalStore> globalStores = new
TreeSet<>(GLOBALSTORE_COMPARATOR);
private final String namedTopology;
public TopologyDescription() {
@@ -2117,21 +2124,21 @@ public class InternalTopologyBuilder {
this.namedTopology = namedTopology;
}
- public void addSubtopology(final TopologyDescription.Subtopology
subtopology) {
+ public void addSubtopology(final Subtopology subtopology) {
subtopologies.add(subtopology);
}
- public void addGlobalStore(final TopologyDescription.GlobalStore
globalStore) {
+ public void addGlobalStore(final GlobalStore globalStore) {
globalStores.add(globalStore);
}
@Override
- public Set<TopologyDescription.Subtopology> subtopologies() {
+ public Set<Subtopology> subtopologies() {
return Collections.unmodifiableSet(subtopologies);
}
@Override
- public Set<TopologyDescription.GlobalStore> globalStores() {
+ public Set<GlobalStore> globalStores() {
return Collections.unmodifiableSet(globalStores);
}
@@ -2144,17 +2151,17 @@ public class InternalTopologyBuilder {
} else {
sb.append("Topology: ").append(namedTopology).append(":\n ");
}
- final TopologyDescription.Subtopology[] sortedSubtopologies =
- subtopologies.descendingSet().toArray(new
TopologyDescription.Subtopology[0]);
- final TopologyDescription.GlobalStore[] sortedGlobalStores =
+ final Subtopology[] sortedSubtopologies =
+ subtopologies.descendingSet().toArray(new Subtopology[0]);
+ final GlobalStore[] sortedGlobalStores =
globalStores.descendingSet().toArray(new GlobalStore[0]);
int expectedId = 0;
int subtopologiesIndex = sortedSubtopologies.length - 1;
int globalStoresIndex = sortedGlobalStores.length - 1;
while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
sb.append(" ");
- final TopologyDescription.Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
- final TopologyDescription.GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
+ final Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
+ final GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
if (subtopology.id() == expectedId) {
sb.append(subtopology);
subtopologiesIndex--;
@@ -2165,13 +2172,13 @@ public class InternalTopologyBuilder {
expectedId++;
}
while (subtopologiesIndex != -1) {
- final TopologyDescription.Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
+ final Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
sb.append(" ");
sb.append(subtopology);
subtopologiesIndex--;
}
while (globalStoresIndex != -1) {
- final TopologyDescription.GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
+ final GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
sb.append(" ");
sb.append(globalStore);
globalStoresIndex--;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
index b8522b8e2cd..4648533af1d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
@@ -38,7 +38,15 @@ public class StoreBuilderWrapper implements StoreFactory {
private final StoreBuilder<?> builder;
private final Set<String> connectedProcessorNames = new HashSet<>();
- public StoreBuilderWrapper(final StoreBuilder<?> builder) {
+ public static StoreFactory wrapStoreBuilder(final StoreBuilder<?> builder)
{
+ if (builder instanceof FactoryWrappingStoreBuilder) {
+ return ((FactoryWrappingStoreBuilder<?>) builder).storeFactory();
+ } else {
+ return new StoreBuilderWrapper(builder);
+ }
+ }
+
+ private StoreBuilderWrapper(final StoreBuilder<?> builder) {
this.builder = builder;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java
new file mode 100644
index 00000000000..cce8281e15e
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreDelegatingProcessorSupplier.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Set;
+
+public class StoreDelegatingProcessorSupplier<KIn, VIn, KOut, VOut> implements
ProcessorSupplier<KIn, VIn, KOut, VOut> {
+
+ private final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate;
+ private final Set<StoreBuilder<?>> stores;
+
+ public StoreDelegatingProcessorSupplier(
+ final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate,
+ final Set<StoreBuilder<?>> stores
+ ) {
+ this.delegate = delegate;
+ this.stores = stores;
+ }
+
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return stores;
+ }
+
+ @Override
+ public Processor<KIn, VIn, KOut, VOut> get() {
+ return delegate.get();
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
index b05c334c27f..7542f4c5bd8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Map;
import java.util.Set;
@@ -75,4 +76,79 @@ public interface StoreFactory {
boolean isCompatibleWith(StoreFactory storeFactory);
+ class FactoryWrappingStoreBuilder<T extends StateStore> implements
StoreBuilder<T> {
+
+ private final StoreFactory storeFactory;
+
+ public FactoryWrappingStoreBuilder(final StoreFactory storeFactory) {
+ this.storeFactory = storeFactory;
+ }
+
+ public StoreFactory storeFactory() {
+ return storeFactory;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final FactoryWrappingStoreBuilder<?> that =
(FactoryWrappingStoreBuilder<?>) o;
+
+ return storeFactory.isCompatibleWith(that.storeFactory);
+ }
+
+ @Override
+ public int hashCode() {
+ return storeFactory.hashCode();
+ }
+
+ @Override
+ public StoreBuilder<T> withCachingEnabled() {
+ throw new IllegalStateException("Should not try to modify
StoreBuilder wrapper");
+ }
+
+ @Override
+ public StoreBuilder<T> withCachingDisabled() {
+ storeFactory.withCachingDisabled();
+ return this;
+ }
+
+ @Override
+ public StoreBuilder<T> withLoggingEnabled(final Map<String, String>
config) {
+ throw new IllegalStateException("Should not try to modify
StoreBuilder wrapper");
+ }
+
+ @Override
+ public StoreBuilder<T> withLoggingDisabled() {
+ storeFactory.withLoggingDisabled();
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T build() {
+ return (T) storeFactory.build();
+ }
+
+ @Override
+ public Map<String, String> logConfig() {
+ return storeFactory.logConfig();
+ }
+
+ @Override
+ public boolean loggingEnabled() {
+ return storeFactory.loggingEnabled();
+ }
+
+ @Override
+ public String name() {
+ return storeFactory.name();
+ }
+ }
+
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index fc601d5b737..5210dd0b3c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -55,7 +55,7 @@ import
org.apache.kafka.streams.state.internals.InMemoryWindowStore;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
-import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
+import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
@@ -65,6 +65,7 @@ import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -75,12 +76,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
@@ -1417,10 +1418,10 @@ public class StreamsBuilderTest {
@Test
public void shouldWrapProcessorsForProcess() {
final Map<Object, Object> props = dummyStreamsConfigMap();
- props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
CountingProcessorWrapper.class);
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final AtomicInteger wrappedProcessorCount = new AtomicInteger();
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
+ final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
@@ -1430,56 +1431,84 @@ public class StreamsBuilderTest {
final Random random = new Random();
builder.stream("input")
- .process((ProcessorSupplier<Object, Object, Object, Object>) () ->
record -> System.out.println("Processing: " + random.nextInt()))
- .processValues(() -> record -> System.out.println("Processing: " +
random.nextInt()))
+ .process((ProcessorSupplier<Object, Object, Object, Object>) () ->
record -> System.out.println("Processing: " + random.nextInt()),
Named.as("processor1"))
+ .processValues(() -> record -> System.out.println("Processing: " +
random.nextInt()), Named.as("processor2"))
.to("output");
builder.build();
- assertThat(wrappedProcessorCount.get(), CoreMatchers.is(2));
+ assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
+ assertThat(wrappedProcessors,
Matchers.containsInAnyOrder("processor1", "processor2"));
}
@Test
public void shouldWrapProcessorsForAggregationOperators() {
final Map<Object, Object> props = dummyStreamsConfigMap();
- props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
CountingProcessorWrapper.class);
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final AtomicInteger wrappedProcessorCount = new AtomicInteger();
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
+ final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
builder.stream("input")
.groupByKey()
- .count() // wrapped 1
- .toStream()// wrapped 2
+ .count(Named.as("count")) // wrapped 1
+ .toStream(Named.as("toStream"))// wrapped 2
.to("output");
builder.build();
- assertThat(wrappedProcessorCount.get(), CoreMatchers.is(2));
+ assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
+ assertThat(wrappedProcessors, Matchers.containsInAnyOrder("count",
"toStream"));
}
@Test
public void shouldWrapProcessorsForStatelessOperators() {
final Map<Object, Object> props = dummyStreamsConfigMap();
- props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
CountingProcessorWrapper.class);
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final AtomicInteger wrappedProcessorCount = new AtomicInteger();
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
+ final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
builder.stream("input")
- .filter((k, v) -> true) // wrapped 1
- .map(KeyValue::new) // wrapped 2
- .selectKey((k, v) -> k) // wrapped 3
- .peek((k, v) -> { }) // wrapped 4
- .flatMapValues(e -> new ArrayList<>()) // wrapped 5
- .toTable() // wrapped 6
- .toStream() // wrapped 7
+ .filter((k, v) -> true, Named.as("filter")) // wrapped 1
+ .map(KeyValue::new, Named.as("map")) // wrapped 2
+ .selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
+ .peek((k, v) -> { }, Named.as("peek")) // wrapped 4
+ .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) //
wrapped 5
+ .toTable(Named.as("toTable")) // wrapped 6 (note named as
toTable-repartition-filter)
+ .toStream(Named.as("toStream")) // wrapped 7
.to("output");
builder.build();
- assertThat(wrappedProcessorCount.get(), CoreMatchers.is(7));
+ assertThat(wrappedProcessors.size(), CoreMatchers.is(7));
+ assertThat(wrappedProcessors, Matchers.containsInAnyOrder(
+ "filter", "map", "selectKey", "peek", "flatMap",
"toTable-repartition-filter",
+ "toStream"
+ ));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForTableSource() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.table("input") // wrapped 1 (named KTABLE-SOURCE-0000000002)
+ .toStream(Named.as("toStream")) // wrapped 2
+ .to("output");
+
+ builder.build();
+ assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
+ assertThat(wrappedProcessors, Matchers.containsInAnyOrder(
+ "KTABLE-SOURCE-0000000002",
+ "toStream"
+ ));
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index b001c98868f..4467e252b92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -42,7 +42,7 @@ import
org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.RecordCollectorTest;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
-import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
+import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
import org.apache.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
@@ -1230,13 +1230,13 @@ public class StreamsConfigTest {
@Test
public void shouldAllowConfiguringProcessorWrapperWithClass() {
- props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG,
CountingProcessorWrapper.class);
+ props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
new StreamsConfig(props);
}
@Test
public void shouldAllowConfiguringProcessorWrapperWithClassName() {
- props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG,
CountingProcessorWrapper.class.getName());
+ props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class.getName());
new StreamsConfig(props);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 461aa1a2921..833eada8d5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -45,13 +45,14 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
-import org.apache.kafka.streams.utils.TestUtils.CountingProcessorWrapper;
+import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -71,7 +72,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
@@ -2425,10 +2425,10 @@ public class TopologyTest {
@Test
public void shouldWrapProcessors() {
final Map<Object, Object> props = dummyStreamsConfigMap();
- props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
CountingProcessorWrapper.class);
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final AtomicInteger wrappedProcessorCount = new AtomicInteger();
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessorCount);
+ final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
final Topology topology = new Topology(new TopologyConfig(new
StreamsConfig(props)));
@@ -2453,7 +2453,8 @@ public class TopologyTest {
() -> (Processor<Object, Object, Object, Object>) record ->
System.out.println("Processing: " + random.nextInt()),
"p2"
);
- assertThat(wrappedProcessorCount.get(), is(3));
+ assertThat(wrappedProcessors.size(), is(3));
+ assertThat(wrappedProcessors, Matchers.containsInAnyOrder("p1", "p2",
"p3"));
}
@SuppressWarnings("deprecation")
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
index 2988e14e720..bf70d476839 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
@@ -16,23 +16,29 @@
*/
package org.apache.kafka.streams.kstream.internals.graph;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import
org.apache.kafka.streams.kstream.internals.graph.TableSourceNode.TableSourceNodeBuilder;
+import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -43,6 +49,12 @@ public class TableSourceNodeTest {
private InternalTopologyBuilder topologyBuilder =
mock(InternalTopologyBuilder.class);
+ @BeforeEach
+ public void before() {
+ when(topologyBuilder.wrapProcessorSupplier(any(), any()))
+ .thenAnswer(iom ->
ProcessorWrapper.asWrapped(iom.getArgument(1)));
+ }
+
@Test
public void
shouldConnectStateStoreToInputTopicIfInputTopicIsUsedAsChangelog() {
final boolean shouldReuseSourceTopicForChangelog = true;
@@ -59,12 +71,13 @@ public class TableSourceNodeTest {
private void buildTableSourceNode(final boolean
shouldReuseSourceTopicForChangelog) {
final TableSourceNodeBuilder<String, String> tableSourceNodeBuilder =
TableSourceNode.tableSourceNodeBuilder();
+ final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>>
+ materializedInternal = new
MaterializedInternal<>(Materialized.as(STORE_NAME));
final TableSourceNode<String, String> tableSourceNode =
tableSourceNodeBuilder
.withTopic(TOPIC)
- .withMaterializedInternal(new
MaterializedInternal<>(Materialized.as(STORE_NAME)))
.withConsumedInternal(new
ConsumedInternal<>(Consumed.as("node-name")))
.withProcessorParameters(
- new ProcessorParameters<>(new KTableSource<>(STORE_NAME,
STORE_NAME), null))
+ new ProcessorParameters<>(new
KTableSource<>(materializedInternal), null))
.build();
tableSourceNode.reuseSourceTopicForChangeLog(shouldReuseSourceTopicForChangelog);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 244a246bd20..e4f78c900d1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -41,6 +41,7 @@ import
org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
@@ -108,15 +109,17 @@ public class GlobalStreamThreadTest {
}
};
+ final StoreFactory storeFactory =
+ new
KeyValueStoreMaterializer<>(materialized).withLoggingDisabled();
+ final StoreBuilder<?> storeBuilder = new
StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory);
builder.addGlobalStore(
- new
KeyValueStoreMaterializer<>(materialized).withLoggingDisabled(),
"sourceName",
null,
null,
null,
GLOBAL_STORE_TOPIC_NAME,
"processorName",
- processorSupplier,
+ new StoreDelegatingProcessorSupplier<>(processorSupplier,
Set.of(storeBuilder)),
false
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 9d46569c27b..e3add9755ae 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -95,7 +95,8 @@ public class InternalTopologyBuilderTest {
private final Serde<String> stringSerde = Serdes.String();
private final InternalTopologyBuilder builder = new
InternalTopologyBuilder();
- private final StoreFactory storeBuilder = new
MockKeyValueStoreBuilder("testStore", false).asFactory();
+ private final StoreBuilder<?> storeBuilder = new
MockKeyValueStoreBuilder("testStore", false);
+ private final StoreFactory storeFactory = new
MockKeyValueStoreBuilder("testStore", false).asFactory();
@Test
public void shouldAddSourceWithOffsetReset() {
@@ -225,7 +226,6 @@ public class InternalTopologyBuilderTest {
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> builder.addGlobalStore(
- new MockKeyValueStoreBuilder("global-store",
false).asFactory().withLoggingDisabled(),
"globalSource",
null,
null,
@@ -331,18 +331,20 @@ public class InternalTopologyBuilderTest {
@Test
public void testPatternSourceTopicsWithGlobalTopics() {
+ final StoreBuilder<?> storeBuilder =
+ new MockKeyValueStoreBuilder("global-store", false)
+ .withLoggingDisabled();
builder.setApplicationId("X");
builder.addSource(null, "source-1", null, null, null,
Pattern.compile("topic-1"));
builder.addSource(null, "source-2", null, null, null,
Pattern.compile("topic-2"));
builder.addGlobalStore(
- new MockKeyValueStoreBuilder("global-store",
false).asFactory().withLoggingDisabled(),
"globalSource",
null,
null,
null,
"globalTopic",
"global-processor",
- new MockApiProcessorSupplier<>(),
+ new StoreDelegatingProcessorSupplier<>(new
MockApiProcessorSupplier<>(), Set.of(storeBuilder)),
false
);
builder.initializeSubscription();
@@ -356,18 +358,20 @@ public class InternalTopologyBuilderTest {
@Test
public void testNameSourceTopicsWithGlobalTopics() {
+ final StoreBuilder<?> storeBuilder =
+ new MockKeyValueStoreBuilder("global-store", false)
+ .withLoggingDisabled();
builder.setApplicationId("X");
builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addSource(null, "source-2", null, null, null, "topic-2");
builder.addGlobalStore(
- new MockKeyValueStoreBuilder("global-store",
false).asFactory().withLoggingDisabled(),
"globalSource",
null,
null,
null,
"globalTopic",
"global-processor",
- new MockApiProcessorSupplier<>(),
+ new StoreDelegatingProcessorSupplier<>(new
MockApiProcessorSupplier<>(), Set.of(storeBuilder)),
false
);
builder.initializeSubscription();
@@ -427,14 +431,14 @@ public class InternalTopologyBuilderTest {
@Test
public void testAddStateStoreWithNonExistingProcessor() {
- assertThrows(TopologyException.class, () ->
builder.addStateStore(storeBuilder, "no-such-processor"));
+ assertThrows(TopologyException.class, () ->
builder.addStateStore(storeFactory, "no-such-processor"));
}
@Test
public void testAddStateStoreWithSource() {
builder.addSource(null, "source-1", null, null, null, "topic-1");
try {
- builder.addStateStore(storeBuilder, "source-1");
+ builder.addStateStore(storeFactory, "source-1");
fail("Should throw TopologyException with store cannot be added to
source");
} catch (final TopologyException expected) { /* ok */ }
}
@@ -444,7 +448,7 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addSink("sink-1", "topic-1", null, null, null, "source-1");
try {
- builder.addStateStore(storeBuilder, "sink-1");
+ builder.addStateStore(storeFactory, "sink-1");
fail("Should throw TopologyException with store cannot be added to
sink");
} catch (final TopologyException expected) { /* ok */ }
}
@@ -454,7 +458,7 @@ public class InternalTopologyBuilderTest {
final StoreBuilder<KeyValueStore<Object, Object>> otherBuilder =
new MockKeyValueStoreBuilder("testStore", false);
- builder.addStateStore(storeBuilder);
+ builder.addStateStore(storeFactory);
final TopologyException exception = assertThrows(
TopologyException.class,
@@ -469,24 +473,23 @@ public class InternalTopologyBuilderTest {
@Test
public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() {
- final StoreFactory globalBuilder =
- new MockKeyValueStoreBuilder("testStore",
false).asFactory().withLoggingDisabled();
+ final StoreBuilder<?> globalBuilder =
+ new MockKeyValueStoreBuilder("testStore",
false).withLoggingDisabled();
builder.addGlobalStore(
- globalBuilder,
"global-store",
null,
null,
null,
"global-topic",
"global-processor",
- new MockApiProcessorSupplier<>(),
+ new StoreDelegatingProcessorSupplier<>(new
MockApiProcessorSupplier<>(), Set.of(globalBuilder)),
false
);
final TopologyException exception = assertThrows(
TopologyException.class,
- () -> builder.addStateStore(storeBuilder)
+ () -> builder.addStateStore(storeFactory)
);
assertThat(
@@ -497,22 +500,21 @@ public class InternalTopologyBuilderTest {
@Test
public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal()
{
- final StoreFactory globalBuilder =
- new MockKeyValueStoreBuilder("testStore",
false).asFactory().withLoggingDisabled();
+ final StoreBuilder<?> globalBuilder =
+ new MockKeyValueStoreBuilder("testStore",
false).withLoggingDisabled();
- builder.addStateStore(storeBuilder);
+ builder.addStateStore(storeFactory);
final TopologyException exception = assertThrows(
TopologyException.class,
() -> builder.addGlobalStore(
- globalBuilder,
"global-store",
null,
null,
null,
"global-topic",
"global-processor",
- new MockApiProcessorSupplier<>(),
+ new StoreDelegatingProcessorSupplier<>(new
MockApiProcessorSupplier<>(), Set.of(globalBuilder)),
false
)
);
@@ -525,34 +527,32 @@ public class InternalTopologyBuilderTest {
@Test
public void shouldNotAllowToAddGlobalStoresWithSameName() {
- final StoreFactory firstGlobalBuilder =
- new MockKeyValueStoreBuilder("testStore",
false).asFactory().withLoggingDisabled();
- final StoreFactory secondGlobalBuilder =
- new MockKeyValueStoreBuilder("testStore",
false).asFactory().withLoggingDisabled();
+ final StoreBuilder<KeyValueStore<Object, Object>> firstGlobalBuilder =
+ new MockKeyValueStoreBuilder("testStore",
false).withLoggingDisabled();
+ final StoreBuilder<KeyValueStore<Object, Object>> secondGlobalBuilder =
+ new MockKeyValueStoreBuilder("testStore",
false).withLoggingDisabled();
builder.addGlobalStore(
- firstGlobalBuilder,
"global-store",
null,
null,
null,
"global-topic",
"global-processor",
- new MockApiProcessorSupplier<>(),
+ new StoreDelegatingProcessorSupplier<>(new
MockApiProcessorSupplier<>(), Set.of(firstGlobalBuilder)),
false
);
final TopologyException exception = assertThrows(
TopologyException.class,
() -> builder.addGlobalStore(
- secondGlobalBuilder,
"global-store-2",
null,
null,
null,
"global-topic",
"global-processor-2",
- new MockApiProcessorSupplier<>(),
+ new StoreDelegatingProcessorSupplier<>(new
MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)),
false
)
);
@@ -565,35 +565,35 @@ public class InternalTopologyBuilderTest {
@Test
public void testAddStateStore() {
- builder.addStateStore(storeBuilder);
+ builder.addStateStore(storeFactory);
builder.setApplicationId("X");
builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(),
"source-1");
assertEquals(0, builder.buildTopology().stateStores().size());
- builder.connectProcessorAndStateStores("processor-1",
storeBuilder.name());
+ builder.connectProcessorAndStateStores("processor-1",
storeFactory.name());
final List<StateStore> suppliers =
builder.buildTopology().stateStores();
assertEquals(1, suppliers.size());
- assertEquals(storeBuilder.name(), suppliers.get(0).name());
+ assertEquals(storeFactory.name(), suppliers.get(0).name());
}
@Test
public void testStateStoreNamesForSubtopology() {
- builder.addStateStore(storeBuilder);
+ builder.addStateStore(storeFactory);
builder.setApplicationId("X");
builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(),
"source-1");
- builder.connectProcessorAndStateStores("processor-1",
storeBuilder.name());
+ builder.connectProcessorAndStateStores("processor-1",
storeFactory.name());
builder.addSource(null, "source-2", null, null, null, "topic-2");
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(),
"source-2");
builder.buildTopology();
final Set<String> stateStoreNames =
builder.stateStoreNamesForSubtopology(0);
- assertThat(stateStoreNames, equalTo(Set.of(storeBuilder.name())));
+ assertThat(stateStoreNames, equalTo(Set.of(storeFactory.name())));
final Set<String> emptyStoreNames =
builder.stateStoreNamesForSubtopology(1);
assertThat(emptyStoreNames, equalTo(Set.of()));
@@ -607,13 +607,13 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId("X");
builder.addSource(null, "source-1", null, null, null, "topic-1");
- builder.addStateStore(storeBuilder);
+ builder.addStateStore(storeFactory);
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(),
"source-1");
- builder.connectProcessorAndStateStores("processor-1",
storeBuilder.name());
+ builder.connectProcessorAndStateStores("processor-1",
storeFactory.name());
- builder.addStateStore(storeBuilder);
+ builder.addStateStore(storeFactory);
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(),
"source-1");
- builder.connectProcessorAndStateStores("processor-2",
storeBuilder.name());
+ builder.connectProcessorAndStateStores("processor-2",
storeFactory.name());
assertEquals(1, builder.buildTopology().stateStores().size());
}
@@ -763,15 +763,16 @@ public class InternalTopologyBuilderTest {
assertNotEquals(oldNodeGroups, newNodeGroups);
oldNodeGroups = newNodeGroups;
+
+ final StoreBuilder<?> globalBuilder = new
MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled();
builder.addGlobalStore(
- new MockKeyValueStoreBuilder("global-store",
false).asFactory().withLoggingDisabled(),
"globalSource",
null,
null,
null,
"globalTopic",
"global-processor",
- new MockApiProcessorSupplier<>(),
+ new StoreDelegatingProcessorSupplier<>(new
MockApiProcessorSupplier<>(), Set.of(globalBuilder)),
false
);
newNodeGroups = builder.nodeGroups();
@@ -879,7 +880,7 @@ public class InternalTopologyBuilderTest {
public void
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source");
- builder.addStateStore(storeBuilder, "processor");
+ builder.addStateStore(storeFactory, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic =
builder.stateStoreNameToFullSourceTopicNames();
assertEquals(1, stateStoreNameToSourceTopic.size());
assertEquals(Collections.singletonList("topic"),
stateStoreNameToSourceTopic.get("testStore"));
@@ -889,7 +890,7 @@ public class InternalTopologyBuilderTest {
public void
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source");
- builder.addStateStore(storeBuilder, "processor");
+ builder.addStateStore(storeFactory, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic =
builder.stateStoreNameToFullSourceTopicNames();
assertEquals(1, stateStoreNameToSourceTopic.size());
assertEquals(Collections.singletonList("topic"),
stateStoreNameToSourceTopic.get("testStore"));
@@ -901,7 +902,7 @@ public class InternalTopologyBuilderTest {
builder.addInternalTopic("internal-topic",
InternalTopicProperties.empty());
builder.addSource(null, "source", null, null, null, "internal-topic");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source");
- builder.addStateStore(storeBuilder, "processor");
+ builder.addStateStore(storeFactory, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic =
builder.stateStoreNameToFullSourceTopicNames();
assertEquals(1, stateStoreNameToSourceTopic.size());
assertEquals(Collections.singletonList("appId-internal-topic"),
stateStoreNameToSourceTopic.get("testStore"));
@@ -975,7 +976,7 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId("appId");
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source");
- builder.addStateStore(storeBuilder, "processor");
+ builder.addStateStore(storeFactory, "processor");
builder.buildTopology();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups
= builder.subtopologyToTopicsInfo();
final InternalTopologyBuilder.TopicsInfo topicsInfo =
topicGroups.values().iterator().next();
@@ -1183,7 +1184,7 @@ public class InternalTopologyBuilderTest {
public void shouldConnectRegexMatchedTopicsToStateStore() {
builder.addSource(null, "ingest", null, null, null,
Pattern.compile("topic-\\d+"));
builder.addProcessor("my-processor", new MockApiProcessorSupplier<>(),
"ingest");
- builder.addStateStore(storeBuilder, "my-processor");
+ builder.addStateStore(storeFactory, "my-processor");
final Set<String> updatedTopics = new HashSet<>();
@@ -1195,7 +1196,7 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId("test-app");
final Map<String, List<String>> stateStoreAndTopics =
builder.stateStoreNameToFullSourceTopicNames();
- final List<String> topics =
stateStoreAndTopics.get(storeBuilder.name());
+ final List<String> topics =
stateStoreAndTopics.get(storeFactory.name());
assertEquals(2, topics.size(), "Expected to contain two topics");
@@ -1208,14 +1209,13 @@ public class InternalTopologyBuilderTest {
public void
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
final String sameNameForSourceAndProcessor = "sameName";
assertThrows(TopologyException.class, () -> builder.addGlobalStore(
- storeBuilder,
sameNameForSourceAndProcessor,
null,
null,
null,
"anyTopicName",
sameNameForSourceAndProcessor,
- new MockApiProcessorSupplier<>(),
+ new StoreDelegatingProcessorSupplier<>(new
MockApiProcessorSupplier<>(), Set.of(storeBuilder)),
false
));
}
@@ -1351,16 +1351,17 @@ public class InternalTopologyBuilderTest {
public void shouldConnectGlobalStateStoreToInputTopic() {
final String globalStoreName = "global-store";
final String globalTopic = "global-topic";
+        final StoreBuilder<?> storeBuilder =
+            new MockKeyValueStoreBuilder(globalStoreName, false).withLoggingDisabled();
builder.setApplicationId("X");
builder.addGlobalStore(
-            new MockKeyValueStoreBuilder(globalStoreName, false).asFactory().withLoggingDisabled(),
"globalSource",
null,
null,
null,
globalTopic,
"global-processor",
- new MockApiProcessorSupplier<>(),
+            new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(storeBuilder)),
false
);
builder.initializeSubscription();
diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 0f0bcf90698..24ac2f2306d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -28,10 +28,12 @@ import org.junit.jupiter.api.TestInfo;
import java.lang.reflect.Method;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
@@ -115,30 +117,30 @@ public class TestUtils {
      * To retrieve the current count, pass an instance of AtomicInteger into the configs
      * alongside the wrapper itself. Use the config key defined with {@link #PROCESSOR_WRAPPER_COUNTER_CONFIG}
*/
- public static class CountingProcessorWrapper implements ProcessorWrapper {
+ public static class RecordingProcessorWrapper implements ProcessorWrapper {
- private AtomicInteger wrappedProcessorCount;
+ private Set<String> wrappedProcessorNames;
@Override
public void configure(final Map<String, ?> configs) {
if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
-                wrappedProcessorCount = (AtomicInteger) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
+                wrappedProcessorNames = (Set<String>) configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
} else {
- wrappedProcessorCount = new AtomicInteger();
+                wrappedProcessorNames = Collections.synchronizedSet(new HashSet<>());
}
}
@Override
         public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(final String processorName,
                                                                                                            final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
- wrappedProcessorCount.incrementAndGet();
+ wrappedProcessorNames.add(processorName);
return ProcessorWrapper.asWrapped(processorSupplier);
}
@Override
         public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(final String processorName,
                                                                                                                final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
- wrappedProcessorCount.incrementAndGet();
+ wrappedProcessorNames.add(processorName);
return ProcessorWrapper.asWrappedFixedKey(processorSupplier);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
index 2faf89b1622..15c896ad076 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStoreBuilder.java
@@ -39,6 +39,6 @@ public class MockKeyValueStoreBuilder extends AbstractStoreBuilder<Integer, byte
}
public StoreFactory asFactory() {
- return new StoreBuilderWrapper(this);
+ return StoreBuilderWrapper.wrapStoreBuilder(this);
}
}