This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 45d7440  KAFKA-10847: Set StreamsConfig on InternalTopologyDriver 
before writing topology (#10640)
45d7440 is described below

commit 45d7440c1577b838d4584d3860e5c4d691446f3f
Author: Sergio Peña <[email protected]>
AuthorDate: Thu May 6 19:27:23 2021 -0500

    KAFKA-10847: Set StreamsConfig on InternalTopologyDriver before writing 
topology (#10640)
    
    Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../kafka/streams/kstream/internals/InternalStreamsBuilder.java  | 4 ++--
 .../kafka/streams/kstream/internals/graph/GlobalStoreNode.java   | 4 +++-
 .../apache/kafka/streams/kstream/internals/graph/GraphNode.java  | 3 ++-
 .../internals/graph/GroupedTableOperationRepartitionNode.java    | 4 +++-
 .../streams/kstream/internals/graph/KTableKTableJoinNode.java    | 3 ++-
 .../kstream/internals/graph/OptimizableRepartitionNode.java      | 4 +++-
 .../streams/kstream/internals/graph/ProcessorGraphNode.java      | 4 +++-
 .../kafka/streams/kstream/internals/graph/StateStoreNode.java    | 4 +++-
 .../streams/kstream/internals/graph/StatefulProcessorNode.java   | 3 ++-
 .../kafka/streams/kstream/internals/graph/StreamSinkNode.java    | 4 +++-
 .../kafka/streams/kstream/internals/graph/StreamSourceNode.java  | 3 ++-
 .../streams/kstream/internals/graph/StreamStreamJoinNode.java    | 9 ++++++---
 .../streams/kstream/internals/graph/StreamTableJoinNode.java     | 3 ++-
 .../kafka/streams/kstream/internals/graph/StreamToTableNode.java | 4 +++-
 .../streams/kstream/internals/graph/TableProcessorNode.java      | 3 ++-
 .../kafka/streams/kstream/internals/graph/TableSourceNode.java   | 3 ++-
 .../kstream/internals/graph/UnoptimizableRepartitionNode.java    | 4 +++-
 .../streams/kstream/internals/KStreamKStreamLeftJoinTest.java    | 9 +++++++++
 .../streams/kstream/internals/KStreamKStreamOuterJoinTest.java   | 6 ++++++
 .../streams/kstream/internals/graph/TableSourceNodeTest.java     | 4 +++-
 20 files changed, 64 insertions(+), 21 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 6fa65c1..1527263 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -74,7 +74,7 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
 
     protected final GraphNode root = new GraphNode(TOPOLOGY_ROOT) {
         @Override
-        public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder) {
+        public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder, final Properties props) {
             // no-op for root node
         }
     };
@@ -290,7 +290,7 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
             }
 
             if (streamGraphNode.allParentsWrittenToTopology() && 
!streamGraphNode.hasWrittenToTopology()) {
-                streamGraphNode.writeToTopology(internalTopologyBuilder);
+                streamGraphNode.writeToTopology(internalTopologyBuilder, 
props);
                 streamGraphNode.setHasWrittenToTopology(true);
             }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
index 753e076..90d9efb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 
+import java.util.Properties;
+
 public class GlobalStoreNode<KIn, VIn, S extends StateStore> extends 
StateStoreNode<S> {
 
     private final String sourceName;
@@ -47,7 +49,7 @@ public class GlobalStoreNode<KIn, VIn, S extends StateStore> 
extends StateStoreN
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         storeBuilder.withLoggingDisabled();
         topologyBuilder.addGlobalStore(storeBuilder,
                                        sourceName,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
index 76c2b5c..c55395a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashSet;
+import java.util.Properties;
 
 public abstract class GraphNode {
 
@@ -117,7 +118,7 @@ public abstract class GraphNode {
         return this.buildPriority;
     }
 
-    public abstract void writeToTopology(final InternalTopologyBuilder 
topologyBuilder);
+    public abstract void writeToTopology(final InternalTopologyBuilder 
topologyBuilder, final Properties props);
 
     public boolean hasWrittenToTopology() {
         return hasWrittenToTopology;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index a7ba30d..1117e5e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -26,6 +26,8 @@ import 
org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
+import java.util.Properties;
+
 public class GroupedTableOperationRepartitionNode<K, V> extends 
BaseRepartitionNode<K, V> {
 
 
@@ -78,7 +80,7 @@ public class GroupedTableOperationRepartitionNode<K, V> 
extends BaseRepartitionN
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         topologyBuilder.addInternalTopic(repartitionTopic, 
internalTopicProperties);
 
         topologyBuilder.addSink(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index e2abfb5..0ca1e35 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Arrays;
+import java.util.Properties;
 
 /**
  * Too much specific information to generalize so the KTable-KTable join 
requires a specific node.
@@ -96,7 +97,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         final String thisProcessorName = 
thisProcessorParameters().processorName();
         final String otherProcessorName = 
otherProcessorParameters().processorName();
         final String mergeProcessorName = 
mergeProcessorParameters().processorName();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index a9693ec..c7ee03d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
+import java.util.Properties;
+
 public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K, 
V> {
 
     private OptimizableRepartitionNode(final String nodeName,
@@ -64,7 +66,7 @@ public class OptimizableRepartitionNode<K, V> extends 
BaseRepartitionNode<K, V>
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         topologyBuilder.addInternalTopic(repartitionTopic, 
internalTopicProperties);
 
         topologyBuilder.addProcessor(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
index a38f516..c3dd4f6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
+import java.util.Properties;
+
 /**
  * Used to represent any type of stateless operation:
  *
@@ -55,7 +57,7 @@ public class ProcessorGraphNode<K, V> extends GraphNode {
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
 
         topologyBuilder.addProcessor(processorParameters.processorName(), 
processorParameters.processorSupplier(), parentNodeNames());
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
index 32dc93d..ae01580 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
@@ -20,6 +20,8 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 
+import java.util.Properties;
+
 public class StateStoreNode<S extends StateStore> extends GraphNode {
 
     protected final StoreBuilder<S> storeBuilder;
@@ -31,7 +33,7 @@ public class StateStoreNode<S extends StateStore> extends 
GraphNode {
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
 
         topologyBuilder.addStateStore(storeBuilder);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index 28af95b..ad6de37 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -22,6 +22,7 @@ import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Arrays;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Stream;
 
@@ -78,7 +79,7 @@ public class StatefulProcessorNode<K, V> extends 
ProcessorGraphNode<K, V> {
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
 
         final String processorName = processorParameters().processorName();
         final ProcessorSupplier<K, V, ?, ?> processorSupplier = 
processorParameters().processorSupplier();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index f12a9e5..8d67ac1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 
+import java.util.Properties;
+
 public class StreamSinkNode<K, V> extends GraphNode {
 
     private final TopicNameExtractor<K, V> topicNameExtractor;
@@ -52,7 +54,7 @@ public class StreamSinkNode<K, V> extends GraphNode {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         final Serializer<K> keySerializer = producedInternal.keySerde() == 
null ? null : producedInternal.keySerde().serializer();
         final Serializer<V> valSerializer = producedInternal.valueSerde() == 
null ? null : producedInternal.valueSerde().serializer();
         final String[] parentNames = parentNodeNames();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index f4f9842..d4adc89 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
 import java.util.Collection;
+import java.util.Properties;
 import java.util.regex.Pattern;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +70,7 @@ public class StreamSourceNode<K, V> extends 
SourceGraphNode<K, V> {
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
 
         if (topicPattern() != null) {
             topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index 9de2378..e940c71 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -26,7 +26,9 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.KeyAndJoinSide;
 import org.apache.kafka.streams.state.internals.LeftOrRightValue;
 
+import java.util.HashMap;
 import java.util.Optional;
+import java.util.Properties;
 
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
 
@@ -34,6 +36,7 @@ import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTRE
  * Too much information to generalize, so Stream-Stream joins are represented 
by a specific node.
  */
 public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K, V1, V2, VR> {
+    private static final Properties EMPTY_PROPERTIES = new Properties();
 
     private final ProcessorParameters<K, V1, ?, ?> 
thisWindowedStreamProcessorParameters;
     private final ProcessorParameters<K, V2, ?, ?> 
otherWindowedStreamProcessorParameters;
@@ -84,8 +87,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
                "} " + super.toString();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
 
         final String thisProcessorName = 
thisProcessorParameters().processorName();
         final String otherProcessorName = 
otherProcessorParameters().processorName();
@@ -98,8 +102,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
         topologyBuilder.addStateStore(thisWindowStoreBuilder, 
thisWindowedStreamProcessorName, otherProcessorName);
         topologyBuilder.addStateStore(otherWindowStoreBuilder, 
otherWindowedStreamProcessorName, thisProcessorName);
 
-        final StreamsConfig streamsConfig = topologyBuilder.getStreamsConfig();
-        if (streamsConfig == null || 
StreamsConfig.InternalConfig.getBoolean(streamsConfig.originals(), 
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
+        if (props == null || StreamsConfig.InternalConfig.getBoolean(new 
HashMap(props), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
             outerJoinWindowStoreBuilder.ifPresent(builder -> 
topologyBuilder.addStateStore(builder, thisProcessorName, otherProcessorName));
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
index a4db1ba..d5bd2b8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
 import java.util.Arrays;
+import java.util.Properties;
 
 /**
  * Represents a join between a KStream and a KTable or GlobalKTable
@@ -54,7 +55,7 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         final String processorName = processorParameters.processorName();
         final ProcessorSupplier<K, V, ?, ?> processorSupplier = 
processorParameters.processorSupplier();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
index 3b0a572..54231d3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
+import java.util.Properties;
+
 /**
  * Represents a KTable convert From KStream
  */
@@ -52,7 +54,7 @@ public class StreamToTableNode<K, V> extends GraphNode {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
             new 
TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, 
KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index b9a2568..5254c57 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.Properties;
 
 public class TableProcessorNode<K, V> extends GraphNode {
 
@@ -58,7 +59,7 @@ public class TableProcessorNode<K, V> extends GraphNode {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         final String processorName = processorParameters.processorName();
         topologyBuilder.addProcessor(processorName, 
processorParameters.processorSupplier(), parentNodeNames());
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 203f3af..b708961 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 
 import java.util.Collections;
+import java.util.Properties;
 
 /**
  * Used to represent either a KTable source or a GlobalKTable source. A 
boolean flag is used to indicate if this represents a GlobalKTable a {@link
@@ -81,7 +82,7 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, 
V> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         final String topicName = topicNames().iterator().next();
 
         // TODO: we assume source KTables can only be timestamped-key-value 
stores for now.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java
index daac9bd..a231d23 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
+import java.util.Properties;
+
 /**
  * Repartition node that is not subject of optimization algorithm
  */
@@ -50,7 +52,7 @@ public class UnoptimizableRepartitionNode<K, V> extends 
BaseRepartitionNode<K, V
     }
 
     @Override
-    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) 
{
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder, 
final Properties props) {
         topologyBuilder.addInternalTopic(repartitionTopic, 
internalTopicProperties);
 
         topologyBuilder.addProcessor(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 50c2f86..d0fb2be 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -116,6 +116,9 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
 
+            // Only 2 window stores should be available
+            assertEquals(2, driver.getAllStateStores().size());
+
             // push two items to the primary stream; the other window is empty
             // w1 {}
             // w2 {}
@@ -167,6 +170,9 @@ public class KStreamKStreamLeftJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
 
+            // Only 2 window stores should be available
+            assertEquals(2, driver.getAllStateStores().size());
+
             inputTopic1.pipeInput(0, "A0", 0);
             inputTopic1.pipeInput(0, "A0-0", 0);
             inputTopic2.pipeInput(0, "a0", 0);
@@ -488,6 +494,9 @@ public class KStreamKStreamLeftJoinTest {
                     driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
 
+            // 2 window stores + 1 shared window store should be available
+            assertEquals(3, driver.getAllStateStores().size());
+
             // push two items to the primary stream; the other window is empty
             // w1 {}
             // w2 {}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 2bf7fef..9c16c59 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -112,6 +112,9 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
 
+            // Only 2 window stores should be available
+            assertEquals(2, driver.getAllStateStores().size());
+
             inputTopic1.pipeInput(0, "A0", 0);
             inputTopic1.pipeInput(0, "A0-0", 0);
             inputTopic2.pipeInput(0, "a0", 0);
@@ -557,6 +560,9 @@ public class KStreamKStreamOuterJoinTest {
                 driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
             final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
 
+            // 2 window stores + 1 shared window store should be available
+            assertEquals(3, driver.getAllStateStores().size());
+
             // push two items to the primary stream; the other window is 
empty; this should not
             // produce any items because window has not expired
             // w1 {}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
index 66c55c0f..1d74e89 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
@@ -30,6 +30,8 @@ import org.powermock.api.easymock.PowerMock;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Properties;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({InternalTopologyBuilder.class})
 public class TableSourceNodeTest {
@@ -71,6 +73,6 @@ public class TableSourceNodeTest {
             .build();
         
tableSourceNode.reuseSourceTopicForChangeLog(shouldReuseSourceTopicForChangelog);
 
-        tableSourceNode.writeToTopology(topologyBuilder);
+        tableSourceNode.writeToTopology(topologyBuilder, new Properties());
     }
 }

Reply via email to