This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ef2e4600f37 KAFKA-18026: KIP-1112, migrate stream-stream joins to use
ProcessorSupplier#stores (#18111)
ef2e4600f37 is described below
commit ef2e4600f37316fee5f688972339e7ae6b8b6cd9
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Dec 12 14:54:58 2024 -0800
KAFKA-18026: KIP-1112, migrate stream-stream joins to use
ProcessorSupplier#stores (#18111)
Covers wrapping of processors and state stores for KStream-KStream joins.
Includes self-joins and the spurious results fix optimization.
Reviewers: Guozhang Wang <[email protected]>
---
.../kstream/internals/InternalStreamsBuilder.java | 4 +-
.../streams/kstream/internals/KStreamImplJoin.java | 33 ++--
.../kstream/internals/KStreamJoinWindow.java | 19 +-
.../kstream/internals/KStreamKStreamJoin.java | 37 +++-
.../internals/KStreamKStreamJoinLeftSide.java | 13 +-
.../internals/KStreamKStreamJoinRightSide.java | 13 +-
.../kstream/internals/KStreamKStreamSelfJoin.java | 27 ++-
.../internals/graph/BaseJoinProcessorNode.java | 1 -
.../internals/graph/StreamStreamJoinNode.java | 131 ++++---------
.../apache/kafka/streams/StreamsBuilderTest.java | 209 ++++++++++++++++++++-
.../kstream/internals/KStreamKStreamJoinTest.java | 53 +++---
11 files changed, 356 insertions(+), 184 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 1e148ac047c..954b88bfbea 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -442,9 +442,9 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
GraphNode left = null, right = null;
for (final GraphNode child: parent.children()) {
if (child instanceof WindowedStreamProcessorNode &&
child.buildPriority() < joinNode.buildPriority()) {
- if
(child.nodeName().equals(joinNode.thisWindowedStreamProcessorParameters().processorName()))
{
+ if
(child.nodeName().equals(joinNode.thisWindowedStreamProcessorName())) {
left = child;
- } else if
(child.nodeName().equals(joinNode.otherWindowedStreamProcessorParameters().processorName()))
{
+ } else if
(child.nodeName().equals(joinNode.otherWindowedStreamProcessorName())) {
right = child;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index d9008e81c8d..aeece23cf34 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -146,13 +146,13 @@ class KStreamImplJoin {
otherWindowStore =
joinWindowStoreBuilderFromSupplier(otherStoreSupplier,
streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
}
- final KStreamJoinWindow<K, V1> thisWindowedStream = new
KStreamJoinWindow<>(thisWindowStore.storeName());
+ final KStreamJoinWindow<K, V1> thisWindowedStream = new
KStreamJoinWindow<>(thisWindowStore);
final ProcessorParameters<K, V1, ?, ?> thisWindowStreamProcessorParams
= new ProcessorParameters<>(thisWindowedStream, thisWindowStreamProcessorName);
final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new
WindowedStreamProcessorNode<>(thisWindowStore.storeName(),
thisWindowStreamProcessorParams);
builder.addGraphNode(thisGraphNode, thisWindowedStreamsNode);
- final KStreamJoinWindow<K, V2> otherWindowedStream = new
KStreamJoinWindow<>(otherWindowStore.storeName());
+ final KStreamJoinWindow<K, V2> otherWindowedStream = new
KStreamJoinWindow<>(otherWindowStore);
final ProcessorParameters<K, V2, ?, ?>
otherWindowStreamProcessorParams = new
ProcessorParameters<>(otherWindowedStream, otherWindowStreamProcessorName);
final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new
WindowedStreamProcessorNode<>(otherWindowStore.storeName(),
otherWindowStreamProcessorParams);
@@ -173,25 +173,25 @@ class KStreamImplJoin {
final JoinWindowsInternal internalWindows = new
JoinWindowsInternal(windows);
final KStreamKStreamJoinLeftSide<K, V1, V2, VOut> joinThis = new
KStreamKStreamJoinLeftSide<>(
- otherWindowStore.storeName(),
internalWindows,
joiner,
leftOuter,
- outerJoinWindowStore.map(StoreFactory::storeName),
- sharedTimeTrackerSupplier
+ sharedTimeTrackerSupplier,
+ otherWindowStore,
+ outerJoinWindowStore
);
final KStreamKStreamJoinRightSide<K, V1, V2, VOut> joinOther = new
KStreamKStreamJoinRightSide<>(
- thisWindowStore.storeName(),
internalWindows,
AbstractStream.reverseJoinerWithKey(joiner),
rightOuter,
- outerJoinWindowStore.map(StoreFactory::storeName),
- sharedTimeTrackerSupplier
+ sharedTimeTrackerSupplier,
+ thisWindowStore,
+ outerJoinWindowStore
);
final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new
KStreamKStreamSelfJoin<>(
- thisWindowStore.storeName(),
+ thisWindowStore,
internalWindows,
joiner,
windows.size() + windows.gracePeriodMs()
@@ -209,18 +209,11 @@ class KStreamImplJoin {
joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
.withJoinThisProcessorParameters(joinThisProcessorParams)
.withJoinOtherProcessorParameters(joinOtherProcessorParams)
- .withThisWindowStoreBuilder(thisWindowStore)
- .withOtherWindowStoreBuilder(otherWindowStore)
-
.withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams)
-
.withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
- .withOuterJoinWindowStoreBuilder(outerJoinWindowStore)
+ .withSelfJoinProcessorParameters(selfJoinProcessorParams)
+
.withThisWindowedStreamProcessorName(thisWindowStreamProcessorParams.processorName())
+
.withOtherWindowedStreamProcessorName(otherWindowStreamProcessorParams.processorName())
.withValueJoiner(joiner)
- .withNodeName(joinMergeName)
- .withSelfJoinProcessorParameters(selfJoinProcessorParams);
-
- if (internalWindows.spuriousResultFixEnabled()) {
- joinBuilder.withSpuriousResultFixEnabled();
- }
+ .withNodeName(joinMergeName);
final GraphNode joinGraphNode = joinBuilder.build();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 13cfa0db29d..bffdc7cfd09 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -21,14 +21,25 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.WindowStore;
+import java.util.Collections;
+import java.util.Set;
+
class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V, K, V> {
- private final String windowName;
+ private final StoreFactory thisWindowStoreFactory;
+
+ KStreamJoinWindow(final StoreFactory thisWindowStoreFactory) {
+ this.thisWindowStoreFactory = thisWindowStoreFactory;
+ }
- KStreamJoinWindow(final String windowName) {
- this.windowName = windowName;
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(thisWindowStoreFactory));
}
@Override
@@ -44,7 +55,7 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K,
V, K, V> {
public void init(final ProcessorContext<K, V> context) {
super.init(context);
- window = context.getStateStore(windowName);
+ window = context.getStateStore(thisWindowStoreFactory.storeName());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index d3c0c4c0a22..5533963bcd5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -27,9 +27,12 @@ import
org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
@@ -38,7 +41,9 @@ import
org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.LinkedHashSet;
import java.util.Optional;
+import java.util.Set;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
@@ -46,7 +51,7 @@ import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther>
implements ProcessorSupplier<K, VThis, K, VOut> {
private static final Logger LOG =
LoggerFactory.getLogger(KStreamKStreamJoin.class);
- private final String otherWindowName;
+ private final StoreFactory otherWindowStoreFactory;
private final long joinBeforeMs;
private final long joinAfterMs;
private final long joinGraceMs;
@@ -55,20 +60,20 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut,
VThis, VOther> impleme
private final long windowsAfterMs;
private final boolean outer;
- private final Optional<String> outerJoinWindowName;
+ private final Optional<StoreFactory> outerJoinWindowStoreFactory;
private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther,
? extends VOut> joiner;
private final TimeTrackerSupplier sharedTimeTrackerSupplier;
- KStreamKStreamJoin(final String otherWindowName,
- final JoinWindowsInternal windows,
+ KStreamKStreamJoin(final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super VThis, ?
super VOther, ? extends VOut> joiner,
final boolean outer,
- final Optional<String> outerJoinWindowName,
final long joinBeforeMs,
final long joinAfterMs,
- final TimeTrackerSupplier sharedTimeTrackerSupplier) {
- this.otherWindowName = otherWindowName;
+ final TimeTrackerSupplier sharedTimeTrackerSupplier,
+ final StoreFactory otherWindowStoreFactory,
+ final Optional<StoreFactory>
outerJoinWindowStoreFactory) {
+ this.otherWindowStoreFactory = otherWindowStoreFactory;
this.joinBeforeMs = joinBeforeMs;
this.joinAfterMs = joinAfterMs;
this.windowsAfterMs = windows.afterMs;
@@ -77,10 +82,22 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut,
VThis, VOther> impleme
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
this.joiner = joiner;
this.outer = outer;
- this.outerJoinWindowName = outerJoinWindowName;
+ this.outerJoinWindowStoreFactory = outerJoinWindowStoreFactory;
this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ // use ordered set for deterministic topology string in tests
+ final Set<StoreBuilder<?>> stores = new LinkedHashSet<>();
+ stores.add(new FactoryWrappingStoreBuilder<>(otherWindowStoreFactory));
+
+ if (outerJoinWindowStoreFactory.isPresent() &&
enableSpuriousResultFix) {
+ stores.add(new
FactoryWrappingStoreBuilder<>(outerJoinWindowStoreFactory.get()));
+ }
+ return stores;
+ }
+
protected abstract class KStreamKStreamJoinProcessor extends
ContextualProcessor<K, VThis, K, VOut> {
private WindowStore<K, VOther> otherWindowStore;
private Sensor droppedRecordsSensor;
@@ -95,11 +112,11 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut,
VThis, VOther> impleme
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
- otherWindowStore = context.getStateStore(otherWindowName);
+ otherWindowStore =
context.getStateStore(otherWindowStoreFactory.storeName());
sharedTimeTracker =
sharedTimeTrackerSupplier.get(context.taskId());
if (enableSpuriousResultFix) {
- outerJoinStore =
outerJoinWindowName.map(context::getStateStore);
+ outerJoinStore = outerJoinWindowStoreFactory.map(s ->
context.getStateStore(s.storeName()));
sharedTimeTracker.setEmitInterval(
StreamsConfig.InternalConfig.getLong(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
index 2309033b232..7629f89a3fa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
@@ -26,14 +27,14 @@ import java.util.Optional;
class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut> extends
KStreamKStreamJoin<K, VLeft, VRight, VOut, VLeft, VRight> {
- KStreamKStreamJoinLeftSide(final String otherWindowName,
- final JoinWindowsInternal windows,
+ KStreamKStreamJoinLeftSide(final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super
VLeft, ? super VRight, ? extends VOut> joiner,
final boolean outer,
- final Optional<String> outerJoinWindowName,
- final TimeTrackerSupplier
sharedTimeTrackerSupplier) {
- super(otherWindowName, windows, joiner, outer, outerJoinWindowName,
windows.beforeMs, windows.afterMs,
- sharedTimeTrackerSupplier);
+ final TimeTrackerSupplier
sharedTimeTrackerSupplier,
+ final StoreFactory otherWindowStoreFactory,
+ final Optional<StoreFactory>
outerJoinWindowStoreFactory) {
+ super(windows, joiner, outer, windows.beforeMs, windows.afterMs,
+ sharedTimeTrackerSupplier, otherWindowStoreFactory,
outerJoinWindowStoreFactory);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
index e9cb8b82ff1..7931853b8d0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
@@ -26,14 +27,14 @@ import java.util.Optional;
class KStreamKStreamJoinRightSide<K, VLeft, VRight, VOut> extends
KStreamKStreamJoin<K, VLeft, VRight, VOut, VRight, VLeft> {
- KStreamKStreamJoinRightSide(final String otherWindowName,
- final JoinWindowsInternal windows,
+ KStreamKStreamJoinRightSide(final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super
VRight, ? super VLeft, ? extends VOut> joiner,
final boolean outer,
- final Optional<String> outerJoinWindowName,
- final TimeTrackerSupplier
sharedTimeTrackerSupplier) {
- super(otherWindowName, windows, joiner, outer, outerJoinWindowName,
windows.afterMs, windows.beforeMs,
- sharedTimeTrackerSupplier);
+ final TimeTrackerSupplier
sharedTimeTrackerSupplier,
+ final StoreFactory otherWindowStoreFactory,
+ final Optional<StoreFactory>
outerJoinWindowStoreFactory) {
+ super(windows, joiner, outer, windows.afterMs, windows.beforeMs,
sharedTimeTrackerSupplier,
+ otherWindowStoreFactory, outerJoinWindowStoreFactory);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
index b627a98ef4a..adcc501effe 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
@@ -25,19 +25,25 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Set;
+
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K,
V1, K, VOut> {
private static final Logger LOG =
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
- private final String windowName;
+ private final StoreFactory windowStoreFactory;
private final long joinThisBeforeMs;
private final long joinThisAfterMs;
private final long joinOtherBeforeMs;
@@ -45,13 +51,11 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
private final long retentionPeriod;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ?
extends VOut> joinerThis;
- KStreamKStreamSelfJoin(
- final String windowName,
- final JoinWindowsInternal windows,
- final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends
VOut> joinerThis,
- final long retentionPeriod) {
-
- this.windowName = windowName;
+ KStreamKStreamSelfJoin(final StoreFactory windowStoreFactory,
+ final JoinWindowsInternal windows,
+ final ValueJoinerWithKey<? super K, ? super V1, ?
super V2, ? extends VOut> joinerThis,
+ final long retentionPeriod) {
+ this.windowStoreFactory = windowStoreFactory;
this.joinThisBeforeMs = windows.beforeMs;
this.joinThisAfterMs = windows.afterMs;
this.joinOtherBeforeMs = windows.afterMs;
@@ -60,6 +64,11 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
this.retentionPeriod = retentionPeriod;
}
+ @Override
+ public Set<StoreBuilder<?>> stores() {
+ return Collections.singleton(new
FactoryWrappingStoreBuilder<>(windowStoreFactory));
+ }
+
@Override
public Processor<K, V1, K, VOut> get() {
return new KStreamKStreamSelfJoinProcessor();
@@ -76,7 +85,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
- windowStore = context.getStateStore(windowName);
+ windowStore =
context.getStateStore(windowStoreFactory.storeName());
}
@SuppressWarnings("unchecked")
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
index 0c0dcb3bec9..128de320f2b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
@@ -32,7 +32,6 @@ abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends
GraphNode {
private final String thisJoinSideNodeName;
private final String otherJoinSideNodeName;
-
BaseJoinProcessorNode(final String nodeName,
final ValueJoinerWithKey<? super K, ? super V1, ?
super V2, ? extends VR> valueJoiner,
final ProcessorParameters<K, V1, ?, ?>
joinThisProcessorParameters,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index f9cf9164d20..7448498a442 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -17,24 +17,16 @@
package org.apache.kafka.streams.kstream.internals.graph;
-import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import java.util.Optional;
/**
* Too much information to generalize, so Stream-Stream joins are represented
by a specific node.
*/
public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K, V1, V2, VR> {
- private final ProcessorParameters<K, V1, ?, ?>
thisWindowedStreamProcessorParameters;
- private final ProcessorParameters<K, V2, ?, ?>
otherWindowedStreamProcessorParameters;
- private final StoreFactory thisWindowStoreBuilder;
- private final StoreFactory otherWindowStoreBuilder;
- private final Optional<StoreFactory> outerJoinWindowStoreBuilder;
- private final Joined<K, V1, V2> joined;
- private final boolean enableSpuriousResultFix;
+ private final String thisWindowedStreamProcessorName;
+ private final String otherWindowedStreamProcessorName;
private final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters;
private boolean isSelfJoin;
@@ -43,14 +35,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
final ProcessorParameters<K, V1, ?, ?>
joinThisProcessorParameters,
final ProcessorParameters<K, V2, ?, ?>
joinOtherProcessParameters,
final ProcessorParameters<K, VR, ?, ?>
joinMergeProcessorParameters,
- final ProcessorParameters<K, V1, ?, ?>
thisWindowedStreamProcessorParameters,
- final ProcessorParameters<K, V2, ?, ?>
otherWindowedStreamProcessorParameters,
- final StoreFactory thisStoreFactory,
- final StoreFactory otherStoreFactory,
- final Optional<StoreFactory>
outerJoinStoreFactory,
- final Joined<K, V1, V2> joined,
- final boolean enableSpuriousResultFix,
- final ProcessorParameters<K, V1, ?, ?>
selfJoinProcessorParameters) {
+ final ProcessorParameters<K, V1, ?, ?>
selfJoinProcessorParameters,
+ final String thisWindowedStreamProcessorName,
+ final String
otherWindowedStreamProcessorName) {
super(nodeName,
valueJoiner,
@@ -60,26 +47,16 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
null,
null);
- this.thisWindowStoreBuilder = thisStoreFactory;
- this.otherWindowStoreBuilder = otherStoreFactory;
- this.joined = joined;
- this.thisWindowedStreamProcessorParameters =
thisWindowedStreamProcessorParameters;
- this.otherWindowedStreamProcessorParameters =
otherWindowedStreamProcessorParameters;
- this.outerJoinWindowStoreBuilder = outerJoinStoreFactory;
- this.enableSpuriousResultFix = enableSpuriousResultFix;
+ this.thisWindowedStreamProcessorName = thisWindowedStreamProcessorName;
+ this.otherWindowedStreamProcessorName =
otherWindowedStreamProcessorName;
this.selfJoinProcessorParameters = selfJoinProcessorParameters;
}
-
@Override
public String toString() {
return "StreamStreamJoinNode{" +
- "thisWindowedStreamProcessorParameters=" +
thisWindowedStreamProcessorParameters +
- ", otherWindowedStreamProcessorParameters=" +
otherWindowedStreamProcessorParameters +
- ", thisWindowStoreBuilder=" + thisWindowStoreBuilder +
- ", otherWindowStoreBuilder=" + otherWindowStoreBuilder +
- ", outerJoinWindowStoreBuilder=" + outerJoinWindowStoreBuilder +
- ", joined=" + joined +
+ "thisWindowedStreamProcessorName=" +
thisWindowedStreamProcessorName +
+ ", otherWindowedStreamProcessorName=" +
otherWindowedStreamProcessorName +
"} " + super.toString();
}
@@ -89,22 +66,14 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
final String thisProcessorName =
thisProcessorParameters().processorName();
final String otherProcessorName =
otherProcessorParameters().processorName();
- final String thisWindowedStreamProcessorName =
thisWindowedStreamProcessorParameters.processorName();
- final String otherWindowedStreamProcessorName =
otherWindowedStreamProcessorParameters.processorName();
if (isSelfJoin) {
-
topologyBuilder.addProcessor(selfJoinProcessorParameters.processorName(),
selfJoinProcessorParameters.processorSupplier(),
thisWindowedStreamProcessorName);
- topologyBuilder.addStateStore(thisWindowStoreBuilder,
thisWindowedStreamProcessorName, selfJoinProcessorParameters.processorName());
+ selfJoinProcessorParameters.addProcessorTo(topologyBuilder, new
String[]{thisWindowedStreamProcessorName});
} else {
- topologyBuilder.addProcessor(thisProcessorName,
thisProcessorParameters().processorSupplier(), thisWindowedStreamProcessorName);
- topologyBuilder.addProcessor(otherProcessorName,
otherProcessorParameters().processorSupplier(),
otherWindowedStreamProcessorName);
-
topologyBuilder.addProcessor(mergeProcessorParameters().processorName(),
mergeProcessorParameters().processorSupplier(), thisProcessorName,
otherProcessorName);
- topologyBuilder.addStateStore(thisWindowStoreBuilder,
thisWindowedStreamProcessorName, otherProcessorName);
- topologyBuilder.addStateStore(otherWindowStoreBuilder,
otherWindowedStreamProcessorName, thisProcessorName);
-
- if (enableSpuriousResultFix) {
- outerJoinWindowStoreBuilder.ifPresent(builder ->
topologyBuilder.addStateStore(builder, thisProcessorName, otherProcessorName));
- }
+ thisProcessorParameters().addProcessorTo(topologyBuilder, new
String[]{thisWindowedStreamProcessorName});
+ otherProcessorParameters().addProcessorTo(topologyBuilder, new
String[]{otherWindowedStreamProcessorName});
+
+ mergeProcessorParameters().addProcessorTo(topologyBuilder, new
String[]{thisProcessorName, otherProcessorName});
}
}
@@ -116,12 +85,12 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
return isSelfJoin;
}
- public ProcessorParameters<K, V1, ?, ?>
thisWindowedStreamProcessorParameters() {
- return thisWindowedStreamProcessorParameters;
+ public String thisWindowedStreamProcessorName() {
+ return thisWindowedStreamProcessorName;
}
- public ProcessorParameters<K, V2, ?, ?>
otherWindowedStreamProcessorParameters() {
- return otherWindowedStreamProcessorParameters;
+ public String otherWindowedStreamProcessorName() {
+ return otherWindowedStreamProcessorName;
}
public static <K, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR>
streamStreamJoinNodeBuilder() {
@@ -135,14 +104,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
private ProcessorParameters<K, V1, ?, ?> joinThisProcessorParameters;
private ProcessorParameters<K, V2, ?, ?> joinOtherProcessorParameters;
private ProcessorParameters<K, VR, ?, ?> joinMergeProcessorParameters;
- private ProcessorParameters<K, V1, ?, ?>
thisWindowedStreamProcessorParameters;
- private ProcessorParameters<K, V2, ?, ?>
otherWindowedStreamProcessorParameters;
- private StoreFactory thisStoreFactory;
- private StoreFactory otherStoreFactory;
- private Optional<StoreFactory> outerJoinStoreFactory;
- private Joined<K, V1, V2> joined;
- private boolean enableSpuriousResultFix = false;
private ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters;
+ private String thisWindowedStreamProcessorName;
+ private String otherWindowedStreamProcessorName;
private StreamStreamJoinNodeBuilder() {
}
@@ -167,50 +131,24 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
return this;
}
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withJoinMergeProcessorParameters(final ProcessorParameters<K, VR, ?, ?>
joinMergeProcessorParameters) {
- this.joinMergeProcessorParameters = joinMergeProcessorParameters;
- return this;
- }
-
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withThisWindowedStreamProcessorParameters(final ProcessorParameters<K, V1, ?,
?> thisWindowedStreamProcessorParameters) {
- this.thisWindowedStreamProcessorParameters =
thisWindowedStreamProcessorParameters;
- return this;
- }
-
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withOtherWindowedStreamProcessorParameters(
- final ProcessorParameters<K, V2, ?, ?>
otherWindowedStreamProcessorParameters) {
- this.otherWindowedStreamProcessorParameters =
otherWindowedStreamProcessorParameters;
- return this;
- }
-
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withThisWindowStoreBuilder(final StoreFactory thisStoreFactory) {
- this.thisStoreFactory = thisStoreFactory;
- return this;
- }
-
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withOtherWindowStoreBuilder(final StoreFactory otherStoreFactory) {
- this.otherStoreFactory = otherStoreFactory;
- return this;
- }
-
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withOuterJoinWindowStoreBuilder(final Optional<StoreFactory>
outerJoinWindowStoreBuilder) {
- this.outerJoinStoreFactory = outerJoinWindowStoreBuilder;
+ public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withSelfJoinProcessorParameters(
+ final ProcessorParameters<K, V1, ?, ?>
selfJoinProcessorParameters) {
+ this.selfJoinProcessorParameters = selfJoinProcessorParameters;
return this;
}
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoined(final
Joined<K, V1, V2> joined) {
- this.joined = joined;
+ public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withJoinMergeProcessorParameters(final ProcessorParameters<K, VR, ?, ?>
joinMergeProcessorParameters) {
+ this.joinMergeProcessorParameters = joinMergeProcessorParameters;
return this;
}
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withSpuriousResultFixEnabled() {
- this.enableSpuriousResultFix = true;
+ public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withThisWindowedStreamProcessorName(final String
thisWindowedStreamProcessorName) {
+ this.thisWindowedStreamProcessorName =
thisWindowedStreamProcessorName;
return this;
}
- public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withSelfJoinProcessorParameters(
- final ProcessorParameters<K, V1, ?, ?>
selfJoinProcessorParameters) {
- this.selfJoinProcessorParameters = selfJoinProcessorParameters;
+ public StreamStreamJoinNodeBuilder<K, V1, V2, VR>
withOtherWindowedStreamProcessorName(final String
otherWindowedStreamProcessorName) {
+ this.otherWindowedStreamProcessorName =
otherWindowedStreamProcessorName;
return this;
}
@@ -221,14 +159,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
joinThisProcessorParameters,
joinOtherProcessorParameters,
joinMergeProcessorParameters,
-
thisWindowedStreamProcessorParameters,
-
otherWindowedStreamProcessorParameters,
- thisStoreFactory,
- otherStoreFactory,
- outerJoinStoreFactory,
- joined,
- enableSpuriousResultFix,
- selfJoinProcessorParameters);
+ selfJoinProcessorParameters,
+ thisWindowedStreamProcessorName,
+
otherWindowedStreamProcessorName);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 00c745fd735..738f753f532 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -89,6 +89,7 @@ import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
+import static
org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
import static org.apache.kafka.streams.state.Stores.inMemoryKeyValueStore;
@@ -1610,8 +1611,8 @@ public class StreamsBuilderTest {
.toStream(Named.as("toStream"))// wrapped 4
.to("output", Produced.as("sink"));
- final var top = builder.build();
- System.out.println(top.describe());
+ builder.build();
+
assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
"aggregate-cogroup-agg-0", "aggregate-cogroup-agg-1",
"aggregate-cogroup-merge", "toStream"
));
@@ -1747,6 +1748,209 @@ public class StreamsBuilderTest {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
}
+ @Test
+ public void shouldWrapProcessorsForStreamStreamInnerJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input-1",
Consumed.as("source-1"));
+ final KStream<String, String> stream2 = builder.stream("input-2",
Consumed.as("source-2"));
+
+ stream1.join(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceAndGrace(Duration.ofDays(1),
Duration.ofDays(1)),
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+
+ // TODO: fix these names once we address https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-JOINTHIS-0000000004", "KSTREAM-JOINOTHER-0000000005",
+ "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+ "KSTREAM-MERGE-0000000006"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(4));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForStreamStreamLeftJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input-1",
Consumed.as("source-1"));
+ final KStream<String, String> stream2 = builder.stream("input-2",
Consumed.as("source-2"));
+
+ stream1.leftJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceAndGrace(Duration.ofDays(1),
Duration.ofDays(1)),
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+
+ // TODO: fix these names once we address https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-JOINTHIS-0000000004", "KSTREAM-OUTEROTHER-0000000005",
+ "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+ "KSTREAM-MERGE-0000000006"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+
+ // 1 additional store due to spurious results fix for left/outer joins
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(6));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForStreamStreamOuterJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input-1",
Consumed.as("source-1"));
+ final KStream<String, String> stream2 = builder.stream("input-2",
Consumed.as("source-2"));
+
+ stream1.outerJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceAndGrace(Duration.ofDays(1),
Duration.ofDays(1)),
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+
+ // TODO: fix these names once we address https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-OUTERTHIS-0000000004", "KSTREAM-OUTEROTHER-0000000005",
+ "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+ "KSTREAM-MERGE-0000000006"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+
+ // 1 additional store due to spurious results fix for left/outer joins
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(6));
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void
shouldWrapProcessorsForStreamStreamOuterJoinWithoutSpuriousResultsFix() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input-1",
Consumed.as("source-1"));
+ final KStream<String, String> stream2 = builder.stream("input-2",
Consumed.as("source-2"));
+
+ stream1.outerJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.of(Duration.ofDays(1)), // intentionally uses deprecated version of this API!
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+
+ // TODO: fix these names once we address https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-OUTERTHIS-0000000004", "KSTREAM-OUTEROTHER-0000000005",
+ "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+ "KSTREAM-MERGE-0000000006"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(4));
+ }
+
+ @Test
+ public void shouldWrapProcessorsForStreamStreamSelfJoin() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input",
Consumed.as("source"));
+
+ stream1.join(
+ stream1,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)),
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ builder.build();
+
+ // TODO: fix these names once we address https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-JOINTHIS-0000000003", "KSTREAM-JOINOTHER-0000000004",
+ "KSTREAM-WINDOWED-0000000001", "KSTREAM-WINDOWED-0000000002",
+ "KSTREAM-MERGE-0000000005"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(4));
+ }
+
+ @Test
+ public void
shouldWrapProcessorsForStreamStreamSelfJoinWithSharedStoreOptimization() {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(PROCESSOR_WRAPPER_CLASS_CONFIG,
RecordingProcessorWrapper.class);
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+
+ final WrapperRecorder counter = new WrapperRecorder();
+ props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream1 = builder.stream("input",
Consumed.as("source"));
+
+ stream1.join(
+ stream1,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)),
+ StreamJoined.as("ss-join"))
+ .to("output", Produced.as("sink"));
+
+ final Properties properties = new Properties();
+ properties.putAll(props);
+ builder.build(properties);
+
+ // TODO: fix these names once we address https://issues.apache.org/jira/browse/KAFKA-18191
+ assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
+ "KSTREAM-WINDOWED-0000000001", "KSTREAM-MERGE-0000000005"
+ ));
+ assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+ // only 1 store when topology optimizations enabled due to sharing self-join store
+ assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+ assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+ }
+
@Test
public void shouldWrapProcessorsForStreamTableJoin() {
final Map<Object, Object> props = dummyStreamsConfigMap();
@@ -1767,6 +1971,7 @@ public class StreamsBuilderTest {
.to("output", Produced.as("sink"));
builder.build();
+
assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder(
"source-table", "st-join"
));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index f388441f356..d1068444cdb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -34,11 +34,11 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -456,39 +456,42 @@ public class KStreamKStreamJoinTest {
* This test is testing something internal to [[KStreamKStreamJoin]], so we had to setup low-level api manually.
*/
final KStreamImplJoin.TimeTrackerSupplier tracker = new
KStreamImplJoin.TimeTrackerSupplier();
- final KStreamKStreamJoinRightSide<String, String, String, String> join
= new KStreamKStreamJoinRightSide<>(
+ final WindowStoreBuilder<String, String> otherStoreBuilder = new
WindowStoreBuilder<>(
+ new InMemoryWindowBytesStoreSupplier(
"other",
- new
JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))),
- (key, v1, v2) -> v1 + v2,
- true,
- Optional.of("outer"),
- tracker);
+ 1000L,
+ 100,
+ false),
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+ final KeyValueStoreBuilder<TimestampedKeyAndJoinSide<String>,
LeftOrRightValue<String, String>> outerStoreBuilder = new
KeyValueStoreBuilder<>(
+ new InMemoryKeyValueBytesStoreSupplier("outer"),
+ new TimestampedKeyAndJoinSideSerde<>(Serdes.String()),
+ new LeftOrRightValueSerde<>(Serdes.String(), Serdes.String()),
+ new MockTime()
+ );
+ final KStreamKStreamJoinRightSide<String, String, String, String> join
= new KStreamKStreamJoinRightSide<>(
+ new
JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))),
+ (key, v1, v2) -> v1 + v2,
+ true,
+ tracker,
+ StoreBuilderWrapper.wrapStoreBuilder(otherStoreBuilder),
+
Optional.of(StoreBuilderWrapper.wrapStoreBuilder(outerStoreBuilder)));
+
final Processor<String, String, String, String> joinProcessor =
join.get();
final MockInternalNewProcessorContext<String, String> procCtx = new
MockInternalNewProcessorContext<>();
- final WindowStore<String, String> otherStore = new
WindowStoreBuilder<>(
- new InMemoryWindowBytesStoreSupplier(
- "other",
- 1000L,
- 100,
- false),
- Serdes.String(),
- Serdes.String(),
- new MockTime()).build();
+ final WindowStore<String, String> otherStore =
otherStoreBuilder.build();
- final KeyValueStore<TimestampedKeyAndJoinSide<String>,
LeftOrRightValue<String, String>> outerStore = Mockito.spy(
- new KeyValueStoreBuilder<>(
- new InMemoryKeyValueBytesStoreSupplier("outer"),
- new TimestampedKeyAndJoinSideSerde<>(Serdes.String()),
- new LeftOrRightValueSerde<>(Serdes.String(),
Serdes.String()),
- new MockTime()
- ).build());
+ final KeyValueStore<TimestampedKeyAndJoinSide<String>,
LeftOrRightValue<String, String>> outerStore =
+ Mockito.spy(outerStoreBuilder.build());
final GenericInMemoryKeyValueStore<String, String> rootStore = new
GenericInMemoryKeyValueStore<>("root");
- otherStore.init((StateStoreContext) procCtx, rootStore);
+ otherStore.init(procCtx, rootStore);
procCtx.addStateStore(otherStore);
- outerStore.init((StateStoreContext) procCtx, rootStore);
+ outerStore.init(procCtx, rootStore);
procCtx.addStateStore(outerStore);
joinProcessor.init(procCtx);