This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 21563380f3e KAFKA-18026: KIP-1112, migrate table-table joins to use 
ProcesserSuppliers#stores (#18048)
21563380f3e is described below

commit 21563380f3e5bfab901d60319d58de93cada61d3
Author: Almog Gavra <[email protected]>
AuthorDate: Wed Dec 11 17:37:34 2024 -0800

    KAFKA-18026: KIP-1112, migrate table-table joins to use 
ProcesserSuppliers#stores (#18048)
    
    Covers wrapping of processors and state stores for KTable-KTable joins
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Guozhang 
Wang <[email protected]>
---
 .../streams/kstream/internals/KTableImpl.java      | 50 ++++++-----
 .../internals/KTableKTableAbstractJoin.java        |  8 ++
 .../kstream/internals/KTableKTableJoinMerger.java  | 21 ++++-
 .../internals/graph/KTableKTableJoinNode.java      | 54 +++---------
 .../internals/graph/ProcessorParameters.java       |  2 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   | 96 ++++++++++++++++++++++
 6 files changed, 160 insertions(+), 71 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a0d5ff4d7c0..f90d35827f1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -748,26 +748,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
             ((KTableImpl<?, ?, ?>) other).enableSendingOldValues(true);
         }
 
-        final KTableKTableAbstractJoin<K, V, VO, VR> joinThis;
-        final KTableKTableAbstractJoin<K, VO, V, VR> joinOther;
-
-        if (!leftOuter) { // inner
-            joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, 
VO>) other, joiner);
-            joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, VO>) 
other, this, reverseJoiner(joiner));
-        } else if (!rightOuter) { // left
-            joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, VO>) 
other, joiner);
-            joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, VO>) 
other, this, reverseJoiner(joiner));
-        } else { // outer
-            joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, 
VO>) other, joiner);
-            joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, VO>) 
other, this, reverseJoiner(joiner));
-        }
-
-        final String joinThisName = renamed.suffixWithOrElseGet("-join-this", 
builder, JOINTHIS_NAME);
-        final String joinOtherName = 
renamed.suffixWithOrElseGet("-join-other", builder, JOINOTHER_NAME);
-
-        final ProcessorParameters<K, Change<V>, ?, ?> 
joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
-        final ProcessorParameters<K, Change<VO>, ?, ?> 
joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, 
joinOtherName);
-
         final Serde<K> keySerde;
         final Serde<VR> valueSerde;
         final String queryableStoreName;
@@ -788,19 +768,45 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
             storeFactory = null;
         }
 
+        final KTableKTableAbstractJoin<K, V, VO, VR> joinThis;
+        final KTableKTableAbstractJoin<K, VO, V, VR> joinOther;
+
+        if (!leftOuter) { // inner
+            joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, 
VO>) other, joiner);
+            joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, VO>) 
other, this, reverseJoiner(joiner));
+        } else if (!rightOuter) { // left
+            joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, VO>) 
other, joiner);
+            joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, VO>) 
other, this, reverseJoiner(joiner));
+        } else { // outer
+            joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, 
VO>) other, joiner);
+            joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, VO>) 
other, this, reverseJoiner(joiner));
+        }
+
+        final String joinThisName = renamed.suffixWithOrElseGet("-join-this", 
builder, JOINTHIS_NAME);
+        final String joinOtherName = 
renamed.suffixWithOrElseGet("-join-other", builder, JOINOTHER_NAME);
+
+        final ProcessorParameters<K, Change<V>, ?, ?> 
joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
+        final ProcessorParameters<K, Change<VO>, ?, ?> 
joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, 
joinOtherName);
+        final ProcessorParameters<K, Change<VR>, ?, ?> 
joinMergeProcessorParameters = new ProcessorParameters<>(
+                KTableKTableJoinMerger.of(
+                        (KTableProcessorSupplier<K, V, K, VR>) 
joinThisProcessorParameters.processorSupplier(),
+                        (KTableProcessorSupplier<K, VO, K, VR>) 
joinOtherProcessorParameters.processorSupplier(),
+                        queryableStoreName,
+                        storeFactory),
+                joinMergeName);
+
         final KTableKTableJoinNode<K, V, VO, VR> kTableKTableJoinNode =
             KTableKTableJoinNode.<K, V, VO, VR>kTableKTableJoinNodeBuilder()
                 .withNodeName(joinMergeName)
                 .withJoinThisProcessorParameters(joinThisProcessorParameters)
                 .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
+                .withMergeProcessorParameters(joinMergeProcessorParameters)
                 .withThisJoinSideNodeName(name)
                 .withOtherJoinSideNodeName(((KTableImpl<?, ?, ?>) other).name)
                 .withJoinThisStoreNames(valueGetterSupplier().storeNames())
                 .withJoinOtherStoreNames(((KTableImpl<?, ?, ?>) 
other).valueGetterSupplier().storeNames())
                 .withKeySerde(keySerde)
                 .withValueSerde(valueSerde)
-                .withQueryableStoreName(queryableStoreName)
-                .withStoreBuilder(storeFactory)
                 .build();
 
         final boolean isOutputVersioned = materializedInternal != null
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index 21339c0b649..6db3388c81c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Set;
 
 public abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
     KTableProcessorSupplier<K, V1, K, VOut> {
@@ -49,6 +52,11 @@ public abstract class KTableKTableAbstractJoin<K, V1, V2, 
VOut> implements
         return true;
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return null;
+    }
+
     public void setUseVersionedSemantics(final boolean useVersionedSemantics) {
         this.useVersionedSemantics = useVersionedSemantics;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 7924f8ea857..bf9ddef3356 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -20,6 +20,8 @@ import 
org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
 
 import java.util.Collections;
@@ -34,14 +36,17 @@ public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K,
     private final KTableProcessorSupplier<K, ?, K, V> parent1;
     private final KTableProcessorSupplier<K, ?, K, V> parent2;
     private final String queryableName;
+    private final StoreFactory storeFactory;
     private boolean sendOldValues = false;
 
     KTableKTableJoinMerger(final KTableProcessorSupplier<K, ?, K, V> parent1,
                            final KTableProcessorSupplier<K, ?, K, V> parent2,
-                           final String queryableName) {
+                           final String queryableName,
+                           final StoreFactory storeFactory) {
         this.parent1 = parent1;
         this.parent2 = parent2;
         this.queryableName = queryableName;
+        this.storeFactory = storeFactory;
     }
 
     public String queryableName() {
@@ -53,6 +58,13 @@ public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K,
         return new KTableKTableJoinMergeProcessor();
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return storeFactory == null
+                ? null
+                : Set.of(new 
StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
+    }
+
     @Override
     public KTableValueGetterSupplier<K, V> view() {
         // if the result KTable is materialized, use the materialized store to 
return getter value;
@@ -90,13 +102,14 @@ public class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K,
 
     public static <K, V> KTableKTableJoinMerger<K, V> of(final 
KTableProcessorSupplier<K, ?, K, V> parent1,
                                                          final 
KTableProcessorSupplier<K, ?, K, V> parent2) {
-        return of(parent1, parent2, null);
+        return of(parent1, parent2, null, null);
     }
 
     public static <K, V> KTableKTableJoinMerger<K, V> of(final 
KTableProcessorSupplier<K, ?, K, V> parent1,
                                                          final 
KTableProcessorSupplier<K, ?, K, V> parent2,
-                                                         final String 
queryableName) {
-        return new KTableKTableJoinMerger<>(parent1, parent2, queryableName);
+                                                         final String 
queryableName,
+                                                         final StoreFactory 
stores) {
+        return new KTableKTableJoinMerger<>(parent1, parent2, queryableName, 
stores);
     }
 
     private class KTableKTableJoinMergeProcessor extends 
ContextualProcessor<K, Change<V>, K, Change<V>> {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 6cfdd53784c..275a88b7676 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -21,10 +21,8 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoin;
 import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
-import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
 
 import java.util.Arrays;
 
@@ -37,7 +35,6 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
     private final Serde<VR> valueSerde;
     private final String[] joinThisStoreNames;
     private final String[] joinOtherStoreNames;
-    private final StoreFactory storeFactory;
 
     KTableKTableJoinNode(final String nodeName,
                          final ProcessorParameters<K, Change<V1>, ?, ?> 
joinThisProcessorParameters,
@@ -48,8 +45,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
                          final Serde<K> keySerde,
                          final Serde<VR> valueSerde,
                          final String[] joinThisStoreNames,
-                         final String[] joinOtherStoreNames,
-                         final StoreFactory storeFactory) {
+                         final String[] joinOtherStoreNames) {
 
         super(nodeName,
             null,
@@ -63,7 +59,6 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
         this.valueSerde = valueSerde;
         this.joinThisStoreNames = joinThisStoreNames;
         this.joinOtherStoreNames = joinOtherStoreNames;
-        this.storeFactory = storeFactory;
     }
 
     public Serde<K> keySerde() {
@@ -120,30 +115,13 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
         final String thisProcessorName = 
thisProcessorParameters().processorName();
         final String otherProcessorName = 
otherProcessorParameters().processorName();
-        final String mergeProcessorName = 
mergeProcessorParameters().processorName();
 
-        topologyBuilder.addProcessor(
-            thisProcessorName,
-            thisProcessorParameters().processorSupplier(),
-            thisJoinSideNodeName());
-
-        topologyBuilder.addProcessor(
-            otherProcessorName,
-            otherProcessorParameters().processorSupplier(),
-            otherJoinSideNodeName());
-
-        topologyBuilder.addProcessor(
-            mergeProcessorName,
-            mergeProcessorParameters().processorSupplier(),
-            thisProcessorName,
-            otherProcessorName);
+        thisProcessorParameters().addProcessorTo(topologyBuilder, 
thisJoinSideNodeName());
+        otherProcessorParameters().addProcessorTo(topologyBuilder, 
otherJoinSideNodeName());
+        mergeProcessorParameters().addProcessorTo(topologyBuilder, 
thisProcessorName, otherProcessorName);
 
         topologyBuilder.connectProcessorAndStateStores(thisProcessorName, 
joinOtherStoreNames);
         topologyBuilder.connectProcessorAndStateStores(otherProcessorName, 
joinThisStoreNames);
-
-        if (storeFactory != null) {
-            topologyBuilder.addStateStore(storeFactory, mergeProcessorName);
-        }
     }
 
     @Override
@@ -168,8 +146,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
         private Serde<VR> valueSerde;
         private String[] joinThisStoreNames;
         private String[] joinOtherStoreNames;
-        private String queryableStoreName;
-        private StoreFactory storeFactory;
+        private ProcessorParameters<K, Change<VR>, ?, ?>
+                joinMergeProcessorParameters;
 
         private KTableKTableJoinNodeBuilder() {
         }
@@ -219,35 +197,23 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> 
withQueryableStoreName(final String queryableStoreName) {
-            this.queryableStoreName = queryableStoreName;
-            return this;
-        }
-
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> 
withStoreBuilder(final StoreFactory storeFactory) {
-            this.storeFactory = storeFactory;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> 
withMergeProcessorParameters(final ProcessorParameters<K, Change<VR>, ?, ?> 
joinMergeProcessorParameters) {
+            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
             return this;
         }
 
-        @SuppressWarnings("unchecked")
         public KTableKTableJoinNode<K, V1, V2, VR> build() {
             return new KTableKTableJoinNode<>(
                 nodeName,
                 joinThisProcessorParameters,
                 joinOtherProcessorParameters,
-                new ProcessorParameters<>(
-                    KTableKTableJoinMerger.of(
-                        (KTableProcessorSupplier<K, V1, K, VR>) 
joinThisProcessorParameters.processorSupplier(),
-                        (KTableProcessorSupplier<K, V2, K, VR>) 
joinOtherProcessorParameters.processorSupplier(),
-                        queryableStoreName),
-                    nodeName),
+                joinMergeProcessorParameters,
                 thisJoinSide,
                 otherJoinSide,
                 keySerde,
                 valueSerde,
                 joinThisStoreNames,
-                joinOtherStoreNames,
-                storeFactory
+                joinOtherStoreNames
             );
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index bae4172ebb6..7cfbc94533e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -63,7 +63,7 @@ public class ProcessorParameters<KIn, VIn, KOut, VOut> {
         return fixedKeyProcessorSupplier;
     }
 
-    public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, 
final String[] parentNodeNames) {
+    public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, 
final String... parentNodeNames) {
         if (processorSupplier != null) {
             ApiUtils.checkSupplier(processorSupplier);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 056721fa8af..5371d53949f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -1807,6 +1807,102 @@ public class StreamsBuilderTest {
         assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
     }
 
+    @Test
+    public void shouldWrapProcessorsForTableTableInnerJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KTable<String, String> t1 = builder.table("input1", 
Consumed.as("input1")); // 1
+        final KTable<String, String> t2 = builder.table("input2", 
Consumed.as("input2")); // 2
+
+        t1.join(t2, (v1, v2) -> v1 + v2, Named.as("join-processor"), 
Materialized.as("the_join")) // 3 (this), 4 (other), 5 (merger)
+                .toStream(Named.as("toStream")) // 6
+                .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
+        assertThat(counter.wrappedProcessorNames().toString(), 
counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
+                "input1",
+                "input2",
+                "join-processor-join-this",
+                "join-processor-join-other",
+                "join-processor",
+                "toStream"
+        ));
+
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3)); // one 
for join this, one for join that
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForTableTableLeftJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KTable<String, String> t1 = builder.table("input1", 
Consumed.as("input1")); // 1
+        final KTable<String, String> t2 = builder.table("input2", 
Consumed.as("input2")); // 2
+
+        t1.leftJoin(t2, (v1, v2) -> v1 + v2, Named.as("join-processor"), 
Materialized.as("the_join")) // 3 (this), 4 (other), 5 (merger)
+                .toStream(Named.as("toStream")) // 6
+                .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
+        assertThat(counter.wrappedProcessorNames().toString(), 
counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
+                "input1",
+                "input2",
+                "join-processor-join-this",
+                "join-processor-join-other",
+                "join-processor",
+                "toStream"
+        ));
+
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3)); // 
table1, table2, join materialized
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3));
+    }
+
+    @Test
+    public void shouldWrapProcessorsForTableTableOuterJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KTable<String, String> t1 = builder.table("input1", 
Consumed.as("input1")); // 1
+        final KTable<String, String> t2 = builder.table("input2", 
Consumed.as("input2")); // 2
+
+        t1.outerJoin(t2, (v1, v2) -> v1 + v2, Named.as("join-processor"), 
Materialized.as("the_join")) // 3 (this), 4 (other), 5 (merger)
+                .toStream(Named.as("toStream")) // 6
+                .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(6));
+        assertThat(counter.wrappedProcessorNames().toString(), 
counter.wrappedProcessorNames(), Matchers.containsInAnyOrder(
+                "input1",
+                "input2",
+                "join-processor-join-this",
+                "join-processor-join-other",
+                "join-processor",
+                "toStream"
+        ));
+
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(3)); // 
table1, table2, join materialized
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(3));
+    }
+
     @Test
     public void shouldAllowStreamsFromSameTopic() {
         builder.stream("topic");

Reply via email to