This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 184b64fb416 KAFKA-18026: migrate KStream and KTable aggregates to use
ProcessorSupplier#stores (#17929)
184b64fb416 is described below
commit 184b64fb41651f95e038f402fa24f58761f4c185
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Dec 3 02:09:43 2024 -0800
KAFKA-18026: migrate KStream and KTable aggregates to use
ProcessorSupplier#stores (#17929)
As part of KIP-1112, to maximize the utility of the new ProcessorWrapper,
we need to migrate the DSL operators to the new method of attaching state
stores by implementing ProcessorSupplier#stores, which makes these stores
available for inspection by the user's wrapper.
This PR covers the aggregate operator for both KStream and KTable.
Reviewers: Guozhang Wang <[email protected]>, Rohan Desai
<[email protected]>
---
.../internals/CogroupedStreamAggregateBuilder.java | 20 +-
.../kstream/internals/KGroupedStreamImpl.java | 6 +-
.../kstream/internals/KGroupedTableImpl.java | 8 +-
.../kstream/internals/KStreamAggregate.java | 28 +-
.../streams/kstream/internals/KStreamReduce.java | 20 +-
.../internals/KStreamSessionWindowAggregate.java | 16 +-
.../internals/KStreamSlidingWindowAggregate.java | 15 +-
.../kstream/internals/KStreamWindowAggregate.java | 16 +-
.../streams/kstream/internals/KTableAggregate.java | 19 +-
.../streams/kstream/internals/KTableReduce.java | 21 +-
.../internals/SessionWindowedKStreamImpl.java | 18 +-
.../internals/SlidingWindowedKStreamImpl.java | 16 +-
.../kstream/internals/TimeWindowedKStreamImpl.java | 16 +-
.../apache/kafka/streams/StreamsBuilderTest.java | 323 ++++++++++++++++++---
.../org/apache/kafka/streams/TopologyTest.java | 9 +-
...KStreamSessionWindowAggregateProcessorTest.java | 7 +-
.../internals/KStreamWindowAggregateTest.java | 15 +-
.../kstream/internals/KTableReduceTest.java | 3 +-
.../internals/graph/GraphGraceSearchUtilTest.java | 13 +-
.../org/apache/kafka/streams/utils/TestUtils.java | 141 ++++++++-
20 files changed, 614 insertions(+), 116 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 161c4a85c52..5b294c39d68 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -58,14 +58,14 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final boolean isOutputVersioned) {
- processRepartitions(groupPatterns, storeFactory);
+ processRepartitions(groupPatterns, storeFactory.name());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K,
Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
- new KStreamAggregate<>(storeFactory.name(), initializer,
kGroupedStream.getValue());
+ new KStreamAggregate<>(storeFactory, initializer,
kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode =
getStatefulProcessorNode(
named.suffixWithOrElseGet(
@@ -92,7 +92,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
- processRepartitions(groupPatterns, storeFactory);
+ processRepartitions(groupPatterns, storeFactory.name());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
@@ -102,7 +102,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new
KStreamWindowAggregate<K, K, VOut, W>(
windows,
- storeFactory.name(),
+ storeFactory,
EmitStrategy.onWindowUpdate(),
initializer,
kGroupedStream.getValue());
@@ -132,7 +132,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
- processRepartitions(groupPatterns, storeFactory);
+ processRepartitions(groupPatterns, storeFactory.name());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
boolean stateCreated = false;
@@ -141,7 +141,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new
KStreamSessionWindowAggregate<K, K, VOut>(
sessionWindows,
- storeFactory.name(),
+ storeFactory,
EmitStrategy.onWindowUpdate(),
initializer,
kGroupedStream.getValue(),
@@ -171,7 +171,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final SlidingWindows slidingWindows) {
- processRepartitions(groupPatterns, storeFactory);
+ processRepartitions(groupPatterns, storeFactory.name());
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
@@ -180,7 +180,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new
KStreamSlidingWindowAggregate<K, K, VOut>(
slidingWindows,
- storeFactory.name(),
+ storeFactory,
// TODO: We do not have other emit policies for co-group
yet
EmitStrategy.onWindowUpdate(),
initializer,
@@ -202,7 +202,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
}
private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>,
Aggregator<? super K, ? super Object, VOut>> groupPatterns,
- final StoreFactory storeFactory) {
+ final String storeName) {
for (final KGroupedStreamImpl<K, ?> repartitionReqs :
groupPatterns.keySet()) {
if (repartitionReqs.repartitionRequired) {
@@ -210,7 +210,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final OptimizableRepartitionNodeBuilder<K, ?>
repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
final String repartitionNamePrefix =
repartitionReqs.userProvidedRepartitionTopicName != null ?
- repartitionReqs.userProvidedRepartitionTopicName :
storeFactory.name();
+ repartitionReqs.userProvidedRepartitionTopicName :
storeName;
createRepartitionSource(repartitionNamePrefix,
repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 2ed1854d9cc..256153708a0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -98,7 +98,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V>
implements KGroupedS
final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
return doAggregate(
- new KStreamReduce<>(materializedInternal.storeName(), reducer),
+ new KStreamReduce<>(materializedInternal, reducer),
name,
materializedInternal
);
@@ -130,7 +130,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V>
implements KGroupedS
final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return doAggregate(
- new KStreamAggregate<>(materializedInternal.storeName(),
initializer, aggregator),
+ new KStreamAggregate<>(materializedInternal, initializer,
aggregator),
name,
materializedInternal
);
@@ -184,7 +184,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V>
implements KGroupedS
final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return doAggregate(
- new KStreamAggregate<>(materializedInternal.storeName(),
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+ new KStreamAggregate<>(materializedInternal,
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
name,
materializedInternal);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index d03cb65c021..e500582244b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -91,7 +91,7 @@ public class KGroupedTableImpl<K, V> extends
AbstractStream<K, V> implements KGr
final StatefulProcessorNode statefulProcessorNode = new
StatefulProcessorNode<>(
funcName,
new ProcessorParameters<>(aggregateSupplier, funcName),
- new KeyValueStoreMaterializer<>(materialized)
+ new String[]{materialized.storeName()}
);
statefulProcessorNode.setOutputVersioned(materialized.storeSupplier()
instanceof VersionedBytesStoreSupplier);
@@ -148,7 +148,7 @@ public class KGroupedTableImpl<K, V> extends
AbstractStream<K, V> implements KGr
materializedInternal.withValueSerde(valueSerde);
}
final ProcessorSupplier<K, Change<V>, K, Change<V>> aggregateSupplier
= new KTableReduce<>(
- materializedInternal.storeName(),
+ materializedInternal,
adder,
subtractor);
return doAggregate(aggregateSupplier, new NamedInternal(named),
REDUCE_NAME, materializedInternal);
@@ -179,7 +179,7 @@ public class KGroupedTableImpl<K, V> extends
AbstractStream<K, V> implements KGr
}
final ProcessorSupplier<K, Change<V>, K, Change<Long>>
aggregateSupplier = new KTableAggregate<>(
- materializedInternal.storeName(),
+ materializedInternal,
countInitializer,
countAdder,
countSubtractor);
@@ -224,7 +224,7 @@ public class KGroupedTableImpl<K, V> extends
AbstractStream<K, V> implements KGr
materializedInternal.withKeySerde(keySerde);
}
final ProcessorSupplier<K, Change<V>, K, Change<VAgg>>
aggregateSupplier = new KTableAggregate<>(
- materializedInternal.storeName(),
+ materializedInternal,
initializer,
adder,
subtractor);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 3e906813d84..abe5fd2b566 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
@@ -24,13 +25,20 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Set;
+
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
@@ -41,19 +49,35 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements
KStreamAggProcessorSupp
private static final Logger LOG =
LoggerFactory.getLogger(KStreamAggregate.class);
private final String storeName;
+ private final StoreFactory storeFactory;
private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
private boolean sendOldValues = false;
- KStreamAggregate(final String storeName,
+ KStreamAggregate(final MaterializedInternal<KIn, VAgg,
KeyValueStore<Bytes, byte[]>> materialized,
+ final Initializer<VAgg> initializer,
+ final Aggregator<? super KIn, ? super VIn, VAgg>
aggregator) {
+ this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+ this.storeName = materialized.storeName();
+ this.initializer = initializer;
+ this.aggregator = aggregator;
+ }
+
+ KStreamAggregate(final StoreFactory storeFactory,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg>
aggregator) {
- this.storeName = storeName;
+ this.storeFactory = storeFactory;
+ this.storeName = storeFactory.name();
this.initializer = initializer;
this.aggregator = aggregator;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
@Override
public Processor<KIn, VIn, KIn, Change<VAgg>> get() {
return new KStreamAggregateProcessor();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 15528f5d150..f337cd9ae44 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,19 +17,27 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Set;
+
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
@@ -40,15 +48,23 @@ public class KStreamReduce<K, V> implements
KStreamAggProcessorSupplier<K, V, K,
private static final Logger LOG =
LoggerFactory.getLogger(KStreamReduce.class);
private final String storeName;
+ private final StoreFactory storeFactory;
private final Reducer<V> reducer;
private boolean sendOldValues = false;
- KStreamReduce(final String storeName, final Reducer<V> reducer) {
- this.storeName = storeName;
+ KStreamReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes,
byte[]>> materialized, final Reducer<V> reducer) {
+ this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+ this.storeName = materialized.storeName();
this.reducer = reducer;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
+
@Override
public Processor<K, V, K, Change<V>> get() {
return new KStreamReduceProcessor();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 8f2c53c8a9a..4a8040a8d37 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -33,16 +33,21 @@ import
org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
import static
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
@@ -54,6 +59,7 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
private static final Logger LOG =
LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
private final String storeName;
+ private final StoreFactory storeFactory;
private final SessionWindows windows;
private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@@ -63,19 +69,25 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
private boolean sendOldValues = false;
public KStreamSessionWindowAggregate(final SessionWindows windows,
- final String storeName,
+ final StoreFactory storeFactory,
final EmitStrategy emitStrategy,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super
VIn, VAgg> aggregator,
final Merger<? super KIn, VAgg>
sessionMerger) {
this.windows = windows;
- this.storeName = storeName;
+ this.storeName = storeFactory.name();
+ this.storeFactory = storeFactory;
this.emitStrategy = emitStrategy;
this.initializer = initializer;
this.aggregator = aggregator;
this.sessionMerger = sessionMerger;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
@Override
public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
return new KStreamSessionWindowAggregateProcessor();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index 4a288bb0e83..894657da48c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -28,7 +28,10 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -36,6 +39,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -46,6 +50,7 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
private static final Logger log =
LoggerFactory.getLogger(KStreamSlidingWindowAggregate.class);
private final String storeName;
+ private final StoreFactory storeFactory;
private final SlidingWindows windows;
private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@@ -54,17 +59,23 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
private boolean sendOldValues = false;
public KStreamSlidingWindowAggregate(final SlidingWindows windows,
- final String storeName,
+ final StoreFactory storeFactory,
final EmitStrategy emitStrategy,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super
VIn, VAgg> aggregator) {
this.windows = windows;
- this.storeName = storeName;
+ this.storeName = storeFactory.name();
+ this.storeFactory = storeFactory;
this.initializer = initializer;
this.aggregator = aggregator;
this.emitStrategy = emitStrategy;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
@Override
public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
return new KStreamSlidingWindowAggregateProcessor(storeName,
emitStrategy, sendOldValues);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 340ce82d856..2e6147627e9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -29,13 +29,18 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -44,6 +49,7 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W extends
Window> implements
private static final Logger log =
LoggerFactory.getLogger(KStreamWindowAggregate.class);
private final String storeName;
+ private final StoreFactory storeFactory;
private final Windows<W> windows;
private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
@@ -52,12 +58,13 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W
extends Window> implements
private boolean sendOldValues = false;
public KStreamWindowAggregate(final Windows<W> windows,
- final String storeName,
+ final StoreFactory storeFactory,
final EmitStrategy emitStrategy,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn,
VAgg> aggregator) {
this.windows = windows;
- this.storeName = storeName;
+ this.storeName = storeFactory.name();
+ this.storeFactory = storeFactory;
this.emitStrategy = emitStrategy;
this.initializer = initializer;
this.aggregator = aggregator;
@@ -70,6 +77,11 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W
extends Window> implements
}
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
@Override
public Processor<KIn, VIn, Windowed<KIn>, Change<VAgg>> get() {
return new KStreamWindowAggregateProcessor(storeName, emitStrategy,
sendOldValues);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index f71143ff209..cecb8048634 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -16,15 +16,23 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
+import java.util.Collections;
+import java.util.Set;
+
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static
org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
@@ -33,17 +41,19 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
KTableProcessorSupplier<KIn, VIn, KIn, VAgg> {
private final String storeName;
+ private final StoreFactory storeFactory;
private final Initializer<VAgg> initializer;
private final Aggregator<? super KIn, ? super VIn, VAgg> add;
private final Aggregator<? super KIn, ? super VIn, VAgg> remove;
private boolean sendOldValues = false;
- KTableAggregate(final String storeName,
+ KTableAggregate(final MaterializedInternal<KIn, VAgg, KeyValueStore<Bytes,
byte[]>> materialized,
final Initializer<VAgg> initializer,
final Aggregator<? super KIn, ? super VIn, VAgg> add,
final Aggregator<? super KIn, ? super VIn, VAgg> remove) {
- this.storeName = storeName;
+ this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+ this.storeName = materialized.storeName();
this.initializer = initializer;
this.add = add;
this.remove = remove;
@@ -56,6 +66,11 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
return true;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
@Override
public Processor<KIn, Change<VIn>, KIn, Change<VAgg>> get() {
return new KTableAggregateProcessor();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index c577d30d984..d0b35098abe 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -16,14 +16,22 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
+import java.util.Collections;
+import java.util.Set;
+
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static
org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static
org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
@@ -31,13 +39,17 @@ import static
org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_
public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, K, V>
{
private final String storeName;
+ private final StoreFactory storeFactory;
private final Reducer<V> addReducer;
private final Reducer<V> removeReducer;
private boolean sendOldValues = false;
- KTableReduce(final String storeName, final Reducer<V> addReducer, final
Reducer<V> removeReducer) {
- this.storeName = storeName;
+ KTableReduce(final MaterializedInternal<K, V, KeyValueStore<Bytes,
byte[]>> materialized,
+ final Reducer<V> addReducer,
+ final Reducer<V> removeReducer) {
+ this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
+ this.storeName = materialized.storeName();
this.addReducer = addReducer;
this.removeReducer = removeReducer;
}
@@ -49,6 +61,11 @@ public class KTableReduce<K, V> implements
KTableProcessorSupplier<K, V, K, V> {
return true;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(storeFactory));
+ }
+
@Override
public Processor<K, Change<V>, K, Change<V>> get() {
return new KTableReduceProcessor();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index d8f3770b79a..989984d42f8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.SessionStore;
import java.util.Objects;
@@ -108,12 +109,14 @@ public class SessionWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
}
final String aggregateName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+ final StoreFactory storeFactory = new
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
+
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- new SessionStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
+ storeFactory,
new KStreamSessionWindowAggregate<>(
windows,
- materializedInternal.storeName(),
+ storeFactory,
emitStrategy,
aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator,
@@ -158,12 +161,14 @@ public class SessionWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
}
final String reduceName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+ final StoreFactory storeFactory = new
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
+
return aggregateBuilder.build(
new NamedInternal(reduceName),
- new SessionStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
+ storeFactory,
new KStreamSessionWindowAggregate<>(
windows,
- materializedInternal.storeName(),
+ storeFactory,
emitStrategy,
aggregateBuilder.reduceInitializer,
reduceAggregator,
@@ -216,13 +221,14 @@ public class SessionWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
}
final String aggregateName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+ final StoreFactory storeFactory = new
SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- new SessionStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
+ storeFactory,
new KStreamSessionWindowAggregate<>(
windows,
- materializedInternal.storeName(),
+ storeFactory,
emitStrategy,
initializer,
aggregator,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 3cb7b3f29bd..16b2d0185ae 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.WindowStore;
import java.util.Objects;
@@ -90,11 +91,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
}
final String aggregateName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+ final StoreFactory storeFactory = new
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- new SlidingWindowStoreMaterializer<>(materializedInternal,
windows, emitStrategy),
- new KStreamSlidingWindowAggregate<>(windows,
materializedInternal.storeName(), emitStrategy,
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+ storeFactory,
+ new KStreamSlidingWindowAggregate<>(windows, storeFactory,
emitStrategy, aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.timeDifferenceMs()) : null,
materializedInternal.valueSerde(),
@@ -135,11 +137,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
materializedInternal.withKeySerde(keySerde);
}
final String aggregateName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+ final StoreFactory storeFactory = new
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- new SlidingWindowStoreMaterializer<>(materializedInternal,
windows, emitStrategy),
- new KStreamSlidingWindowAggregate<>(windows,
materializedInternal.storeName(), emitStrategy, initializer, aggregator),
+ storeFactory,
+ new KStreamSlidingWindowAggregate<>(windows, storeFactory,
emitStrategy, initializer, aggregator),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.timeDifferenceMs()) : null,
materializedInternal.valueSerde(),
@@ -181,11 +184,12 @@ public class SlidingWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
}
final String reduceName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+ final StoreFactory storeFactory = new
SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build(
new NamedInternal(reduceName),
- new SlidingWindowStoreMaterializer<>(materializedInternal,
windows, emitStrategy),
- new KStreamSlidingWindowAggregate<>(windows,
materializedInternal.storeName(), emitStrategy,
aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+ storeFactory,
+ new KStreamSlidingWindowAggregate<>(windows, storeFactory,
emitStrategy, aggregateBuilder.reduceInitializer,
aggregatorForReducer(reducer)),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.timeDifferenceMs()) : null,
materializedInternal.valueSerde(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index b615e20714b..80a671abdc5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.WindowStore;
import java.util.Objects;
@@ -102,13 +103,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends
Window> extends AbstractStr
}
final String aggregateName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+ final StoreFactory storeFactory = new
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- new WindowStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
+ storeFactory,
new KStreamWindowAggregate<>(
windows,
- materializedInternal.storeName(),
+ storeFactory,
emitStrategy,
aggregateBuilder.countInitializer,
aggregateBuilder.countAggregator),
@@ -154,13 +156,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends
Window> extends AbstractStr
}
final String aggregateName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
+ final StoreFactory storeFactory = new
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- new WindowStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
+ storeFactory,
new KStreamWindowAggregate<>(
windows,
- materializedInternal.storeName(),
+ storeFactory,
emitStrategy,
initializer,
aggregator),
@@ -205,13 +208,14 @@ public class TimeWindowedKStreamImpl<K, V, W extends
Window> extends AbstractStr
}
final String reduceName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
+ final StoreFactory storeFactory = new
WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy);
return aggregateBuilder.build(
new NamedInternal(reduceName),
- new WindowStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
+ storeFactory,
new KStreamWindowAggregate<>(
windows,
- materializedInternal.storeName(),
+ storeFactory,
emitStrategy,
aggregateBuilder.reduceInitializer,
aggregatorForReducer(reducer)),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 5210dd0b3c6..36559b18e29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -35,7 +36,9 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -47,6 +50,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
@@ -56,6 +60,7 @@ import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
+import
org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
@@ -76,7 +81,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -88,6 +92,8 @@ import static java.util.Arrays.asList;
import static
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
+import static org.apache.kafka.streams.state.Stores.inMemoryKeyValueStore;
+import static
org.apache.kafka.streams.state.Stores.timestampedKeyValueStoreBuilder;
import static
org.apache.kafka.streams.utils.TestUtils.PROCESSOR_WRAPPER_COUNTER_CONFIG;
import static org.apache.kafka.streams.utils.TestUtils.dummyStreamsConfigMap;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -124,7 +130,7 @@ public class StreamsBuilderTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.addGlobalStore(
Stores.keyValueStoreBuilder(
- Stores.inMemoryKeyValueStore("store"),
+ inMemoryKeyValueStore("store"),
Serdes.String(),
Serdes.String()
),
@@ -1384,7 +1390,7 @@ public class StreamsBuilderTest {
@Test
public void shouldUseSpecifiedNameForGlobalStoreProcessor() {
builder.addGlobalStore(Stores.keyValueStoreBuilder(
- Stores.inMemoryKeyValueStore("store"),
+ inMemoryKeyValueStore("store"),
Serdes.String(),
Serdes.String()
),
@@ -1401,7 +1407,7 @@ public class StreamsBuilderTest {
@Test
public void shouldUseDefaultNameForGlobalStoreProcessor() {
builder.addGlobalStore(Stores.keyValueStoreBuilder(
- Stores.inMemoryKeyValueStore("store"),
+ inMemoryKeyValueStore("store"),
Serdes.String(),
Serdes.String()
),
@@ -1420,8 +1426,8 @@ public class StreamsBuilderTest {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
@@ -1430,35 +1436,247 @@ public class StreamsBuilderTest {
// call to fail
final Random random = new Random();
- builder.stream("input")
- .process((ProcessorSupplier<Object, Object, Object, Object>) () ->
record -> System.out.println("Processing: " + random.nextInt()),
Named.as("processor1"))
- .processValues(() -> record -> System.out.println("Processing: " +
random.nextInt()), Named.as("processor2"))
- .to("output");
+ final StoreBuilder<?> store =
timestampedKeyValueStoreBuilder(inMemoryKeyValueStore("store"),
Serdes.String(), Serdes.String());
+ builder.stream("input", Consumed.as("source"))
+ .process(
+ new ProcessorSupplier<>() {
+ @Override
+ public Processor<Object, Object, Object, Object> get() {
+ return record -> System.out.println("Processing: " +
random.nextInt());
+ }
+
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(store);
+ }
+ },
+ Named.as("stateful-process-1"))
+ .process(
+ new ProcessorSupplier<>() {
+ @Override
+ public Processor<Object, Object, Object, Object> get() {
+ return record -> System.out.println("Processing: " +
random.nextInt());
+ }
+
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(store);
+ }
+ },
+ Named.as("stateful-process-2"))
+ .processValues(
+ () -> record -> System.out.println("Processing values: " +
random.nextInt()),
+ Named.as("stateless-processValues"))
+ .to("output", Produced.as("sink"));
builder.build();
- assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
- assertThat(wrappedProcessors,
Matchers.containsInAnyOrder("processor1", "processor2"));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "stateful-process-1", "stateful-process-2",
"stateless-processValues"));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
@Test
- public void shouldWrapProcessorsForAggregationOperators() {
+ public void shouldWrapProcessorsForStreamReduce() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
- builder.stream("input")
+ builder.stream("input", Consumed.as("source"))
+ .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 1 & 2 (implicit selectKey & repartition)
+ .reduce((l, r) -> l, Named.as("reduce"), Materialized.as("store"))
// wrapped 3
+ .toStream(Named.as("toStream"))// wrapped 4
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "groupBy", "groupBy-repartition-filter", "reduce", "toStream"));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForStreamAggregate() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.stream("input", Consumed.as("source"))
+ .groupByKey()
+ .count(Named.as("count")) // wrapped 1
+ .toStream(Named.as("toStream"))// wrapped 2
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder("count", "toStream"));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForTimeWindowStreamAggregate() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.stream("input", Consumed.as("source"))
+ .groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(1)))
+ .count(Named.as("count")) // wrapped 1
+ .toStream(Named.as("toStream"))// wrapped 2
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder("count", "toStream"));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForSlidingWindowStreamAggregate() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.stream("input", Consumed.as("source"))
+ .groupByKey()
+ .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)))
+ .count(Named.as("count")) // wrapped 1
+ .toStream(Named.as("toStream"))// wrapped 2
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder("count", "toStream"));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForSessionWindowStreamAggregate() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.stream("input", Consumed.as("source"))
.groupByKey()
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofDays(1)))
.count(Named.as("count")) // wrapped 1
.toStream(Named.as("toStream"))// wrapped 2
- .to("output");
+ .to("output", Produced.as("sink"));
builder.build();
- assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
- assertThat(wrappedProcessors, Matchers.containsInAnyOrder("count",
"toStream"));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder("count", "toStream"));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForCoGroupedStreamAggregate() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("one",
Consumed.as("source-1"));
+ final KStream<String, String> stream2 = builder.stream("two",
Consumed.as("source-2"));
+
+ final KGroupedStream<String, String> grouped1 =
stream1.groupByKey(Grouped.as("groupByKey-1"));
+ final KGroupedStream<String, String> grouped2 =
stream2.groupByKey(Grouped.as("groupByKey-2"));
+
+ grouped1
+ .cogroup((k, v, a) -> a + v) // wrapped 1
+ .cogroup(grouped2, (k, v, a) -> a + v) // wrapped 2
+ .aggregate(() -> "", Named.as("aggregate"),
Materialized.as("store")) // wrapped 3, store 1
+ .toStream(Named.as("toStream"))// wrapped 4
+ .to("output", Produced.as("sink"));
+
+ final var top = builder.build();
+ System.out.println(top.describe());
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "aggregate-cogroup-agg-0", "aggregate-cogroup-agg-1",
"aggregate-cogroup-merge", "toStream"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForTableAggregate() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.table("input", Consumed.as("source-table")) // wrapped 1, store 1
+ .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 2 (implicit selectKey)
+ .count(Named.as("count")) // wrapped 3, store 2
+ .toStream(Named.as("toStream"))// wrapped 4
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "source-table", "groupBy", "count", "toStream"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForTableReduce() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.table("input", Consumed.as("source-table")) // wrapped 1, store 1
+ .groupBy(KeyValue::new, Grouped.as("groupBy")) // wrapped 2 (implicit selectKey)
+ .reduce((l, r) -> "", (l, r) -> "", Named.as("reduce"),
Materialized.as("store")) // wrapped 3, store 2
+ .toStream(Named.as("toStream"))// wrapped 4
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "source-table", "groupBy", "reduce", "toStream"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(4));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
@Test
@@ -1466,49 +1684,76 @@ public class StreamsBuilderTest {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
- builder.stream("input")
- .filter((k, v) -> true, Named.as("filter")) // wrapped 1
+ builder.stream("input", Consumed.as("source"))
+ .filter((k, v) -> true, Named.as("filter-stream")) // wrapped 1
.map(KeyValue::new, Named.as("map")) // wrapped 2
.selectKey((k, v) -> k, Named.as("selectKey")) // wrapped 3
.peek((k, v) -> { }, Named.as("peek")) // wrapped 4
.flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5
- .toTable(Named.as("toTable")) // wrapped 6 (note named as toTable-repartition-filter)
+ .toTable(Named.as("toTable")) // should be wrapped when we do StreamToTableNode
+ .filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode
.toStream(Named.as("toStream")) // wrapped 7
- .to("output");
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(7));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "filter-stream", "map", "selectKey", "peek", "flatMap",
+ "toTable-repartition-filter", "toStream"
+ ));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForUnmaterializedSourceTable() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ builder.table("input", Consumed.as("source")) // wrapped 1
+ .toStream(Named.as("toStream")) // wrapped 2
+ .to("output", Produced.as("sink"));
builder.build();
- assertThat(wrappedProcessors.size(), CoreMatchers.is(7));
- assertThat(wrappedProcessors, Matchers.containsInAnyOrder(
- "filter", "map", "selectKey", "peek", "flatMap",
"toTable-repartition-filter",
- "toStream"
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "source", "toStream"
));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(0));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(0));
}
@Test
- public void shouldWrapProcessorsForTableSource() {
+ public void shouldWrapProcessorsForMaterializedSourceTable() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
- builder.table("input") // wrapped 1 (named KTABLE_SOURCE-0000000002)
- .toStream(Named.as("toStream")) // wrapped 2
- .to("output");
+ builder.table("input", Consumed.as("source"),
Materialized.as("store")) // wrapped 1
+ .toStream(Named.as("toStream")) // wrapped 2
+ .to("output", Produced.as("sink"));
builder.build();
- assertThat(wrappedProcessors.size(), CoreMatchers.is(2));
- assertThat(wrappedProcessors, Matchers.containsInAnyOrder(
- "KTABLE-SOURCE-0000000002",
- "toStream"
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "source", "toStream"
));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 833eada8d5e..f43747d3cf2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -46,6 +46,7 @@ import
org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
+import
org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -2427,8 +2428,8 @@ public class TopologyTest {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
- final Set<String> wrappedProcessors = Collections.synchronizedSet(new
HashSet<>());
- props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, wrappedProcessors);
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final Topology topology = new Topology(new TopologyConfig(new
StreamsConfig(props)));
@@ -2453,8 +2454,8 @@ public class TopologyTest {
() -> (Processor<Object, Object, Object, Object>) record ->
System.out.println("Processing: " + random.nextInt()),
"p2"
);
- assertThat(wrappedProcessors.size(), is(3));
- assertThat(wrappedProcessors, Matchers.containsInAnyOrder("p1", "p2",
"p3"));
+ assertThat(counter.numWrappedProcessors(), is(3));
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder("p1", "p2", "p3"));
}
@SuppressWarnings("deprecation")
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index adf7b32c708..f9503fd1564 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -64,6 +64,7 @@ import java.util.stream.Collectors;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
@@ -126,7 +127,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
sessionAggregator = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)),
- STORE_NAME,
+ mockStoreFactory(STORE_NAME),
emitStrategy,
initializer,
aggregator,
@@ -484,7 +485,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
setup(inputType, false);
final Processor<String, String, Windowed<String>, Change<Long>>
processor = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L),
ofMillis(0L)),
- STORE_NAME,
+ mockStoreFactory(STORE_NAME),
EmitStrategy.onWindowUpdate(),
initializer,
aggregator,
@@ -551,7 +552,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
setup(inputType, false);
final Processor<String, String, Windowed<String>, Change<Long>>
processor = new KStreamSessionWindowAggregate<>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L),
ofMillis(1L)),
- STORE_NAME,
+ mockStoreFactory(STORE_NAME),
EmitStrategy.onWindowUpdate(),
initializer,
aggregator,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 14b77a7cc30..c4fdf3ad86a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -46,6 +46,7 @@ import
org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForwa
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -76,6 +77,7 @@ import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems;
@@ -90,6 +92,7 @@ public class KStreamWindowAggregateTest {
private static final String WINDOW_STORE_NAME = "dummy-store-name";
private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
private final String threadId = Thread.currentThread().getName();
+ private final StoreFactory storeFactory =
mockStoreFactory(WINDOW_STORE_NAME);
public StrategyType type;
@@ -646,7 +649,7 @@ public class KStreamWindowAggregateTest {
final MockInternalNewProcessorContext<Windowed<String>,
Change<String>> context = makeContext(stateDir, windowSize);
final KStreamWindowAggregate<String, String, String, TimeWindow>
processorSupplier = new KStreamWindowAggregate<>(
windows,
- WINDOW_STORE_NAME,
+ storeFactory,
emitStrategy,
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER
@@ -736,7 +739,7 @@ public class KStreamWindowAggregateTest {
final MockInternalNewProcessorContext<Windowed<String>,
Change<String>> context = makeContext(stateDir, windowSize);
final KStreamWindowAggregate<String, String, String, TimeWindow>
processorSupplier = new KStreamWindowAggregate<>(
windows,
- WINDOW_STORE_NAME,
+ storeFactory,
emitStrategy,
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER
@@ -805,7 +808,7 @@ public class KStreamWindowAggregateTest {
final MockInternalNewProcessorContext<Windowed<String>,
Change<String>> context = makeContext(stateDir, windowSize);
final KStreamWindowAggregate<String, String, String, TimeWindow>
processorSupplier = new KStreamWindowAggregate<>(
windows,
- WINDOW_STORE_NAME,
+ storeFactory,
emitStrategy,
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER
@@ -906,7 +909,7 @@ public class KStreamWindowAggregateTest {
final MockInternalNewProcessorContext<Windowed<String>,
Change<String>> context = makeContext(stateDir, windowSize);
final KStreamWindowAggregate<String, String, String, TimeWindow>
processorSupplier = new KStreamWindowAggregate<>(
windows,
- WINDOW_STORE_NAME,
+ storeFactory,
emitStrategy,
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER
@@ -982,7 +985,7 @@ public class KStreamWindowAggregateTest {
final IllegalArgumentException e = assertThrows(
IllegalArgumentException.class, () -> new
KStreamWindowAggregate<>(
UnlimitedWindows.of(),
- WINDOW_STORE_NAME,
+ storeFactory,
emitStrategy,
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER)
@@ -992,7 +995,7 @@ public class KStreamWindowAggregateTest {
} else {
new KStreamWindowAggregate<>(
UnlimitedWindows.of(),
- WINDOW_STORE_NAME,
+ storeFactory,
emitStrategy,
MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index de437ed66a5..5f1c8489dab 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
@@ -42,7 +43,7 @@ public class KTableReduceTest {
final Processor<String, Change<Set<String>>, String,
Change<Set<String>>> reduceProcessor =
new KTableReduce<String, Set<String>>(
- "myStore",
+ new MaterializedInternal<>(Materialized.as("myStore")),
this::unionNotNullArgs,
this::differenceNotNullArgs
).get();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 1541dce30ba..8552790692c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.streams.processor.internals.StoreFactory;
import org.junit.jupiter.api.Test;
import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.utils.TestUtils.mockStoreFactory;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
@@ -82,7 +83,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
windows,
- "asdf",
+ mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(),
null,
null
@@ -105,7 +106,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
windows,
- "asdf",
+ mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(),
null,
null,
@@ -126,7 +127,7 @@ public class GraphGraceSearchUtilTest {
final StatefulProcessorNode<String, Long> graceGrandparent = new
StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(new
KStreamSessionWindowAggregate<String, Long, Integer>(
- windows, "asdf", EmitStrategy.onWindowUpdate(), null, null,
null
+ windows, mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(), null, null, null
), "asdf"),
(StoreFactory) null
);
@@ -161,7 +162,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
windows,
- "asdf",
+ mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(),
null,
null,
@@ -189,7 +190,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
SessionWindows.ofInactivityGapAndGrace(ofMillis(10L),
ofMillis(1234L)),
- "asdf",
+ mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(),
null,
null,
@@ -205,7 +206,7 @@ public class GraphGraceSearchUtilTest {
new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)),
- "asdf",
+ mockStoreFactory("asdf"),
EmitStrategy.onWindowUpdate(),
null,
null
diff --git
a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 24ac2f2306d..6ecc6b7c7ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -18,13 +18,19 @@ package org.apache.kafka.streams.utils;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.StoreBuilder;
+import
org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
import org.junit.jupiter.api.TestInfo;
+import org.mockito.Mockito;
import java.lang.reflect.Method;
import java.time.Duration;
@@ -43,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class TestUtils {
- public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG =
"wrapped.processor.count";
+ public static final String PROCESSOR_WRAPPER_COUNTER_CONFIG =
"wrapped.counter";
/**
* Waits for the given {@link KafkaStreams} instances to all be in a
specific {@link KafkaStreams.State}.
@@ -111,6 +117,12 @@ public class TestUtils {
return baseConfigs;
}
+    /**
+     * Builds a Mockito mock of {@link StoreFactory} whose {@code name()} is stubbed to
+     * return the given name. No other method of the mock is stubbed here.
+     *
+     * @param name the value the mock returns from {@code name()}
+     * @return the stubbed mock
+     */
+    public static StoreFactory mockStoreFactory(final String name) {
+        final StoreFactory mockedFactory = Mockito.mock(StoreFactory.class);
+        Mockito.when(mockedFactory.name()).thenReturn(name);
+        return mockedFactory;
+    }
+
/**
* Simple pass-through processor wrapper that counts the number of
processors
* it wraps.
@@ -119,29 +131,142 @@ public class TestUtils {
*/
public static class RecordingProcessorWrapper implements ProcessorWrapper {
- private Set<String> wrappedProcessorNames;
+ private WrapperRecorder recorder;
+ /**
+ * Selects the recorder this wrapper reports to: the object stored under
+ * PROCESSOR_WRAPPER_COUNTER_CONFIG when that key is present in the configs,
+ * otherwise a freshly created WrapperRecorder.
+ */
@Override
public void configure(final Map<String, ?> configs) {
if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
- wrappedProcessorNames = (Set<String>)
configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
+ // the configured value is cast without a type check, so it must be a WrapperRecorder
+ recorder = (WrapperRecorder)
configs.get(PROCESSOR_WRAPPER_COUNTER_CONFIG);
} else {
- wrappedProcessorNames = Collections.synchronizedSet(new
HashSet<>());
+ recorder = new WrapperRecorder();
}
}
+        /**
+         * Accumulates the names of wrapped processors and of the state stores connected to
+         * them via the ProcessorSupplier#stores method, and exposes the resulting counts.
+         */
+        public static class WrapperRecorder {
+            // All three sets are synchronized so that the store bookkeeping offers the same
+            // guarantees as wrappedProcessorNames
+            private final Set<String> uniqueStores = Collections.synchronizedSet(new HashSet<>());
+            private final Set<String> processorStoresCounted = Collections.synchronizedSet(new HashSet<>());
+            private final Set<String> wrappedProcessorNames = Collections.synchronizedSet(new HashSet<>());
+
+            /**
+             * Records that a processor supplier was wrapped. Repeated names are stored once.
+             *
+             * @param name the name of the wrapped processor
+             */
+            public void wrapProcessorSupplier(final String name) {
+                wrappedProcessorNames.add(name);
+            }
+
+            /**
+             * Records that {@code storeName} is connected to {@code processorName}. Recording the
+             * same pairing again does not change any count.
+             *
+             * @param processorName the processor the store is connected to
+             * @param storeName     the name of the connected store
+             */
+            public void wrapStateStore(final String processorName, final String storeName) {
+                // Set#add is already a no-op for elements that are present
+                uniqueStores.add(storeName);
+
+                // Prefix the key with the processor name's length: plain concatenation would
+                // merge distinct pairs such as ("ab", "c") and ("a", "bc") into a single key.
+                // String.valueOf keeps the original tolerance of a null processor name.
+                final String processor = String.valueOf(processorName);
+                processorStoresCounted.add(processor.length() + ":" + processor + storeName);
+            }
+
+            /**
+             * @return the number of distinct processor names recorded
+             */
+            public int numWrappedProcessors() {
+                return wrappedProcessorNames.size();
+            }
+
+            /**
+             * Number of unique state stores in the topology connected to their processors via the
+             * ProcessorSupplier#stores method. State stores connected to more than one processor
+             * are counted only once.
+             *
+             * @return the number of distinct store names recorded
+             */
+            public int numUniqueStateStores() {
+                return uniqueStores.size();
+            }
+
+            /**
+             * Number of stores connected to a processor via the ProcessorSupplier#stores method
+             * (ie the size of the set returned by #stores), summed across all processors in the
+             * topology. Equal to the number of unique (processorName, storeName) pairings. Will be
+             * greater than or equal to the value of {@link #numUniqueStateStores()}, as this method
+             * counts a store once per processor it is connected to.
+             *
+             * @return the number of distinct processor/store pairings recorded
+             */
+            public int numConnectedStateStores() {
+                return processorStoresCounted.size();
+            }
+
+            /**
+             * @return the live, synchronized set of recorded processor names (not a copy)
+             */
+            public Set<String> wrappedProcessorNames() {
+                return wrappedProcessorNames;
+            }
+
+        }
+
+ /**
+ * Wraps the given supplier in a CountingDelegatingProcessorSupplier bound to this
+ * wrapper's recorder and the given processor name.
+ */
@Override
public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut,
VOut> wrapProcessorSupplier(final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut>
processorSupplier) {
- wrappedProcessorNames.add(processorName);
- return ProcessorWrapper.asWrapped(processorSupplier);
+
+ // the counting supplier's constructor records processorName with the recorder
+ return new CountingDelegatingProcessorSupplier<>(recorder,
processorName, processorSupplier);
}
+ /**
+ * Wraps the given fixed-key supplier in a CountingDelegatingFixedKeyProcessorSupplier
+ * bound to this wrapper's recorder and the given processor name.
+ */
@Override
public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn,
VOut> wrapFixedKeyProcessorSupplier(final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut>
processorSupplier) {
- wrappedProcessorNames.add(processorName);
- return ProcessorWrapper.asWrappedFixedKey(processorSupplier);
+ // the counting supplier's constructor records processorName with the recorder
+ return new CountingDelegatingFixedKeyProcessorSupplier<>(recorder,
processorName, processorSupplier);
+ }
+ }
+
+    /**
+     * Pass-through {@link WrappedProcessorSupplier} that reports to a {@link WrapperRecorder}.
+     * The processor name is recorded once, at construction; every store name returned by the
+     * delegate is recorded each time {@link #stores()} is invoked.
+     */
+    private static class CountingDelegatingProcessorSupplier<KIn, VIn, KOut, VOut>
+        implements WrappedProcessorSupplier<KIn, VIn, KOut, VOut> {
+
+        private final WrapperRecorder counter;
+        private final String processorName;
+        private final ProcessorSupplier<KIn, VIn, KOut, VOut> delegate;
+
+        /**
+         * @param counter           recorder that receives the processor and store names
+         * @param processorName     name under which this supplier is recorded
+         * @param processorSupplier the supplier all calls are forwarded to
+         */
+        public CountingDelegatingProcessorSupplier(final WrapperRecorder counter,
+                                                   final String processorName,
+                                                   final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
+            this.delegate = processorSupplier;
+            this.processorName = processorName;
+            this.counter = counter;
+
+            // wrapping is recorded eagerly, as soon as the supplier is created
+            this.counter.wrapProcessorSupplier(this.processorName);
+        }
+
+        /**
+         * Forwards to the delegate and records each returned store against this processor.
+         *
+         * @return exactly what the delegate returned, which may be {@code null}
+         */
+        @Override
+        public Set<StoreBuilder<?>> stores() {
+            final Set<StoreBuilder<?>> delegateStores = delegate.stores();
+            if (delegateStores == null) {
+                return null;
+            }
+            delegateStores.forEach(builder -> counter.wrapStateStore(processorName, builder.name()));
+            return delegateStores;
+        }
+
+        /**
+         * @return the processor created by the delegate, unmodified
+         */
+        @Override
+        public Processor<KIn, VIn, KOut, VOut> get() {
+            return delegate.get();
+        }
+    }
+
+    /**
+     * Pass-through {@link WrappedFixedKeyProcessorSupplier} that reports to a
+     * {@link WrapperRecorder}. The processor name is recorded once, at construction; every
+     * store name returned by the delegate is recorded each time {@link #stores()} is invoked.
+     */
+    private static class CountingDelegatingFixedKeyProcessorSupplier<KIn, VIn, VOut>
+        implements WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> {
+
+        private final WrapperRecorder counter;
+        private final String processorName;
+        private final FixedKeyProcessorSupplier<KIn, VIn, VOut> delegate;
+
+        /**
+         * @param counter           recorder that receives the processor and store names
+         * @param processorName     name under which this supplier is recorded
+         * @param processorSupplier the supplier all calls are forwarded to
+         */
+        public CountingDelegatingFixedKeyProcessorSupplier(final WrapperRecorder counter,
+                                                           final String processorName,
+                                                           final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
+            this.delegate = processorSupplier;
+            this.processorName = processorName;
+            this.counter = counter;
+
+            // wrapping is recorded eagerly, as soon as the supplier is created
+            this.counter.wrapProcessorSupplier(this.processorName);
+        }
+
+        /**
+         * Forwards to the delegate and records each returned store against this processor.
+         *
+         * @return exactly what the delegate returned, which may be {@code null}
+         */
+        @Override
+        public Set<StoreBuilder<?>> stores() {
+            final Set<StoreBuilder<?>> delegateStores = delegate.stores();
+            if (delegateStores == null) {
+                return null;
+            }
+            delegateStores.forEach(builder -> counter.wrapStateStore(processorName, builder.name()));
+            return delegateStores;
+        }
+
+        /**
+         * @return the fixed-key processor created by the delegate, unmodified
+         */
+        @Override
+        public FixedKeyProcessor<KIn, VIn, VOut> get() {
+            return delegate.get();
         }
     }
}