This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef2e4600f37 KAFKA-18026: KIP-1112, migrate stream-stream joins to use 
ProcessorSupplier#stores (#18111)
ef2e4600f37 is described below

commit ef2e4600f37316fee5f688972339e7ae6b8b6cd9
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Dec 12 14:54:58 2024 -0800

    KAFKA-18026: KIP-1112, migrate stream-stream joins to use 
ProcessorSupplier#stores (#18111)
    
    Covers wrapping of processors and state stores for KStream-KStream joins.
    
    Includes self-joins and the spurious-results-fix optimization.
    
    Reviewers: Guozhang Wang <[email protected]>
---
 .../kstream/internals/InternalStreamsBuilder.java  |   4 +-
 .../streams/kstream/internals/KStreamImplJoin.java |  33 ++--
 .../kstream/internals/KStreamJoinWindow.java       |  19 +-
 .../kstream/internals/KStreamKStreamJoin.java      |  37 +++-
 .../internals/KStreamKStreamJoinLeftSide.java      |  13 +-
 .../internals/KStreamKStreamJoinRightSide.java     |  13 +-
 .../kstream/internals/KStreamKStreamSelfJoin.java  |  27 ++-
 .../internals/graph/BaseJoinProcessorNode.java     |   1 -
 .../internals/graph/StreamStreamJoinNode.java      | 131 ++++---------
 .../apache/kafka/streams/StreamsBuilderTest.java   | 209 ++++++++++++++++++++-
 .../kstream/internals/KStreamKStreamJoinTest.java  |  53 +++---
 11 files changed, 356 insertions(+), 184 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 1e148ac047c..954b88bfbea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -442,9 +442,9 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
             GraphNode left = null, right = null;
             for (final GraphNode child: parent.children()) {
                 if (child instanceof WindowedStreamProcessorNode && 
child.buildPriority() < joinNode.buildPriority()) {
-                    if 
(child.nodeName().equals(joinNode.thisWindowedStreamProcessorParameters().processorName()))
 {
+                    if 
(child.nodeName().equals(joinNode.thisWindowedStreamProcessorName())) {
                         left = child;
-                    } else if 
(child.nodeName().equals(joinNode.otherWindowedStreamProcessorParameters().processorName()))
 {
+                    } else if 
(child.nodeName().equals(joinNode.otherWindowedStreamProcessorName())) {
                         right = child;
                     }
                 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index d9008e81c8d..aeece23cf34 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -146,13 +146,13 @@ class KStreamImplJoin {
             otherWindowStore = 
joinWindowStoreBuilderFromSupplier(otherStoreSupplier, 
streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
         }
 
-        final KStreamJoinWindow<K, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindowStore.storeName());
+        final KStreamJoinWindow<K, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindowStore);
 
         final ProcessorParameters<K, V1, ?, ?> thisWindowStreamProcessorParams 
= new ProcessorParameters<>(thisWindowedStream, thisWindowStreamProcessorName);
         final ProcessorGraphNode<K, V1> thisWindowedStreamsNode = new 
WindowedStreamProcessorNode<>(thisWindowStore.storeName(), 
thisWindowStreamProcessorParams);
         builder.addGraphNode(thisGraphNode, thisWindowedStreamsNode);
 
-        final KStreamJoinWindow<K, V2> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindowStore.storeName());
+        final KStreamJoinWindow<K, V2> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindowStore);
 
         final ProcessorParameters<K, V2, ?, ?> 
otherWindowStreamProcessorParams = new 
ProcessorParameters<>(otherWindowedStream, otherWindowStreamProcessorName);
         final ProcessorGraphNode<K, V2> otherWindowedStreamsNode = new 
WindowedStreamProcessorNode<>(otherWindowStore.storeName(), 
otherWindowStreamProcessorParams);
@@ -173,25 +173,25 @@ class KStreamImplJoin {
 
         final JoinWindowsInternal internalWindows = new 
JoinWindowsInternal(windows);
         final KStreamKStreamJoinLeftSide<K, V1, V2, VOut> joinThis = new 
KStreamKStreamJoinLeftSide<>(
-            otherWindowStore.storeName(),
             internalWindows,
             joiner,
             leftOuter,
-            outerJoinWindowStore.map(StoreFactory::storeName),
-            sharedTimeTrackerSupplier
+            sharedTimeTrackerSupplier,
+            otherWindowStore,
+            outerJoinWindowStore
         );
 
         final KStreamKStreamJoinRightSide<K, V1, V2, VOut> joinOther = new 
KStreamKStreamJoinRightSide<>(
-            thisWindowStore.storeName(),
             internalWindows,
             AbstractStream.reverseJoinerWithKey(joiner),
             rightOuter,
-            outerJoinWindowStore.map(StoreFactory::storeName),
-            sharedTimeTrackerSupplier
+            sharedTimeTrackerSupplier,
+            thisWindowStore,
+            outerJoinWindowStore
         );
 
         final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new 
KStreamKStreamSelfJoin<>(
-            thisWindowStore.storeName(),
+            thisWindowStore,
             internalWindows,
             joiner,
             windows.size() + windows.gracePeriodMs()
@@ -209,18 +209,11 @@ class KStreamImplJoin {
         joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
                    .withJoinThisProcessorParameters(joinThisProcessorParams)
                    .withJoinOtherProcessorParameters(joinOtherProcessorParams)
-                   .withThisWindowStoreBuilder(thisWindowStore)
-                   .withOtherWindowStoreBuilder(otherWindowStore)
-                   
.withThisWindowedStreamProcessorParameters(thisWindowStreamProcessorParams)
-                   
.withOtherWindowedStreamProcessorParameters(otherWindowStreamProcessorParams)
-                   .withOuterJoinWindowStoreBuilder(outerJoinWindowStore)
+                   .withSelfJoinProcessorParameters(selfJoinProcessorParams)
+                   
.withThisWindowedStreamProcessorName(thisWindowStreamProcessorParams.processorName())
+                   
.withOtherWindowedStreamProcessorName(otherWindowStreamProcessorParams.processorName())
                    .withValueJoiner(joiner)
-                   .withNodeName(joinMergeName)
-                   .withSelfJoinProcessorParameters(selfJoinProcessorParams);
-
-        if (internalWindows.spuriousResultFixEnabled()) {
-            joinBuilder.withSpuriousResultFixEnabled();
-        }
+                   .withNodeName(joinMergeName);
 
         final GraphNode joinGraphNode = joinBuilder.build();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 13cfa0db29d..bffdc7cfd09 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -21,14 +21,25 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.WindowStore;
 
+import java.util.Collections;
+import java.util.Set;
+
 class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V, K, V> {
 
-    private final String windowName;
+    private final StoreFactory thisWindowStoreFactory;
+
+    KStreamJoinWindow(final StoreFactory thisWindowStoreFactory) {
+        this.thisWindowStoreFactory = thisWindowStoreFactory;
+    }
 
-    KStreamJoinWindow(final String windowName) {
-        this.windowName = windowName;
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(thisWindowStoreFactory));
     }
 
     @Override
@@ -44,7 +55,7 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, 
V, K, V> {
         public void init(final ProcessorContext<K, V> context) {
             super.init(context);
 
-            window = context.getStateStore(windowName);
+            window = context.getStateStore(thisWindowStoreFactory.storeName());
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index d3c0c4c0a22..5533963bcd5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -27,9 +27,12 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.LeftOrRightValue;
@@ -38,7 +41,9 @@ import 
org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.LinkedHashSet;
 import java.util.Optional;
+import java.util.Set;
 
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
@@ -46,7 +51,7 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.d
 abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> 
implements ProcessorSupplier<K, VThis, K, VOut> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamJoin.class);
 
-    private final String otherWindowName;
+    private final StoreFactory otherWindowStoreFactory;
     private final long joinBeforeMs;
     private final long joinAfterMs;
     private final long joinGraceMs;
@@ -55,20 +60,20 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, 
VThis, VOther> impleme
     private final long windowsAfterMs;
 
     private final boolean outer;
-    private final Optional<String> outerJoinWindowName;
+    private final Optional<StoreFactory> outerJoinWindowStoreFactory;
     private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, 
? extends VOut> joiner;
 
     private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 
-    KStreamKStreamJoin(final String otherWindowName,
-                       final JoinWindowsInternal windows,
+    KStreamKStreamJoin(final JoinWindowsInternal windows,
                        final ValueJoinerWithKey<? super K, ? super VThis, ? 
super VOther, ? extends VOut> joiner,
                        final boolean outer,
-                       final Optional<String> outerJoinWindowName,
                        final long joinBeforeMs,
                        final long joinAfterMs,
-                       final TimeTrackerSupplier sharedTimeTrackerSupplier) {
-        this.otherWindowName = otherWindowName;
+                       final TimeTrackerSupplier sharedTimeTrackerSupplier,
+                       final StoreFactory otherWindowStoreFactory,
+                       final Optional<StoreFactory> 
outerJoinWindowStoreFactory) {
+        this.otherWindowStoreFactory = otherWindowStoreFactory;
         this.joinBeforeMs = joinBeforeMs;
         this.joinAfterMs = joinAfterMs;
         this.windowsAfterMs = windows.afterMs;
@@ -77,10 +82,22 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, 
VThis, VOther> impleme
         this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
         this.joiner = joiner;
         this.outer = outer;
-        this.outerJoinWindowName = outerJoinWindowName;
+        this.outerJoinWindowStoreFactory = outerJoinWindowStoreFactory;
         this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        // use ordered set for deterministic topology string in tests
+        final Set<StoreBuilder<?>> stores = new LinkedHashSet<>();
+        stores.add(new FactoryWrappingStoreBuilder<>(otherWindowStoreFactory));
+
+        if (outerJoinWindowStoreFactory.isPresent() && 
enableSpuriousResultFix) {
+            stores.add(new 
FactoryWrappingStoreBuilder<>(outerJoinWindowStoreFactory.get()));
+        }
+        return stores;
+    }
+
     protected abstract class KStreamKStreamJoinProcessor extends 
ContextualProcessor<K, VThis, K, VOut> {
         private WindowStore<K, VOther> otherWindowStore;
         private Sensor droppedRecordsSensor;
@@ -95,11 +112,11 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, 
VThis, VOther> impleme
 
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindowStore = context.getStateStore(otherWindowName);
+            otherWindowStore = 
context.getStateStore(otherWindowStoreFactory.storeName());
             sharedTimeTracker = 
sharedTimeTrackerSupplier.get(context.taskId());
 
             if (enableSpuriousResultFix) {
-                outerJoinStore = 
outerJoinWindowName.map(context::getStateStore);
+                outerJoinStore = outerJoinWindowStoreFactory.map(s -> 
context.getStateStore(s.storeName()));
 
                 sharedTimeTracker.setEmitInterval(
                     StreamsConfig.InternalConfig.getLong(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
index 2309033b232..7629f89a3fa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
 import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.internals.LeftOrRightValue;
 import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
 
@@ -26,14 +27,14 @@ import java.util.Optional;
 
 class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut> extends 
KStreamKStreamJoin<K, VLeft, VRight, VOut, VLeft, VRight> {
 
-    KStreamKStreamJoinLeftSide(final String otherWindowName,
-                               final JoinWindowsInternal windows,
+    KStreamKStreamJoinLeftSide(final JoinWindowsInternal windows,
                                final ValueJoinerWithKey<? super K, ? super 
VLeft, ? super VRight, ? extends VOut> joiner,
                                final boolean outer,
-                               final Optional<String> outerJoinWindowName,
-                               final TimeTrackerSupplier 
sharedTimeTrackerSupplier) {
-        super(otherWindowName, windows, joiner, outer, outerJoinWindowName, 
windows.beforeMs, windows.afterMs,
-                sharedTimeTrackerSupplier);
+                               final TimeTrackerSupplier 
sharedTimeTrackerSupplier,
+                               final StoreFactory otherWindowStoreFactory,
+                               final Optional<StoreFactory> 
outerJoinWindowStoreFactory) {
+        super(windows, joiner, outer, windows.beforeMs, windows.afterMs,
+              sharedTimeTrackerSupplier, otherWindowStoreFactory, 
outerJoinWindowStoreFactory);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
index e9cb8b82ff1..7931853b8d0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import 
org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
 import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.internals.LeftOrRightValue;
 import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
 
@@ -26,14 +27,14 @@ import java.util.Optional;
 
 class KStreamKStreamJoinRightSide<K, VLeft, VRight, VOut> extends 
KStreamKStreamJoin<K, VLeft, VRight, VOut, VRight, VLeft> {
 
-    KStreamKStreamJoinRightSide(final String otherWindowName,
-                                final JoinWindowsInternal windows,
+    KStreamKStreamJoinRightSide(final JoinWindowsInternal windows,
                                 final ValueJoinerWithKey<? super K, ? super 
VRight, ? super VLeft, ? extends VOut> joiner,
                                 final boolean outer,
-                                final Optional<String> outerJoinWindowName,
-                                final TimeTrackerSupplier 
sharedTimeTrackerSupplier) {
-        super(otherWindowName, windows, joiner, outer, outerJoinWindowName, 
windows.afterMs, windows.beforeMs,
-                sharedTimeTrackerSupplier);
+                                final TimeTrackerSupplier 
sharedTimeTrackerSupplier,
+                                final StoreFactory otherWindowStoreFactory,
+                                final Optional<StoreFactory> 
outerJoinWindowStoreFactory) {
+        super(windows, joiner, outer, windows.afterMs, windows.beforeMs, 
sharedTimeTrackerSupplier,
+              otherWindowStoreFactory, outerJoinWindowStoreFactory);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
index b627a98ef4a..adcc501effe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
@@ -25,19 +25,25 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import 
org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.Set;
+
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
 class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, 
V1, K, VOut> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
 
-    private final String windowName;
+    private final StoreFactory windowStoreFactory;
     private final long joinThisBeforeMs;
     private final long joinThisAfterMs;
     private final long joinOtherBeforeMs;
@@ -45,13 +51,11 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1
     private final long retentionPeriod;
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joinerThis;
 
-    KStreamKStreamSelfJoin(
-        final String windowName,
-        final JoinWindowsInternal windows,
-        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends 
VOut> joinerThis,
-        final long retentionPeriod) {
-
-        this.windowName = windowName;
+    KStreamKStreamSelfJoin(final StoreFactory windowStoreFactory,
+                           final JoinWindowsInternal windows,
+                           final ValueJoinerWithKey<? super K, ? super V1, ? 
super V2, ? extends VOut> joinerThis,
+                           final long retentionPeriod) {
+        this.windowStoreFactory = windowStoreFactory;
         this.joinThisBeforeMs = windows.beforeMs;
         this.joinThisAfterMs = windows.afterMs;
         this.joinOtherBeforeMs = windows.afterMs;
@@ -60,6 +64,11 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1
         this.retentionPeriod = retentionPeriod;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(new 
FactoryWrappingStoreBuilder<>(windowStoreFactory));
+    }
+
     @Override
     public Processor<K, V1, K, VOut> get() {
         return new KStreamKStreamSelfJoinProcessor();
@@ -76,7 +85,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1
 
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            windowStore = context.getStateStore(windowName);
+            windowStore = 
context.getStateStore(windowStoreFactory.storeName());
         }
 
         @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
index 0c0dcb3bec9..128de320f2b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
@@ -32,7 +32,6 @@ abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends 
GraphNode {
     private final String thisJoinSideNodeName;
     private final String otherJoinSideNodeName;
 
-
     BaseJoinProcessorNode(final String nodeName,
                           final ValueJoinerWithKey<? super K, ? super V1, ? 
super V2, ? extends VR> valueJoiner,
                           final ProcessorParameters<K, V1, ?, ?> 
joinThisProcessorParameters,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index f9cf9164d20..7448498a442 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -17,24 +17,16 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
 
-import java.util.Optional;
 
 /**
  * Too much information to generalize, so Stream-Stream joins are represented 
by a specific node.
  */
 public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K, V1, V2, VR> {
-    private final ProcessorParameters<K, V1, ?, ?> 
thisWindowedStreamProcessorParameters;
-    private final ProcessorParameters<K, V2, ?, ?> 
otherWindowedStreamProcessorParameters;
-    private final StoreFactory thisWindowStoreBuilder;
-    private final StoreFactory otherWindowStoreBuilder;
-    private final Optional<StoreFactory> outerJoinWindowStoreBuilder;
-    private final Joined<K, V1, V2> joined;
-    private final boolean enableSpuriousResultFix;
+    private final String thisWindowedStreamProcessorName;
+    private final String otherWindowedStreamProcessorName;
     private final ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters;
     private boolean isSelfJoin;
 
@@ -43,14 +35,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
                                  final ProcessorParameters<K, V1, ?, ?> 
joinThisProcessorParameters,
                                  final ProcessorParameters<K, V2, ?, ?> 
joinOtherProcessParameters,
                                  final ProcessorParameters<K, VR, ?, ?> 
joinMergeProcessorParameters,
-                                 final ProcessorParameters<K, V1, ?, ?> 
thisWindowedStreamProcessorParameters,
-                                 final ProcessorParameters<K, V2, ?, ?> 
otherWindowedStreamProcessorParameters,
-                                 final StoreFactory thisStoreFactory,
-                                 final StoreFactory otherStoreFactory,
-                                 final Optional<StoreFactory> 
outerJoinStoreFactory,
-                                 final Joined<K, V1, V2> joined,
-                                 final boolean enableSpuriousResultFix,
-                                 final ProcessorParameters<K, V1, ?, ?> 
selfJoinProcessorParameters) {
+                                 final ProcessorParameters<K, V1, ?, ?> 
selfJoinProcessorParameters,
+                                 final String thisWindowedStreamProcessorName,
+                                 final String 
otherWindowedStreamProcessorName) {
 
         super(nodeName,
               valueJoiner,
@@ -60,26 +47,16 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
               null,
               null);
 
-        this.thisWindowStoreBuilder = thisStoreFactory;
-        this.otherWindowStoreBuilder = otherStoreFactory;
-        this.joined = joined;
-        this.thisWindowedStreamProcessorParameters = 
thisWindowedStreamProcessorParameters;
-        this.otherWindowedStreamProcessorParameters =  
otherWindowedStreamProcessorParameters;
-        this.outerJoinWindowStoreBuilder = outerJoinStoreFactory;
-        this.enableSpuriousResultFix = enableSpuriousResultFix;
+        this.thisWindowedStreamProcessorName = thisWindowedStreamProcessorName;
+        this.otherWindowedStreamProcessorName =  
otherWindowedStreamProcessorName;
         this.selfJoinProcessorParameters = selfJoinProcessorParameters;
     }
 
-
     @Override
     public String toString() {
         return "StreamStreamJoinNode{" +
-               "thisWindowedStreamProcessorParameters=" + 
thisWindowedStreamProcessorParameters +
-               ", otherWindowedStreamProcessorParameters=" + 
otherWindowedStreamProcessorParameters +
-               ", thisWindowStoreBuilder=" + thisWindowStoreBuilder +
-               ", otherWindowStoreBuilder=" + otherWindowStoreBuilder +
-               ", outerJoinWindowStoreBuilder=" + outerJoinWindowStoreBuilder +
-               ", joined=" + joined +
+            "thisWindowedStreamProcessorName=" + 
thisWindowedStreamProcessorName +
+            ", otherWindowedStreamProcessorName=" + 
otherWindowedStreamProcessorName +
                "} " + super.toString();
     }
 
@@ -89,22 +66,14 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
 
         final String thisProcessorName = 
thisProcessorParameters().processorName();
         final String otherProcessorName = 
otherProcessorParameters().processorName();
-        final String thisWindowedStreamProcessorName = 
thisWindowedStreamProcessorParameters.processorName();
-        final String otherWindowedStreamProcessorName = 
otherWindowedStreamProcessorParameters.processorName();
 
         if (isSelfJoin) {
-            
topologyBuilder.addProcessor(selfJoinProcessorParameters.processorName(), 
selfJoinProcessorParameters.processorSupplier(), 
thisWindowedStreamProcessorName);
-            topologyBuilder.addStateStore(thisWindowStoreBuilder, 
thisWindowedStreamProcessorName, selfJoinProcessorParameters.processorName());
+            selfJoinProcessorParameters.addProcessorTo(topologyBuilder, new 
String[]{thisWindowedStreamProcessorName});
         } else {
-            topologyBuilder.addProcessor(thisProcessorName, 
thisProcessorParameters().processorSupplier(), thisWindowedStreamProcessorName);
-            topologyBuilder.addProcessor(otherProcessorName, 
otherProcessorParameters().processorSupplier(), 
otherWindowedStreamProcessorName);
-            
topologyBuilder.addProcessor(mergeProcessorParameters().processorName(), 
mergeProcessorParameters().processorSupplier(), thisProcessorName, 
otherProcessorName);
-            topologyBuilder.addStateStore(thisWindowStoreBuilder, 
thisWindowedStreamProcessorName, otherProcessorName);
-            topologyBuilder.addStateStore(otherWindowStoreBuilder, 
otherWindowedStreamProcessorName, thisProcessorName);
-
-            if (enableSpuriousResultFix) {
-                outerJoinWindowStoreBuilder.ifPresent(builder -> 
topologyBuilder.addStateStore(builder, thisProcessorName, otherProcessorName));
-            }
+            thisProcessorParameters().addProcessorTo(topologyBuilder, new 
String[]{thisWindowedStreamProcessorName});
+            otherProcessorParameters().addProcessorTo(topologyBuilder, new 
String[]{otherWindowedStreamProcessorName});
+
+            mergeProcessorParameters().addProcessorTo(topologyBuilder, new 
String[]{thisProcessorName, otherProcessorName});
         }
     }
 
@@ -116,12 +85,12 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
         return isSelfJoin;
     }
 
-    public ProcessorParameters<K, V1, ?, ?> 
thisWindowedStreamProcessorParameters() {
-        return thisWindowedStreamProcessorParameters;
+    public String thisWindowedStreamProcessorName() {
+        return thisWindowedStreamProcessorName;
     }
 
-    public ProcessorParameters<K, V2, ?, ?> 
otherWindowedStreamProcessorParameters() {
-        return otherWindowedStreamProcessorParameters;
+    public String otherWindowedStreamProcessorName() {
+        return otherWindowedStreamProcessorName;
     }
 
     public static <K, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
streamStreamJoinNodeBuilder() {
@@ -135,14 +104,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
         private ProcessorParameters<K, V1, ?, ?> joinThisProcessorParameters;
         private ProcessorParameters<K, V2, ?, ?> joinOtherProcessorParameters;
         private ProcessorParameters<K, VR, ?, ?> joinMergeProcessorParameters;
-        private ProcessorParameters<K, V1, ?, ?> 
thisWindowedStreamProcessorParameters;
-        private ProcessorParameters<K, V2, ?, ?> 
otherWindowedStreamProcessorParameters;
-        private StoreFactory thisStoreFactory;
-        private StoreFactory otherStoreFactory;
-        private Optional<StoreFactory> outerJoinStoreFactory;
-        private Joined<K, V1, V2> joined;
-        private boolean enableSpuriousResultFix = false;
         private ProcessorParameters<K, V1, ?, ?> selfJoinProcessorParameters;
+        private String thisWindowedStreamProcessorName;
+        private String otherWindowedStreamProcessorName;
 
         private StreamStreamJoinNodeBuilder() {
         }
@@ -167,50 +131,24 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
             return this;
         }
 
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withJoinMergeProcessorParameters(final ProcessorParameters<K, VR, ?, ?> 
joinMergeProcessorParameters) {
-            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
-            return this;
-        }
-
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withThisWindowedStreamProcessorParameters(final ProcessorParameters<K, V1, ?, 
?> thisWindowedStreamProcessorParameters) {
-            this.thisWindowedStreamProcessorParameters = 
thisWindowedStreamProcessorParameters;
-            return this;
-        }
-
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withOtherWindowedStreamProcessorParameters(
-            final ProcessorParameters<K, V2, ?, ?> 
otherWindowedStreamProcessorParameters) {
-            this.otherWindowedStreamProcessorParameters = 
otherWindowedStreamProcessorParameters;
-            return this;
-        }
-
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withThisWindowStoreBuilder(final StoreFactory thisStoreFactory) {
-            this.thisStoreFactory = thisStoreFactory;
-            return this;
-        }
-
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withOtherWindowStoreBuilder(final StoreFactory otherStoreFactory) {
-            this.otherStoreFactory = otherStoreFactory;
-            return this;
-        }
-
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withOuterJoinWindowStoreBuilder(final Optional<StoreFactory> 
outerJoinWindowStoreBuilder) {
-            this.outerJoinStoreFactory = outerJoinWindowStoreBuilder;
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withSelfJoinProcessorParameters(
+            final ProcessorParameters<K, V1, ?, ?> 
selfJoinProcessorParameters) {
+            this.selfJoinProcessorParameters = selfJoinProcessorParameters;
             return this;
         }
 
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoined(final 
Joined<K, V1, V2> joined) {
-            this.joined = joined;
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withJoinMergeProcessorParameters(final ProcessorParameters<K, VR, ?, ?> 
joinMergeProcessorParameters) {
+            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
             return this;
         }
 
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withSpuriousResultFixEnabled() {
-            this.enableSpuriousResultFix = true;
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withThisWindowedStreamProcessorName(final String 
thisWindowedStreamProcessorName) {
+            this.thisWindowedStreamProcessorName = 
thisWindowedStreamProcessorName;
             return this;
         }
 
-        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withSelfJoinProcessorParameters(
-            final ProcessorParameters<K, V1, ?, ?> 
selfJoinProcessorParameters) {
-            this.selfJoinProcessorParameters = selfJoinProcessorParameters;
+        public StreamStreamJoinNodeBuilder<K, V1, V2, VR> 
withOtherWindowedStreamProcessorName(final String 
otherWindowedStreamProcessorName) {
+            this.otherWindowedStreamProcessorName = 
otherWindowedStreamProcessorName;
             return this;
         }
 
@@ -221,14 +159,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
                                               joinThisProcessorParameters,
                                               joinOtherProcessorParameters,
                                               joinMergeProcessorParameters,
-                                              
thisWindowedStreamProcessorParameters,
-                                              
otherWindowedStreamProcessorParameters,
-                                              thisStoreFactory,
-                                              otherStoreFactory,
-                                              outerJoinStoreFactory,
-                                              joined,
-                                              enableSpuriousResultFix,
-                                              selfJoinProcessorParameters);
+                                              selfJoinProcessorParameters,
+                                              thisWindowedStreamProcessorName,
+                                              
otherWindowedStreamProcessorName);
 
 
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 00c745fd735..738f753f532 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -89,6 +89,7 @@ import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static 
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
 import static org.apache.kafka.streams.state.Stores.inMemoryKeyValueStore;
@@ -1610,8 +1611,8 @@ public class StreamsBuilderTest {
             .toStream(Named.as("toStream"))// wrapped 4
             .to("output", Produced.as("sink"));
 
-        final var top = builder.build();
-        System.out.println(top.describe());
+        builder.build();
+
         assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
             "aggregate-cogroup-agg-0", "aggregate-cogroup-agg-1", 
"aggregate-cogroup-merge", "toStream"
         ));
@@ -1747,6 +1748,209 @@ public class StreamsBuilderTest {
         assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
     }
 
+    @Test
+    public void shouldWrapProcessorsForStreamStreamInnerJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream1 = builder.stream("input-1", 
Consumed.as("source-1"));
+        final KStream<String, String> stream2 = builder.stream("input-2", 
Consumed.as("source-2"));
+
+        stream1.join(
+                stream2,
+                MockValueJoiner.TOSTRING_JOINER,
+                JoinWindows.ofTimeDifferenceAndGrace(Duration.ofDays(1), 
Duration.ofDays(1)),
+                StreamJoined.as("ss-join"))
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+
+        // TODO: fix these names once we address 
https://issues.apache.org/jira/browse/KAFKA-18191
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "KSTREAM-JOINTHIS-0000000004", "KSTREAM-JOINOTHER-0000000005",
+            "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+            "KSTREAM-MERGE-0000000006"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(4));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForStreamStreamLeftJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream1 = builder.stream("input-1", 
Consumed.as("source-1"));
+        final KStream<String, String> stream2 = builder.stream("input-2", 
Consumed.as("source-2"));
+
+        stream1.leftJoin(
+                stream2,
+                MockValueJoiner.TOSTRING_JOINER,
+                JoinWindows.ofTimeDifferenceAndGrace(Duration.ofDays(1), 
Duration.ofDays(1)),
+                StreamJoined.as("ss-join"))
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+
+        // TODO: fix these names once we address 
https://issues.apache.org/jira/browse/KAFKA-18191
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "KSTREAM-JOINTHIS-0000000004", "KSTREAM-OUTEROTHER-0000000005",
+            "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+            "KSTREAM-MERGE-0000000006"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+
+        // 1 additional store due to spurious results fix for left/outer joins
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(6));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForStreamStreamOuterJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream1 = builder.stream("input-1", 
Consumed.as("source-1"));
+        final KStream<String, String> stream2 = builder.stream("input-2", 
Consumed.as("source-2"));
+
+        stream1.outerJoin(
+                stream2,
+                MockValueJoiner.TOSTRING_JOINER,
+                JoinWindows.ofTimeDifferenceAndGrace(Duration.ofDays(1), 
Duration.ofDays(1)),
+                StreamJoined.as("ss-join"))
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+
+        // TODO: fix these names once we address 
https://issues.apache.org/jira/browse/KAFKA-18191
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "KSTREAM-OUTERTHIS-0000000004", "KSTREAM-OUTEROTHER-0000000005",
+            "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+            "KSTREAM-MERGE-0000000006"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+
+        // 1 additional store due to spurious results fix for left/outer joins
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(6));
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void 
shouldWrapProcessorsForStreamStreamOuterJoinWithoutSpuriousResultsFix() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream1 = builder.stream("input-1", 
Consumed.as("source-1"));
+        final KStream<String, String> stream2 = builder.stream("input-2", 
Consumed.as("source-2"));
+
+        stream1.outerJoin(
+                stream2,
+                MockValueJoiner.TOSTRING_JOINER,
+                JoinWindows.of(Duration.ofDays(1)), // intentionally uses 
deprecated version of this API!
+                StreamJoined.as("ss-join"))
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+
+        // TODO: fix these names once we address 
https://issues.apache.org/jira/browse/KAFKA-18191
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "KSTREAM-OUTERTHIS-0000000004", "KSTREAM-OUTEROTHER-0000000005",
+            "KSTREAM-WINDOWED-0000000003", "KSTREAM-WINDOWED-0000000002",
+            "KSTREAM-MERGE-0000000006"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(4));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForStreamStreamSelfJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream1 = builder.stream("input", 
Consumed.as("source"));
+
+        stream1.join(
+                stream1,
+                MockValueJoiner.TOSTRING_JOINER,
+                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)),
+                StreamJoined.as("ss-join"))
+            .to("output", Produced.as("sink"));
+
+        builder.build();
+
+        // TODO: fix these names once we address 
https://issues.apache.org/jira/browse/KAFKA-18191
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "KSTREAM-JOINTHIS-0000000003", "KSTREAM-JOINOTHER-0000000004",
+            "KSTREAM-WINDOWED-0000000001", "KSTREAM-WINDOWED-0000000002",
+            "KSTREAM-MERGE-0000000005"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(5));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(4));
+    }
+
+    @Test
+    public void 
shouldWrapProcessorsForStreamStreamSelfJoinWithSharedStoreOptimization() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream1 = builder.stream("input", 
Consumed.as("source"));
+
+        stream1.join(
+                stream1,
+                MockValueJoiner.TOSTRING_JOINER,
+                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)),
+                StreamJoined.as("ss-join"))
+            .to("output", Produced.as("sink"));
+
+        final Properties properties = new Properties();
+        properties.putAll(props);
+        builder.build(properties);
+
+        // TODO: fix these names once we address 
https://issues.apache.org/jira/browse/KAFKA-18191
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
+            "KSTREAM-WINDOWED-0000000001", "KSTREAM-MERGE-0000000005"
+        ));
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(2));
+        // only 1 store when topology optimizations enabled due to sharing 
self-join store
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+    }
+
     @Test
     public void shouldWrapProcessorsForStreamTableJoin() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
@@ -1767,6 +1971,7 @@ public class StreamsBuilderTest {
             .to("output", Produced.as("sink"));
 
         builder.build();
+
         assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder(
             "source-table", "st-join"
         ));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index f388441f356..d1068444cdb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -34,11 +34,11 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
 import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
 import org.apache.kafka.streams.state.DslWindowParams;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -456,39 +456,42 @@ public class KStreamKStreamJoinTest {
          * This test is testing something internal to [[KStreamKStreamJoin]], 
so we had to setup low-level api manually.
          */
         final KStreamImplJoin.TimeTrackerSupplier tracker = new 
KStreamImplJoin.TimeTrackerSupplier();
-        final KStreamKStreamJoinRightSide<String, String, String, String> join 
= new KStreamKStreamJoinRightSide<>(
+        final WindowStoreBuilder<String, String> otherStoreBuilder = new 
WindowStoreBuilder<>(
+            new InMemoryWindowBytesStoreSupplier(
                 "other",
-                new 
JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))),
-                (key, v1, v2) -> v1 + v2,
-                true,
-                Optional.of("outer"),
-                tracker);
+                1000L,
+                100,
+                false),
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime());
+        final KeyValueStoreBuilder<TimestampedKeyAndJoinSide<String>, 
LeftOrRightValue<String, String>> outerStoreBuilder = new 
KeyValueStoreBuilder<>(
+            new InMemoryKeyValueBytesStoreSupplier("outer"),
+            new TimestampedKeyAndJoinSideSerde<>(Serdes.String()),
+            new LeftOrRightValueSerde<>(Serdes.String(), Serdes.String()),
+            new MockTime()
+        );
+        final KStreamKStreamJoinRightSide<String, String, String, String> join 
= new KStreamKStreamJoinRightSide<>(
+            new 
JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))),
+            (key, v1, v2) -> v1 + v2,
+            true,
+            tracker,
+            StoreBuilderWrapper.wrapStoreBuilder(otherStoreBuilder),
+            
Optional.of(StoreBuilderWrapper.wrapStoreBuilder(outerStoreBuilder)));
+
         final Processor<String, String, String, String> joinProcessor = 
join.get();
         final MockInternalNewProcessorContext<String, String> procCtx = new 
MockInternalNewProcessorContext<>();
-        final WindowStore<String, String> otherStore = new 
WindowStoreBuilder<>(
-                new InMemoryWindowBytesStoreSupplier(
-                        "other",
-                        1000L,
-                        100,
-                        false),
-                Serdes.String(),
-                Serdes.String(),
-                new MockTime()).build();
+        final WindowStore<String, String> otherStore = 
otherStoreBuilder.build();
 
-        final KeyValueStore<TimestampedKeyAndJoinSide<String>, 
LeftOrRightValue<String, String>> outerStore = Mockito.spy(
-                new KeyValueStoreBuilder<>(
-                    new InMemoryKeyValueBytesStoreSupplier("outer"),
-                    new TimestampedKeyAndJoinSideSerde<>(Serdes.String()),
-                    new LeftOrRightValueSerde<>(Serdes.String(), 
Serdes.String()),
-                    new MockTime()
-                ).build());
+        final KeyValueStore<TimestampedKeyAndJoinSide<String>, 
LeftOrRightValue<String, String>> outerStore =
+            Mockito.spy(outerStoreBuilder.build());
 
         final GenericInMemoryKeyValueStore<String, String> rootStore = new 
GenericInMemoryKeyValueStore<>("root");
 
-        otherStore.init((StateStoreContext) procCtx, rootStore);
+        otherStore.init(procCtx, rootStore);
         procCtx.addStateStore(otherStore);
 
-        outerStore.init((StateStoreContext) procCtx, rootStore);
+        outerStore.init(procCtx, rootStore);
         procCtx.addStateStore(outerStore);
 
         joinProcessor.init(procCtx);

Reply via email to