This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 45d7440 KAFKA-10847: Set StreamsConfig on InternalTopologyDriver
before writing topology (#10640)
45d7440 is described below
commit 45d7440c1577b838d4584d3860e5c4d691446f3f
Author: Sergio Peña <[email protected]>
AuthorDate: Thu May 6 19:27:23 2021 -0500
KAFKA-10847: Set StreamsConfig on InternalTopologyDriver before writing
topology (#10640)
Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../kafka/streams/kstream/internals/InternalStreamsBuilder.java | 4 ++--
.../kafka/streams/kstream/internals/graph/GlobalStoreNode.java | 4 +++-
.../apache/kafka/streams/kstream/internals/graph/GraphNode.java | 3 ++-
.../internals/graph/GroupedTableOperationRepartitionNode.java | 4 +++-
.../streams/kstream/internals/graph/KTableKTableJoinNode.java | 3 ++-
.../kstream/internals/graph/OptimizableRepartitionNode.java | 4 +++-
.../streams/kstream/internals/graph/ProcessorGraphNode.java | 4 +++-
.../kafka/streams/kstream/internals/graph/StateStoreNode.java | 4 +++-
.../streams/kstream/internals/graph/StatefulProcessorNode.java | 3 ++-
.../kafka/streams/kstream/internals/graph/StreamSinkNode.java | 4 +++-
.../kafka/streams/kstream/internals/graph/StreamSourceNode.java | 3 ++-
.../streams/kstream/internals/graph/StreamStreamJoinNode.java | 9 ++++++---
.../streams/kstream/internals/graph/StreamTableJoinNode.java | 3 ++-
.../kafka/streams/kstream/internals/graph/StreamToTableNode.java | 4 +++-
.../streams/kstream/internals/graph/TableProcessorNode.java | 3 ++-
.../kafka/streams/kstream/internals/graph/TableSourceNode.java | 3 ++-
.../kstream/internals/graph/UnoptimizableRepartitionNode.java | 4 +++-
.../streams/kstream/internals/KStreamKStreamLeftJoinTest.java | 9 +++++++++
.../streams/kstream/internals/KStreamKStreamOuterJoinTest.java | 6 ++++++
.../streams/kstream/internals/graph/TableSourceNodeTest.java | 4 +++-
20 files changed, 64 insertions(+), 21 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 6fa65c1..1527263 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -74,7 +74,7 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
protected final GraphNode root = new GraphNode(TOPOLOGY_ROOT) {
@Override
- public void writeToTopology(final InternalTopologyBuilder
topologyBuilder) {
+ public void writeToTopology(final InternalTopologyBuilder
topologyBuilder, final Properties props) {
// no-op for root node
}
};
@@ -290,7 +290,7 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
}
if (streamGraphNode.allParentsWrittenToTopology() &&
!streamGraphNode.hasWrittenToTopology()) {
- streamGraphNode.writeToTopology(internalTopologyBuilder);
+ streamGraphNode.writeToTopology(internalTopologyBuilder,
props);
streamGraphNode.setHasWrittenToTopology(true);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
index 753e076..90d9efb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GlobalStoreNode.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
+import java.util.Properties;
+
public class GlobalStoreNode<KIn, VIn, S extends StateStore> extends
StateStoreNode<S> {
private final String sourceName;
@@ -47,7 +49,7 @@ public class GlobalStoreNode<KIn, VIn, S extends StateStore>
extends StateStoreN
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
storeBuilder.withLoggingDisabled();
topologyBuilder.addGlobalStore(storeBuilder,
sourceName,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
index 76c2b5c..c55395a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java
@@ -23,6 +23,7 @@ import
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
+import java.util.Properties;
public abstract class GraphNode {
@@ -117,7 +118,7 @@ public abstract class GraphNode {
return this.buildPriority;
}
- public abstract void writeToTopology(final InternalTopologyBuilder
topologyBuilder);
+ public abstract void writeToTopology(final InternalTopologyBuilder
topologyBuilder, final Properties props);
public boolean hasWrittenToTopology() {
return hasWrittenToTopology;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index a7ba30d..1117e5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -26,6 +26,8 @@ import
org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import java.util.Properties;
+
public class GroupedTableOperationRepartitionNode<K, V> extends
BaseRepartitionNode<K, V> {
@@ -78,7 +80,7 @@ public class GroupedTableOperationRepartitionNode<K, V>
extends BaseRepartitionN
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
topologyBuilder.addInternalTopic(repartitionTopic,
internalTopicProperties);
topologyBuilder.addSink(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index e2abfb5..0ca1e35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Arrays;
+import java.util.Properties;
/**
* Too much specific information to generalize so the KTable-KTable join
requires a specific node.
@@ -96,7 +97,7 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
final String thisProcessorName =
thisProcessorParameters().processorName();
final String otherProcessorName =
otherProcessorParameters().processorName();
final String mergeProcessorName =
mergeProcessorParameters().processorName();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index a9693ec..c7ee03d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -23,6 +23,8 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import java.util.Properties;
+
public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode<K,
V> {
private OptimizableRepartitionNode(final String nodeName,
@@ -64,7 +66,7 @@ public class OptimizableRepartitionNode<K, V> extends
BaseRepartitionNode<K, V>
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
topologyBuilder.addInternalTopic(repartitionTopic,
internalTopicProperties);
topologyBuilder.addProcessor(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
index a38f516..c3dd4f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import java.util.Properties;
+
/**
* Used to represent any type of stateless operation:
*
@@ -55,7 +57,7 @@ public class ProcessorGraphNode<K, V> extends GraphNode {
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
topologyBuilder.addProcessor(processorParameters.processorName(),
processorParameters.processorSupplier(), parentNodeNames());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
index 32dc93d..ae01580 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
@@ -20,6 +20,8 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
+import java.util.Properties;
+
public class StateStoreNode<S extends StateStore> extends GraphNode {
protected final StoreBuilder<S> storeBuilder;
@@ -31,7 +33,7 @@ public class StateStoreNode<S extends StateStore> extends
GraphNode {
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
topologyBuilder.addStateStore(storeBuilder);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index 28af95b..ad6de37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -22,6 +22,7 @@ import
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Arrays;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;
@@ -78,7 +79,7 @@ public class StatefulProcessorNode<K, V> extends
ProcessorGraphNode<K, V> {
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
final String processorName = processorParameters().processorName();
final ProcessorSupplier<K, V, ?, ?> processorSupplier =
processorParameters().processorSupplier();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index f12a9e5..8d67ac1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
+import java.util.Properties;
+
public class StreamSinkNode<K, V> extends GraphNode {
private final TopicNameExtractor<K, V> topicNameExtractor;
@@ -52,7 +54,7 @@ public class StreamSinkNode<K, V> extends GraphNode {
@Override
@SuppressWarnings("unchecked")
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
final Serializer<K> keySerializer = producedInternal.keySerde() ==
null ? null : producedInternal.keySerde().serializer();
final Serializer<V> valSerializer = producedInternal.valueSerde() ==
null ? null : producedInternal.valueSerde().serializer();
final String[] parentNames = parentNodeNames();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index f4f9842..d4adc89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -23,6 +23,7 @@ import
org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import java.util.Collection;
+import java.util.Properties;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +70,7 @@ public class StreamSourceNode<K, V> extends
SourceGraphNode<K, V> {
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
if (topicPattern() != null) {
topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index 9de2378..e940c71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -26,7 +26,9 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyAndJoinSide;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import java.util.HashMap;
import java.util.Optional;
+import java.util.Properties;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX;
@@ -34,6 +36,7 @@ import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.ENABLE_KSTRE
* Too much information to generalize, so Stream-Stream joins are represented
by a specific node.
*/
public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K, V1, V2, VR> {
+ private static final Properties EMPTY_PROPERTIES = new Properties();
private final ProcessorParameters<K, V1, ?, ?>
thisWindowedStreamProcessorParameters;
private final ProcessorParameters<K, V2, ?, ?>
otherWindowedStreamProcessorParameters;
@@ -84,8 +87,9 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
"} " + super.toString();
}
+ @SuppressWarnings("unchecked")
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
final String thisProcessorName =
thisProcessorParameters().processorName();
final String otherProcessorName =
otherProcessorParameters().processorName();
@@ -98,8 +102,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends
BaseJoinProcessorNode<K
topologyBuilder.addStateStore(thisWindowStoreBuilder,
thisWindowedStreamProcessorName, otherProcessorName);
topologyBuilder.addStateStore(otherWindowStoreBuilder,
otherWindowedStreamProcessorName, thisProcessorName);
- final StreamsConfig streamsConfig = topologyBuilder.getStreamsConfig();
- if (streamsConfig == null ||
StreamsConfig.InternalConfig.getBoolean(streamsConfig.originals(),
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
+ if (props == null || StreamsConfig.InternalConfig.getBoolean(new
HashMap(props), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
outerJoinWindowStoreBuilder.ifPresent(builder ->
topologyBuilder.addStateStore(builder, thisProcessorName, otherProcessorName));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
index a4db1ba..d5bd2b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
@@ -21,6 +21,7 @@ import
org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import java.util.Arrays;
+import java.util.Properties;
/**
* Represents a join between a KStream and a KTable or GlobalKTable
@@ -54,7 +55,7 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
final String processorName = processorParameters.processorName();
final ProcessorSupplier<K, V, ?, ?> processorSupplier =
processorParameters.processorSupplier();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
index 3b0a572..54231d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import java.util.Properties;
+
/**
* Represents a KTable convert From KStream
*/
@@ -52,7 +54,7 @@ public class StreamToTableNode<K, V> extends GraphNode {
@SuppressWarnings("unchecked")
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
new
TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V,
KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index b9a2568..5254c57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -23,6 +23,7 @@ import
org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Arrays;
import java.util.Objects;
+import java.util.Properties;
public class TableProcessorNode<K, V> extends GraphNode {
@@ -58,7 +59,7 @@ public class TableProcessorNode<K, V> extends GraphNode {
@SuppressWarnings("unchecked")
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
final String processorName = processorParameters.processorName();
topologyBuilder.addProcessor(processorName,
processorParameters.processorSupplier(), parentNodeNames());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 203f3af..b708961 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import java.util.Collections;
+import java.util.Properties;
/**
* Used to represent either a KTable source or a GlobalKTable source. A
boolean flag is used to indicate if this represents a GlobalKTable a {@link
@@ -81,7 +82,7 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K,
V> {
@Override
@SuppressWarnings("unchecked")
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
final String topicName = topicNames().iterator().next();
// TODO: we assume source KTables can only be timestamped-key-value
stores for now.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java
index daac9bd..a231d23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/UnoptimizableRepartitionNode.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import java.util.Properties;
+
/**
* Repartition node that is not subject of optimization algorithm
*/
@@ -50,7 +52,7 @@ public class UnoptimizableRepartitionNode<K, V> extends
BaseRepartitionNode<K, V
}
@Override
- public void writeToTopology(final InternalTopologyBuilder topologyBuilder)
{
+ public void writeToTopology(final InternalTopologyBuilder topologyBuilder,
final Properties props) {
topologyBuilder.addInternalTopic(repartitionTopic,
internalTopicProperties);
topologyBuilder.addProcessor(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 50c2f86..d0fb2be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -116,6 +116,9 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
+ // Only 2 window stores should be available
+ assertEquals(2, driver.getAllStateStores().size());
+
// push two items to the primary stream; the other window is empty
// w1 {}
// w2 {}
@@ -167,6 +170,9 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
+ // Only 2 window stores should be available
+ assertEquals(2, driver.getAllStateStores().size());
+
inputTopic1.pipeInput(0, "A0", 0);
inputTopic1.pipeInput(0, "A0-0", 0);
inputTopic2.pipeInput(0, "a0", 0);
@@ -488,6 +494,9 @@ public class KStreamKStreamLeftJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(),
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
+ // 2 window stores + 1 shared window store should be available
+ assertEquals(3, driver.getAllStateStores().size());
+
// push two items to the primary stream; the other window is empty
// w1 {}
// w2 {}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 2bf7fef..9c16c59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -112,6 +112,9 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
+ // Only 2 window stores should be available
+ assertEquals(2, driver.getAllStateStores().size());
+
inputTopic1.pipeInput(0, "A0", 0);
inputTopic1.pipeInput(0, "A0-0", 0);
inputTopic2.pipeInput(0, "a0", 0);
@@ -557,6 +560,9 @@ public class KStreamKStreamOuterJoinTest {
driver.createInputTopic(topic2, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final MockProcessor<Integer, String> processor =
supplier.theCapturedProcessor();
+ // 2 window stores + 1 shared window store should be available
+ assertEquals(3, driver.getAllStateStores().size());
+
// push two items to the primary stream; the other window is
empty; this should not
// produce any items because window has not expired
// w1 {}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
index 66c55c0f..1d74e89 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNodeTest.java
@@ -30,6 +30,8 @@ import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.Properties;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest({InternalTopologyBuilder.class})
public class TableSourceNodeTest {
@@ -71,6 +73,6 @@ public class TableSourceNodeTest {
.build();
tableSourceNode.reuseSourceTopicForChangeLog(shouldReuseSourceTopicForChangelog);
- tableSourceNode.writeToTopology(topologyBuilder);
+ tableSourceNode.writeToTopology(topologyBuilder, new Properties());
}
}