[https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464036#comment-16464036]

ASF GitHub Bot commented on KAFKA-6761:
---------------------------------------

guozhangwang closed pull request #4923: KAFKA-6761: Part 1 of 3; Graph nodes
URL: https://github.com/apache/kafka/pull/4923
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below
for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
new file mode 100644
index 00000000000..899ee718e28
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/BaseJoinProcessorNode.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+/**
+ * Common base class for join graph nodes, holding the fields shared by
+ * a Stream-Stream join and a Table-Table join.
+ *
+ * A join is described by three processors, each given as a
+ * {@link ProcessorParameters} (supplier plus name): one for the "this" side,
+ * one for the "other" side and a merge processor. The node also keeps the
+ * {@link ValueJoiner} and the names of the two join sides.
+ *
+ * @param <K>  key type shared by both sides of the join
+ * @param <V1> value type of the "this" side
+ * @param <V2> value type of the "other" side
+ * @param <VR> value type produced by the {@link ValueJoiner}
+ */
+abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends StreamsGraphNode {
+
+    private final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
+
+    // "this" side of the join
+    private final ProcessorSupplier<K, V1> thisProcessorSupplier;
+    private final String thisProcessorName;
+
+    // "other" side of the join
+    private final ProcessorSupplier<K, V2> otherProcessorSupplier;
+    private final String otherProcessorName;
+
+    // merge processor (presumably combines the output of both sides)
+    private final ProcessorSupplier<K, VR> mergeProcessorSupplier;
+    private final String mergeProcessorName;
+
+    private final String thisJoinSide;
+    private final String otherJoinSide;
+
+    BaseJoinProcessorNode(final String parentProcessorNodeName,
+                          final String processorNodeName,
+                          final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
+                          final ProcessorParameters<K, V1> joinThisProcessorParameters,
+                          final ProcessorParameters<K, V2> joinOtherProcessorParameters,
+                          final ProcessorParameters<K, VR> joinMergeProcessorParameters,
+                          final String thisJoinSide,
+                          final String otherJoinSide) {
+        super(parentProcessorNodeName, processorNodeName, false);
+
+        this.valueJoiner = valueJoiner;
+
+        this.thisProcessorSupplier = joinThisProcessorParameters.processorSupplier();
+        this.thisProcessorName = joinThisProcessorParameters.processorName();
+
+        this.otherProcessorSupplier = joinOtherProcessorParameters.processorSupplier();
+        this.otherProcessorName = joinOtherProcessorParameters.processorName();
+
+        this.mergeProcessorSupplier = joinMergeProcessorParameters.processorSupplier();
+        this.mergeProcessorName = joinMergeProcessorParameters.processorName();
+
+        this.thisJoinSide = thisJoinSide;
+        this.otherJoinSide = otherJoinSide;
+    }
+
+    ProcessorSupplier<K, V1> joinThisProcessorSupplier() {
+        return thisProcessorSupplier;
+    }
+
+    ProcessorSupplier<K, V2> joinOtherProcessorSupplier() {
+        return otherProcessorSupplier;
+    }
+
+    /**
+     * @return the supplier (not a processor instance) for the merge processor
+     */
+    ProcessorSupplier<K, VR> joinMergeProcessor() {
+        return mergeProcessorSupplier;
+    }
+
+    ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner() {
+        return valueJoiner;
+    }
+
+    String joinThisProcessorName() {
+        return thisProcessorName;
+    }
+
+    String joinOtherProcessorName() {
+        return otherProcessorName;
+    }
+
+    String joinMergeProcessorName() {
+        return mergeProcessorName;
+    }
+
+    String thisJoinSide() {
+        return thisJoinSide;
+    }
+
+    String otherJoinSide() {
+        return otherJoinSide;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java
new file mode 100644
index 00000000000..f76aa0d64d2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinNode.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.Arrays;
+
+/**
+ * Graph node for a KTable-KTable join.
+ *
+ * Besides the processors and joiner kept by {@link BaseJoinProcessorNode}, a
+ * KTable-KTable join carries the store names used by each side. That is too
+ * specific to generalize, so this join gets its own node type.
+ *
+ * @param <K>  key type shared by both tables
+ * @param <V1> value type of the "this" side
+ * @param <V2> value type of the "other" side
+ * @param <VR> value type produced by the join
+ */
+class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
+
+    private final String[] joinThisStoreNames;
+    private final String[] joinOtherStoreNames;
+
+    KTableKTableJoinNode(final String parentProcessorNodeName,
+                         final String processorNodeName,
+                         final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
+                         final ProcessorParameters<K, V1> joinThisProcessorParameters,
+                         final ProcessorParameters<K, V2> joinOtherProcessorParameters,
+                         final ProcessorParameters<K, VR> joinMergeProcessorParameters,
+                         final String thisJoinSide,
+                         final String otherJoinSide,
+                         final String[] joinThisStoreNames,
+                         final String[] joinOtherStoreNames) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              valueJoiner,
+              joinThisProcessorParameters,
+              joinOtherProcessorParameters,
+              joinMergeProcessorParameters,
+              thisJoinSide,
+              otherJoinSide);
+
+        // Take private copies. The accessors below already return copies, but
+        // storing the caller's arrays directly would let later changes to
+        // those arrays leak into this node.
+        this.joinThisStoreNames = copyOrNull(joinThisStoreNames);
+        this.joinOtherStoreNames = copyOrNull(joinOtherStoreNames);
+    }
+
+    /**
+     * @return a copy of the store names for the "this" side of the join
+     * @throws NullPointerException if no store names were supplied
+     */
+    String[] joinThisStoreNames() {
+        return Arrays.copyOf(joinThisStoreNames, joinThisStoreNames.length);
+    }
+
+    /**
+     * @return a copy of the store names for the "other" side of the join
+     * @throws NullPointerException if no store names were supplied
+     */
+    String[] joinOtherStoreNames() {
+        return Arrays.copyOf(joinOtherStoreNames, joinOtherStoreNames.length);
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    // NOTE(review): type parameter V is unused. It is kept so that call sites
+    // passing explicit type arguments continue to compile.
+    static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
+        return new KTableKTableJoinNodeBuilder<>();
+    }
+
+    /**
+     * Copies {@code storeNames}, or returns {@code null} when it is {@code null},
+     * so that a node can still be constructed without store names.
+     */
+    private static String[] copyOrNull(final String[] storeNames) {
+        return storeNames == null ? null : Arrays.copyOf(storeNames, storeNames.length);
+    }
+
+    /**
+     * Fluent builder for {@link KTableKTableJoinNode}. Any field that is not set
+     * is passed to the node constructor as {@code null}.
+     */
+    static final class KTableKTableJoinNodeBuilder<K, V1, V2, VR> {
+
+        private String processorNodeName;
+        private String parentProcessorNodeName;
+        private String[] joinThisStoreNames;
+        private ProcessorParameters<K, V1> joinThisProcessorParameters;
+        private String[] joinOtherStoreNames;
+        private ProcessorParameters<K, V2> joinOtherProcessorParameters;
+        private ProcessorParameters<K, VR> joinMergeProcessorParameters;
+        private ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
+        private String thisJoinSide;
+        private String otherJoinSide;
+
+        private KTableKTableJoinNodeBuilder() {
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisStoreNames(final String[] joinThisStoreNames) {
+            this.joinThisStoreNames = joinThisStoreNames;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisProcessorParameters(final ProcessorParameters<K, V1> joinThisProcessorParameters) {
+            this.joinThisProcessorParameters = joinThisProcessorParameters;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withProcessorNodeName(final String processorNodeName) {
+            this.processorNodeName = processorNodeName;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
+            this.joinOtherStoreNames = joinOtherStoreNames;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, V2> joinOtherProcessorParameters) {
+            this.joinOtherProcessorParameters = joinOtherProcessorParameters;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
+            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withValueJoiner(final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
+            this.valueJoiner = valueJoiner;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withThisJoinSide(final String thisJoinSide) {
+            this.thisJoinSide = thisJoinSide;
+            return this;
+        }
+
+        KTableKTableJoinNodeBuilder<K, V1, V2, VR> withOtherJoinSide(final String otherJoinSide) {
+            this.otherJoinSide = otherJoinSide;
+            return this;
+        }
+
+        KTableKTableJoinNode<K, V1, V2, VR> build() {
+
+            return new KTableKTableJoinNode<>(parentProcessorNodeName,
+                                              processorNodeName,
+                                              valueJoiner,
+                                              joinThisProcessorParameters,
+                                              joinOtherProcessorParameters,
+                                              joinMergeProcessorParameters,
+                                              thisJoinSide,
+                                              otherJoinSide,
+                                              joinThisStoreNames,
+                                              joinOtherStoreNames);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java
new file mode 100644
index 00000000000..cab1589c882
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProcessorParameters.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+/**
+ * Pairs a {@link ProcessorSupplier} with the name used to register it with the
+ * {@link org.apache.kafka.streams.processor.internals.InternalTopologyBuilder}.
+ *
+ * The join nodes need several such pairs. Grouping each supplier with its
+ * name keeps their constructor argument lists manageable.
+ *
+ * @param <K> key type of the supplied processor
+ * @param <V> value type of the supplied processor
+ */
+class ProcessorParameters<K, V> {
+
+    private final String processorName;
+    private final ProcessorSupplier<K, V> processorSupplier;
+
+    ProcessorParameters(final ProcessorSupplier<K, V> supplier,
+                        final String name) {
+        this.processorName = name;
+        this.processorSupplier = supplier;
+    }
+
+    String processorName() {
+        return processorName;
+    }
+
+    ProcessorSupplier<K, V> processorSupplier() {
+        return processorSupplier;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java
new file mode 100644
index 00000000000..d8aaee9875d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionNode.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+/**
+ * Graph node for a repartition step. It records the key/value serdes, the
+ * repartition topic, and the sink, source and processor names involved.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class RepartitionNode<K, V> extends StatelessProcessorNode<K, V> {
+
+    private final String repartitionTopic;
+    private final String sinkName;
+    private final String sourceName;
+    private final String processorName;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+
+    RepartitionNode(final String parentProcessorNodeName,
+                    final String processorNodeName,
+                    final String sourceName,
+                    final ProcessorSupplier<K, V> processorSupplier,
+                    final Serde<K> keySerde,
+                    final Serde<V> valueSerde,
+                    final String sinkName,
+                    final String repartitionTopic,
+                    final String processorName) {
+        // The trailing "false" fills StatelessProcessorNode's repartitionRequired slot.
+        super(parentProcessorNodeName, processorNodeName, processorSupplier, false);
+
+        this.repartitionTopic = repartitionTopic;
+        this.sinkName = sinkName;
+        this.sourceName = sourceName;
+        this.processorName = processorName;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    String repartitionTopic() {
+        return repartitionTopic;
+    }
+
+    String sinkName() {
+        return sinkName;
+    }
+
+    String sourceName() {
+        return sourceName;
+    }
+
+    String processorName() {
+        return processorName;
+    }
+
+    Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    static <K, V> RepartitionNodeBuilder<K, V> repartitionNodeBuilder() {
+        return new RepartitionNodeBuilder<>();
+    }
+
+    /**
+     * Fluent builder for {@link RepartitionNode}. Any field that is not set is
+     * passed to the node constructor as {@code null}.
+     */
+    static final class RepartitionNodeBuilder<K, V> {
+
+        private String parentProcessorNodeName;
+        private String processorNodeName;
+        private String processorName;
+        private ProcessorSupplier<K, V> processorSupplier;
+        private String sourceName;
+        private String sinkName;
+        private String repartitionTopic;
+        private Serde<K> keySerde;
+        private Serde<V> valueSerde;
+
+        private RepartitionNodeBuilder() {
+        }
+
+        RepartitionNodeBuilder<K, V> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withProcessorNodeName(final String processorNodeName) {
+            this.processorNodeName = processorNodeName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withProcessorName(final String processorName) {
+            this.processorName = processorName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier<K, V> processorSupplier) {
+            this.processorSupplier = processorSupplier;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withSourceName(final String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withSinkName(final String sinkName) {
+            this.sinkName = sinkName;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withRepartitionTopic(final String repartitionTopic) {
+            this.repartitionTopic = repartitionTopic;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withKeySerde(final Serde<K> keySerde) {
+            this.keySerde = keySerde;
+            return this;
+        }
+
+        RepartitionNodeBuilder<K, V> withValueSerde(final Serde<V> valueSerde) {
+            this.valueSerde = valueSerde;
+            return this;
+        }
+
+        RepartitionNode<K, V> build() {
+            return new RepartitionNode<>(parentProcessorNodeName,
+                                         processorNodeName,
+                                         sourceName,
+                                         processorSupplier,
+                                         keySerde,
+                                         valueSerde,
+                                         sinkName,
+                                         repartitionTopic,
+                                         processorName);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java
new file mode 100644
index 00000000000..57d30fee236
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulProcessorNode.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Arrays;
+
+/**
+ * Graph node for a processor that uses state stores. Alongside the processor
+ * it records the store names, a {@link StateStoreSupplier} and a
+ * {@link StoreBuilder}. The builder leaves any of these {@code null} if they
+ * are not set.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+class StatefulProcessorNode<K, V> extends StatelessProcessorNode<K, V> {
+
+    private final String[] storeNames;
+    private final StateStoreSupplier<KeyValueStore> storeSupplier;
+    private final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+
+    StatefulProcessorNode(final String parentNodeName,
+                          final String processorNodeName,
+                          final ProcessorSupplier<K, V> processorSupplier,
+                          final String[] storeNames,
+                          final StateStoreSupplier<KeyValueStore> storeSupplier,
+                          final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
+                          final boolean repartitionRequired) {
+        super(parentNodeName, processorNodeName, processorSupplier, repartitionRequired);
+        this.storeNames = storeNames;
+        this.storeSupplier = storeSupplier;
+        this.storeBuilder = storeBuilder;
+    }
+
+    /**
+     * @return a copy of the store names
+     * @throws NullPointerException if no store names were supplied
+     */
+    String[] storeNames() {
+        final int count = storeNames.length;
+        return Arrays.copyOf(storeNames, count);
+    }
+
+    StateStoreSupplier<KeyValueStore> storeSupplier() {
+        return storeSupplier;
+    }
+
+    StoreBuilder<KeyValueStore<K, V>> storeBuilder() {
+        return storeBuilder;
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    static <K, V> StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder() {
+        return new StatefulProcessorNodeBuilder<>();
+    }
+
+    /**
+     * Fluent builder for {@link StatefulProcessorNode}. Unset references are
+     * passed as {@code null}, and {@code repartitionRequired} defaults to
+     * {@code false}.
+     */
+    static final class StatefulProcessorNodeBuilder<K, V> {
+
+        private String parentProcessorNodeName;
+        private String processorNodeName;
+        // NOTE(review): raw type, as in the original. build() passes it to a
+        // ProcessorSupplier<K, V> parameter through an unchecked conversion.
+        private ProcessorSupplier processorSupplier;
+        private String[] storeNames;
+        private StateStoreSupplier<KeyValueStore> storeSupplier;
+        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+        private boolean repartitionRequired;
+
+        private StatefulProcessorNodeBuilder() {
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withProcessorNodeName(final String processorNodeName) {
+            this.processorNodeName = processorNodeName;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier processorSupplier) {
+            this.processorSupplier = processorSupplier;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withStoreNames(final String[] storeNames) {
+            this.storeNames = storeNames;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withStoreSupplier(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+            this.storeSupplier = storeSupplier;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+            this.storeBuilder = storeBuilder;
+            return this;
+        }
+
+        StatefulProcessorNodeBuilder<K, V> withRepartitionRequired(final boolean repartitionRequired) {
+            this.repartitionRequired = repartitionRequired;
+            return this;
+        }
+
+        StatefulProcessorNode<K, V> build() {
+            return new StatefulProcessorNode<>(parentProcessorNodeName,
+                                               processorNodeName,
+                                               processorSupplier,
+                                               storeNames,
+                                               storeSupplier,
+                                               storeBuilder,
+                                               repartitionRequired);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java
new file mode 100644
index 00000000000..e7d51407bcc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulRepartitionNode.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+/**
+ * Repartition node that also carries a stateful processor and its
+ * materialization settings. The {@link ProcessorSupplier} slot inherited from
+ * {@link RepartitionNode} is left {@code null}. The stateful processor is
+ * exposed through {@link #statefulProcessorSupplier()} instead.
+ *
+ * @param <K> key type
+ * @param <V> value type, wrapped in {@link Change} by the stateful processor and serdes
+ * @param <T> value type parameter of the {@link MaterializedInternal}
+ */
+class StatefulRepartitionNode<K, V, T> extends RepartitionNode<K, V> {
+
+    private final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier;
+    private final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized;
+
+    StatefulRepartitionNode(final String parentProcessorNodeName,
+                            final String processorNodeName,
+                            final String sourceName,
+                            final Serde<K> keySerde,
+                            final Serde<V> valueSerde,
+                            final String sinkName,
+                            final String repartitionTopic,
+                            final String processorName,
+                            final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier,
+                            final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
+        super(parentProcessorNodeName,
+              processorNodeName,
+              sourceName,
+              null,
+              keySerde,
+              valueSerde,
+              sinkName,
+              repartitionTopic,
+              processorName);
+
+        this.statefulProcessorSupplier = statefulProcessorSupplier;
+        this.materialized = materialized;
+    }
+
+    ProcessorSupplier<K, Change<V>> statefulProcessorSupplier() {
+        return statefulProcessorSupplier;
+    }
+
+    MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized() {
+        return materialized;
+    }
+
+    /**
+     * @return a {@link ChangedSerializer} wrapping the value serde's serializer,
+     *         or wrapping {@code null} when no value serde was set
+     */
+    ChangedSerializer<? extends V> changedValueSerializer() {
+        final Serializer<? extends V> valueSerializer = valueSerde() == null ? null : valueSerde().serializer();
+        return new ChangedSerializer<>(valueSerializer);
+    }
+
+    /**
+     * @return a {@link ChangedDeserializer} wrapping the value serde's deserializer,
+     *         or wrapping {@code null} when no value serde was set
+     */
+    ChangedDeserializer<? extends V> changedValueDeserializer() {
+        final Deserializer<? extends V> valueDeserializer = valueSerde() == null ? null : valueSerde().deserializer();
+        return new ChangedDeserializer<>(valueDeserializer);
+    }
+
+    static <K, V, T> StatefulRepartitionNodeBuilder<K, V, T> statefulRepartitionNodeBuilder() {
+        return new StatefulRepartitionNodeBuilder<>();
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    /**
+     * Fluent builder for {@link StatefulRepartitionNode}. Any field that is not
+     * set is passed to the node constructor as {@code null}.
+     */
+    static final class StatefulRepartitionNodeBuilder<K, V, T> {
+
+        private String parentProcessorNodeName;
+        private String processorNodeName;
+        private Serde<K> keySerde;
+        private Serde<V> valueSerde;
+        private String sinkName;
+        private String sourceName;
+        private String repartitionTopic;
+        private String processorName;
+        private ProcessorSupplier<K, Change<V>> statefulProcessorSupplier;
+        private MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized;
+
+        private StatefulRepartitionNodeBuilder() {
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withKeySerde(final Serde<K> keySerde) {
+            this.keySerde = keySerde;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withValueSerde(final Serde<V> valueSerde) {
+            this.valueSerde = valueSerde;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withSinkName(final String sinkName) {
+            this.sinkName = sinkName;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withSourceName(final String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withRepartitionTopic(final String repartitionTopic) {
+            this.repartitionTopic = repartitionTopic;
+            return this;
+        }
+
+        /**
+         * Sets the node name, which is passed to the node constructor as
+         * {@code processorNodeName}. This matches
+         * {@code RepartitionNodeBuilder.withProcessorNodeName}.
+         */
+        StatefulRepartitionNodeBuilder<K, V, T> withProcessorNodeName(final String processorNodeName) {
+            this.processorNodeName = processorNodeName;
+            return this;
+        }
+
+        /**
+         * Sets the processor name, which is passed to the node constructor as
+         * {@code processorName}.
+         */
+        StatefulRepartitionNodeBuilder<K, V, T> withProcessorName(final String processorName) {
+            this.processorName = processorName;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withStatefulProcessorSupplier(final ProcessorSupplier<K, Change<V>> statefulProcessorSupplier) {
+            this.statefulProcessorSupplier = statefulProcessorSupplier;
+            return this;
+        }
+
+        StatefulRepartitionNodeBuilder<K, V, T> withMaterialized(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
+            this.materialized = materialized;
+            return this;
+        }
+
+        /**
+         * Sets the node name; same effect as {@link #withProcessorNodeName(String)}.
+         */
+        StatefulRepartitionNodeBuilder<K, V, T> withNodeName(final String nodeName) {
+            this.processorNodeName = nodeName;
+            return this;
+        }
+
+        public StatefulRepartitionNode<K, V, T> build() {
+
+            return new StatefulRepartitionNode<>(parentProcessorNodeName,
+                                                 processorNodeName,
+                                                 sourceName,
+                                                 keySerde,
+                                                 valueSerde,
+                                                 sinkName,
+                                                 repartitionTopic,
+                                                 processorName,
+                                                 statefulProcessorSupplier,
+                                                 materialized);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java
new file mode 100644
index 00000000000..b2fdc8129de
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatefulSourceNode.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Collections;
+
+/**
+ * Graph node for a source topic whose records are materialized into a state store,
+ * i.e. a KTable source or a GlobalKTable source.
+ * <p>
+ * The presence of a {@link KTableSource} indicates that this source node supplies a
+ * {@link org.apache.kafka.streams.kstream.GlobalKTable}; see {@link #isGlobalKTable()}.
+ * {@link StatefulSourceNodeBuilder#build()} sets at most one of the store supplier and
+ * the store builder.
+ */
+class StatefulSourceNode<K, V> extends StreamSourceNode<K, V> {
+
+    // Uses the imported StateStoreSupplier; the raw KeyValueStore type argument is kept as declared.
+    private StateStoreSupplier<KeyValueStore> storeSupplier;
+    private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+    private final ProcessorSupplier<K, V> processorSupplier;
+    private final String sourceName;
+    private final String processorName;
+    private final KTableSource<K, V> kTableSource;
+
+    StatefulSourceNode(final String predecessorNodeName,
+                       final String nodeName,
+                       final String sourceName,
+                       final String processorName,
+                       final String topic,
+                       final ConsumedInternal<K, V> consumedInternal,
+                       final ProcessorSupplier<K, V> processorSupplier,
+                       final KTableSource<K, V> kTableSource) {
+
+        // a stateful source reads exactly one topic
+        super(predecessorNodeName,
+              nodeName,
+              Collections.singletonList(topic),
+              consumedInternal);
+
+        this.processorSupplier = processorSupplier;
+        this.sourceName = sourceName;
+        this.processorName = processorName;
+        this.kTableSource = kTableSource;
+    }
+
+    /** @return the store supplier, or {@code null} if none was set */
+    StateStoreSupplier<KeyValueStore> storeSupplier() {
+        return storeSupplier;
+    }
+
+    void setStoreSupplier(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+        this.storeSupplier = storeSupplier;
+    }
+
+    /** @return the store builder, or {@code null} if none was set */
+    StoreBuilder<KeyValueStore<K, V>> storeBuilder() {
+        return storeBuilder;
+    }
+
+    void setStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+        this.storeBuilder = storeBuilder;
+    }
+
+    ProcessorSupplier<K, V> processorSupplier() {
+        return processorSupplier;
+    }
+
+    String sourceName() {
+        return sourceName;
+    }
+
+    KTableSource<K, V> kTableSource() {
+        return kTableSource;
+    }
+
+    String processorName() {
+        return processorName;
+    }
+
+    /** @return {@code true} when a {@link KTableSource} was supplied to this node */
+    boolean isGlobalKTable() {
+        return kTableSource != null;
+    }
+
+    static <K, V> StatefulSourceNodeBuilder<K, V> statefulSourceNodeBuilder() {
+        return new StatefulSourceNodeBuilder<>();
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    static final class StatefulSourceNodeBuilder<K, V> {
+
+        private String predecessorNodeName;
+        private String nodeName;
+        private String sourceName;
+        private String processorName;
+        private String topic;
+        private ConsumedInternal<K, V> consumedInternal;
+        private StateStoreSupplier<KeyValueStore> storeSupplier;
+        private StoreBuilder<KeyValueStore<K, V>> storeBuilder;
+        private ProcessorSupplier<K, V> processorSupplier;
+        private KTableSource<K, V> kTableSource;
+
+        private StatefulSourceNodeBuilder() {
+        }
+
+        StatefulSourceNodeBuilder<K, V> withPredecessorNodeName(final String predecessorNodeName) {
+            this.predecessorNodeName = predecessorNodeName;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withSourceName(final String sourceName) {
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withProcessorName(final String processorName) {
+            this.processorName = processorName;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withTopic(final String topic) {
+            this.topic = topic;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withStoreSupplier(final StateStoreSupplier<KeyValueStore> storeSupplier) {
+            this.storeSupplier = storeSupplier;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withStoreBuilder(final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
+            this.storeBuilder = storeBuilder;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
+            this.consumedInternal = consumedInternal;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withProcessorSupplier(final ProcessorSupplier<K, V> processorSupplier) {
+            this.processorSupplier = processorSupplier;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withKTableSource(final KTableSource<K, V> kTableSource) {
+            this.kTableSource = kTableSource;
+            return this;
+        }
+
+        StatefulSourceNodeBuilder<K, V> withNodeName(final String nodeName) {
+            this.nodeName = nodeName;
+            return this;
+        }
+
+        /**
+         * Builds the node with repartitioning disabled. If both a store supplier and a
+         * store builder were provided, only the store supplier is applied.
+         */
+        StatefulSourceNode<K, V> build() {
+            final StatefulSourceNode<K, V> statefulSourceNode =
+                new StatefulSourceNode<>(predecessorNodeName,
+                                         nodeName,
+                                         sourceName,
+                                         processorName,
+                                         topic,
+                                         consumedInternal,
+                                         processorSupplier,
+                                         kTableSource);
+
+            statefulSourceNode.setRepartitionRequired(false);
+            // the supplier takes precedence over the builder
+            if (storeSupplier != null) {
+                statefulSourceNode.setStoreSupplier(storeSupplier);
+            } else if (storeBuilder != null) {
+                statefulSourceNode.setStoreBuilder(storeBuilder);
+            }
+
+            return statefulSourceNode;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java
new file mode 100644
index 00000000000..eecb06850a8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StatelessProcessorNode.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Graph node representing a stateless operation such as
+ * map, mapValues, flatMap, flatMapValues, filter, filterNot or branch.
+ */
+class StatelessProcessorNode<K, V> extends StreamsGraphNode {
+
+    private final ProcessorSupplier<K, V> processorSupplier;
+
+    // Some processors need to register multiple parent names with the
+    // InternalTopologyBuilder (KStream#merge, for example). There is only one
+    // parent graph node, but the name of each merged KStream needs to get
+    // registered with the InternalStreamsBuilder.
+    private final List<String> multipleParentNames;
+
+    StatelessProcessorNode(final String parentProcessorNodeName,
+                           final String processorNodeName,
+                           final ProcessorSupplier<K, V> processorSupplier,
+                           final boolean repartitionRequired) {
+        this(parentProcessorNodeName, processorNodeName, repartitionRequired, processorSupplier, null);
+    }
+
+    StatelessProcessorNode(final String parentProcessorNodeName,
+                           final String processorNodeName,
+                           final boolean repartitionRequired,
+                           final ProcessorSupplier<K, V> processorSupplier,
+                           final List<String> multipleParentNames) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              repartitionRequired);
+
+        this.processorSupplier = processorSupplier;
+        // Copy so later changes to the caller's list cannot leak into this node;
+        // null means "no additional parent names".
+        this.multipleParentNames = multipleParentNames == null
+                                   ? new ArrayList<>()
+                                   : new ArrayList<>(multipleParentNames);
+    }
+
+    ProcessorSupplier<K, V> processorSupplier() {
+        return processorSupplier;
+    }
+
+    /** @return a mutable copy of the additional parent names; never {@code null} */
+    List<String> multipleParentNames() {
+        return new ArrayList<>(multipleParentNames);
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java
new file mode 100644
index 00000000000..0a65d1ba2c9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSinkNode.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+/**
+ * Graph node that writes records to a sink topic, using the serdes and partitioner
+ * carried by a {@link ProducedInternal}.
+ */
+class StreamSinkNode<K, V> extends StreamsGraphNode {
+
+    private final String topic;
+    private final ProducedInternal<K, V> producedInternal;
+
+    StreamSinkNode(final String parentProcessorNodeName,
+                   final String processorNodeName,
+                   final String topic,
+                   final ProducedInternal<K, V> producedInternal) {
+        // sink nodes are always created with repartitionRequired = false
+        super(parentProcessorNodeName, processorNodeName, false);
+        this.topic = topic;
+        this.producedInternal = producedInternal;
+    }
+
+    /** @return the name of the topic this sink writes to */
+    String topic() {
+        return topic;
+    }
+
+    /** @return the configured key serde; may be {@code null} */
+    Serde<K> keySerde() {
+        return producedInternal.keySerde();
+    }
+
+    /** @return the key serde's serializer, or {@code null} when no key serde is configured */
+    Serializer<K> keySerializer() {
+        final Serde<K> serde = producedInternal.keySerde();
+        if (serde == null) {
+            return null;
+        }
+        return serde.serializer();
+    }
+
+    /** @return the configured value serde; may be {@code null} */
+    Serde<V> valueSerde() {
+        return producedInternal.valueSerde();
+    }
+
+    /** @return the value serde's serializer, or {@code null} when no value serde is configured */
+    Serializer<V> valueSerializer() {
+        final Serde<V> serde = producedInternal.valueSerde();
+        if (serde == null) {
+            return null;
+        }
+        return serde.serializer();
+    }
+
+    /** @return the partitioner taken from {@link ProducedInternal} */
+    StreamPartitioner<? super K, ? super V> streamPartitioner() {
+        return producedInternal.streamPartitioner();
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java
new file mode 100644
index 00000000000..cb8684061f7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamSourceNode.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Graph node for a stream source. A node is created either from a collection of
+ * topic names or from a topic {@link Pattern}; the other of the two is left {@code null}.
+ */
+class StreamSourceNode<K, V> extends StreamsGraphNode {
+
+    // Only one of these is set, depending on which constructor was used.
+    private final Collection<String> topics;
+    private final Pattern topicPattern;
+    private final ConsumedInternal<K, V> consumedInternal;
+
+    StreamSourceNode(final String parentProcessorNodeName,
+                     final String processorNodeName,
+                     final Collection<String> topics,
+                     final ConsumedInternal<K, V> consumedInternal) {
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        // copy so later changes to the caller's collection do not affect this node
+        this.topics = topics == null ? null : new ArrayList<>(topics);
+        this.topicPattern = null;
+        this.consumedInternal = consumedInternal;
+    }
+
+    StreamSourceNode(final String parentProcessorNodeName,
+                     final String processorNodeName,
+                     final Pattern topicPattern,
+                     final ConsumedInternal<K, V> consumedInternal) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        this.topics = null;
+        this.topicPattern = topicPattern;
+        this.consumedInternal = consumedInternal;
+    }
+
+    /**
+     * @return a mutable copy of the subscribed topic names, or an empty list when this
+     *         node was created from a topic pattern
+     */
+    List<String> getTopics() {
+        // topics is null for pattern-based sources; guard before copying to avoid an NPE
+        return topics == null ? new ArrayList<>() : new ArrayList<>(topics);
+    }
+
+    /** @return the subscription pattern, or {@code null} when created from topic names */
+    Pattern getTopicPattern() {
+        return topicPattern;
+    }
+
+    /** @return the configured key serde; may be {@code null} */
+    Serde<K> keySerde() {
+        return consumedInternal.keySerde();
+    }
+
+    /** @return the key serde's deserializer, or {@code null} when no key serde is configured */
+    Deserializer<K> keyDeserializer() {
+        return consumedInternal.keySerde() != null ? consumedInternal.keySerde().deserializer() : null;
+    }
+
+    TimestampExtractor timestampExtractor() {
+        return consumedInternal.timestampExtractor();
+    }
+
+    Topology.AutoOffsetReset autoOffsetReset() {
+        return consumedInternal.offsetResetPolicy();
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java
new file mode 100644
index 00000000000..90734eb42d7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinNode.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.WindowStore;
+
+/**
+ * Graph node for a KStream-KStream join. Stream-stream joins carry too much
+ * information to generalize, so they get their own node type. In addition to the
+ * join processors passed to {@link BaseJoinProcessorNode}, this node holds the
+ * windowed-stream processors of both sides, their window store builders and the
+ * {@link Joined} configuration.
+ */
+class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
+
+    private final ProcessorSupplier<K, V1> thisWindowedStreamProcessorSupplier;
+    private final ProcessorSupplier<K, V2> otherWindowedStreamProcessorSupplier;
+    private final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder;
+    private final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder;
+    private final Joined<K, V1, V2> joined;
+
+    private final String thisWindowedStreamName;
+    private final String otherWindowedStreamName;
+
+    StreamStreamJoinNode(final String parentProcessorNodeName,
+                         final String processorNodeName,
+                         final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
+                         final ProcessorParameters<K, V1> joinThisProcessorParameters,
+                         final ProcessorParameters<K, V2> joinOtherProcessorParameters,
+                         final ProcessorParameters<K, VR> joinMergeProcessorParameters,
+                         final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters,
+                         final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters,
+                         final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder,
+                         final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
+                         final Joined<K, V1, V2> joined,
+                         final String leftHandSideStreamName,
+                         final String otherStreamName) {
+
+        super(parentProcessorNodeName,
+              processorNodeName,
+              valueJoiner,
+              joinThisProcessorParameters,
+              joinOtherProcessorParameters,
+              joinMergeProcessorParameters,
+              leftHandSideStreamName,
+              otherStreamName);
+
+        // Both windowed-stream parameter objects are dereferenced here, so they must be non-null.
+        this.thisWindowedStreamProcessorSupplier = thisWindowedStreamProcessorParameters.processorSupplier();
+        this.otherWindowedStreamProcessorSupplier = otherWindowedStreamProcessorParameters.processorSupplier();
+        this.thisWindowedStreamName = thisWindowedStreamProcessorParameters.processorName();
+        this.otherWindowedStreamName = otherWindowedStreamProcessorParameters.processorName();
+        this.thisWindowStoreBuilder = thisWindowStoreBuilder;
+        this.otherWindowStoreBuilder = otherWindowStoreBuilder;
+        this.joined = joined;
+    }
+
+    ProcessorSupplier<K, V1> thisWindowedStreamProcessorSupplier() {
+        return thisWindowedStreamProcessorSupplier;
+    }
+
+    ProcessorSupplier<K, V2> otherWindowedStreamProcessorSupplier() {
+        return otherWindowedStreamProcessorSupplier;
+    }
+
+    String thisWindowedStreamName() {
+        return thisWindowedStreamName;
+    }
+
+    String otherWindowedStreamName() {
+        return otherWindowedStreamName;
+    }
+
+    StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder() {
+        return thisWindowStoreBuilder;
+    }
+
+    StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder() {
+        return otherWindowStoreBuilder;
+    }
+
+    /** @return the {@link Joined} configuration this node was built with */
+    Joined<K, V1, V2> joined() {
+        return joined;
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+
+    /**
+     * Creates an empty builder.
+     * <p>
+     * NOTE(review): the type parameter {@code V} does not appear in the signature. It is kept
+     * so that call sites passing explicit type arguments continue to compile.
+     */
+    static <K, V, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
+        return new StreamStreamJoinNodeBuilder<>();
+    }
+
+    static final class StreamStreamJoinNodeBuilder<K, V1, V2, VR> {
+
+        private String processorNodeName;
+        private String parentProcessorNodeName;
+        private ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner;
+        private ProcessorParameters<K, V1> joinThisProcessorParameters;
+        private ProcessorParameters<K, V2> joinOtherProcessorParameters;
+        private ProcessorParameters<K, VR> joinMergeProcessorParameters;
+        private ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters;
+        private ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters;
+        private StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder;
+        private StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder;
+        private Joined<K, V1, V2> joined;
+        private String leftHandSideStreamName;
+        private String otherStreamName;
+
+        private StreamStreamJoinNodeBuilder() {
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withValueJoiner(
+                final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner) {
+            this.valueJoiner = valueJoiner;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinThisProcessorParameters(
+                final ProcessorParameters<K, V1> joinThisProcessorParameters) {
+            this.joinThisProcessorParameters = joinThisProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowedStreamProcessorParameters(
+                final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters) {
+            this.thisWindowedStreamProcessorParameters = thisWindowedStreamProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withProcessorNodeName(final String name) {
+            this.processorNodeName = name;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withParentProcessorNodeName(final String parentProcessorNodeName) {
+            this.parentProcessorNodeName = parentProcessorNodeName;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(
+                final ProcessorParameters<K, V2> joinOtherProcessorParameters) {
+            this.joinOtherProcessorParameters = joinOtherProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowedStreamProcessorParameters(
+                final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters) {
+            this.otherWindowedStreamProcessorParameters = otherWindowedStreamProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(
+                final ProcessorParameters<K, VR> joinMergeProcessorParameters) {
+            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withLeftHandSideStreamName(final String leftHandSideStreamName) {
+            this.leftHandSideStreamName = leftHandSideStreamName;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherStreamName(final String otherStreamName) {
+            this.otherStreamName = otherStreamName;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withThisWindowStoreBuilder(
+                final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder) {
+            this.thisWindowStoreBuilder = thisWindowStoreBuilder;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withOtherWindowStoreBuilder(
+                final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder) {
+            this.otherWindowStoreBuilder = otherWindowStoreBuilder;
+            return this;
+        }
+
+        StreamStreamJoinNodeBuilder<K, V1, V2, VR> withJoined(final Joined<K, V1, V2> joined) {
+            this.joined = joined;
+            return this;
+        }
+
+        /**
+         * Creates the join node. Both windowed-stream processor parameters must have been
+         * set, because the node's constructor dereferences them.
+         */
+        StreamStreamJoinNode<K, V1, V2, VR> build() {
+            return new StreamStreamJoinNode<>(parentProcessorNodeName,
+                                              processorNodeName,
+                                              valueJoiner,
+                                              joinThisProcessorParameters,
+                                              joinOtherProcessorParameters,
+                                              joinMergeProcessorParameters,
+                                              thisWindowedStreamProcessorParameters,
+                                              otherWindowedStreamProcessorParameters,
+                                              thisWindowStoreBuilder,
+                                              otherWindowStoreBuilder,
+                                              joined,
+                                              leftHandSideStreamName,
+                                              otherStreamName);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java
new file mode 100644
index 00000000000..18f20557621
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamTableJoinNode.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.Arrays;
+
+/**
+ * Represents a join between a KStream and a KTable or GlobalKTable.
+ * <p>
+ * NOTE(review): the type parameters {@code K2}, {@code V2} and {@code VR} are not used
+ * by this class yet; they are kept for compatibility with existing declarations.
+ */
+class StreamTableJoinNode<K1, K2, V1, V2, VR> extends StreamsGraphNode {
+
+    private final String[] storeNames;
+    private final ProcessorSupplier<K1, V1> processorSupplier;
+
+    StreamTableJoinNode(final String parentProcessorNodeName,
+                        final String processorNodeName,
+                        final ProcessorSupplier<K1, V1> processorSupplier,
+                        final String[] storeNames) {
+        super(parentProcessorNodeName,
+              processorNodeName,
+              false);
+
+        // For a Stream-Table join these are the names of the state stores associated with
+        // the KTable. The array is copied so later changes to the caller's array cannot alter
+        // this node; a null array is treated as "no stores".
+        this.storeNames = storeNames == null
+                          ? new String[0]
+                          : Arrays.copyOf(storeNames, storeNames.length);
+        this.processorSupplier = processorSupplier;
+    }
+
+    /** @return a copy of the state store names; never {@code null} */
+    String[] storeNames() {
+        return Arrays.copyOf(storeNames, storeNames.length);
+    }
+
+    ProcessorSupplier<K1, V1> processorSupplier() {
+        return processorSupplier;
+    }
+
+    @Override
+    void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        //TODO will implement in follow-up pr
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java
new file mode 100644
index 00000000000..4597513fc0f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsGraphNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+
+import java.util.Collection;
+import java.util.LinkedHashSet;
+
+/**
+ * Base class for the nodes of a {@link StreamsTopologyGraph}. A node carries its processor name,
+ * the name (and, once attached, the instance) of its parent node, its child nodes and two
+ * repartitioning flags. Subclasses implement {@link #writeToTopology(InternalTopologyBuilder)}.
+ */
+abstract class StreamsGraphNode {
+
+    private StreamsGraphNode parentNode;
+    // insertion-ordered and duplicate-free
+    private final Collection<StreamsGraphNode> childNodes = new LinkedHashSet<>();
+    private final String processorNodeName;
+    private String parentProcessorNodeName;
+    private boolean repartitionRequired;
+    private boolean triggersRepartitioning;
+    // null until setId(int) is called
+    private Integer id;
+    private StreamsTopologyGraph streamsTopologyGraph;
+
+    StreamsGraphNode(final String parentProcessorNodeName,
+                     final String processorNodeName,
+                     final boolean repartitionRequired) {
+        this.parentProcessorNodeName = parentProcessorNodeName;
+        this.processorNodeName = processorNodeName;
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    StreamsGraphNode parentNode() {
+        return parentNode;
+    }
+
+    String parentProcessorNodeName() {
+        return parentProcessorNodeName;
+    }
+
+    void setParentProcessorNodeName(final String parentProcessorNodeName) {
+        this.parentProcessorNodeName = parentProcessorNodeName;
+    }
+
+    void setParentNode(final StreamsGraphNode parentNode) {
+        this.parentNode = parentNode;
+    }
+
+    /**
+     * @return a copy of the child nodes in insertion order; modifying it does not affect this node
+     */
+    Collection<StreamsGraphNode> children() {
+        return new LinkedHashSet<>(childNodes);
+    }
+
+    void addChildNode(final StreamsGraphNode node) {
+        this.childNodes.add(node);
+    }
+
+    String processorNodeName() {
+        return processorNodeName;
+    }
+
+    boolean repartitionRequired() {
+        return repartitionRequired;
+    }
+
+    void setRepartitionRequired(final boolean repartitionRequired) {
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    public boolean triggersRepartitioning() {
+        return triggersRepartitioning;
+    }
+
+    public void setTriggersRepartitioning(final boolean triggersRepartitioning) {
+        this.triggersRepartitioning = triggersRepartitioning;
+    }
+
+    void setId(final int id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the id assigned via {@link #setId(int)}, or {@code null} if none has been assigned
+     */
+    Integer id() {
+        return this.id;
+    }
+
+    public void setStreamsTopologyGraph(final StreamsTopologyGraph streamsTopologyGraph) {
+        this.streamsTopologyGraph = streamsTopologyGraph;
+    }
+
+    StreamsTopologyGraph streamsTopologyGraph() {
+        return streamsTopologyGraph;
+    }
+
+    abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder);
+
+    /**
+     * Describes this node for logging. Parent and children are identified by name only, so the
+     * output never recurses through the graph.
+     */
+    @Override
+    public String toString() {
+        final String simpleName = getClass().getSimpleName();
+        // anonymous subclasses (e.g. the graph root) have an empty simple name
+        final String type = simpleName.isEmpty() ? getClass().getName() : simpleName;
+        return type + "{id=" + id
+            + ", processorNodeName=" + processorNodeName
+            + ", parentProcessorNodeName=" + parentProcessorNodeName
+            + ", repartitionRequired=" + repartitionRequired
+            + ", triggersRepartitioning=" + triggersRepartitioning
+            + "}";
+    }
+
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java
new file mode 100644
index 00000000000..9d2b90be5eb
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamsTopologyGraph.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Graph of {@link StreamsGraphNode}s built while a topology is defined. Nodes are linked to their
+ * predecessor by processor name, and repartitioning / sink hints are collected as nodes are added.
+ */
+class StreamsTopologyGraph {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamsTopologyGraph.class);
+    public static final String TOPOLOGY_ROOT = "root";
+
+    // synthetic root node, registered under TOPOLOGY_ROOT; writes nothing to the topology
+    protected final StreamsGraphNode root = new StreamsGraphNode(null, TOPOLOGY_ROOT, false) {
+        @Override
+        void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+            // no-op for root node
+        }
+    };
+
+    private final AtomicInteger nodeIdCounter = new AtomicInteger(0);
+    // repartition-triggering node -> descendants added after it that require repartitioning
+    private final Map<StreamsGraphNode, Set<StreamsGraphNode>> repartitioningNodeToRepartitioned = new HashMap<>();
+    private final Map<StreamsGraphNode, StreamSinkNode> stateStoreNodeToSinkNodes = new HashMap<>();
+    // processor node name -> first graph node registered under that name
+    private final Map<String, StreamsGraphNode> nameToGraphNode = new HashMap<>();
+
+    // most recently added node; implicit predecessor of nodes added without a parent name
+    private StreamsGraphNode previousNode;
+
+    StreamsTopologyGraph() {
+        nameToGraphNode.put(TOPOLOGY_ROOT, root);
+    }
+
+
+    /**
+     * Adds a node to the graph. The method assigns the node the next id, links it to its
+     * predecessor and records repartitioning hints. A non-root node without a parent processor
+     * name is attached to the most recently added node.
+     *
+     * @param node the node to add
+     * @throws IllegalStateException if the node has no parent name and no node was added before
+     *                               it, or if no node is registered under its parent name
+     */
+    public void addNode(final StreamsGraphNode node) {
+        node.setId(nodeIdCounter.getAndIncrement());
+
+        if (node.parentProcessorNodeName() == null && !TOPOLOGY_ROOT.equals(node.processorNodeName())) {
+            if (previousNode == null) {
+                throw new IllegalStateException(
+                    "Node " + node.processorNodeName() + " has no predecessor name and no node has been added before it");
+            }
+            LOG.warn("Updating node {} with predecessor name {}", node, previousNode.processorNodeName());
+            node.setParentProcessorNodeName(previousNode.processorNodeName());
+        }
+
+        LOG.debug("Adding node {}", node);
+
+        final StreamsGraphNode predecessorNode = nameToGraphNode.get(node.parentProcessorNodeName());
+
+        if (predecessorNode == null) {
+            throw new IllegalStateException(
+                "Nodes should not have a null predecessor.  Name: " + node.processorNodeName() + " Type: "
+                + node.getClass().getSimpleName() + " predecessor name " + node.parentProcessorNodeName());
+        }
+
+        node.setParentNode(predecessorNode);
+        predecessorNode.addChildNode(node);
+
+        if (node.triggersRepartitioning()) {
+            repartitioningNodeToRepartitioned.put(node, new HashSet<StreamsGraphNode>());
+        } else if (node.repartitionRequired()) {
+            registerWithRepartitioningAncestor(node);
+        }
+
+        // keep the first node registered under a given name
+        if (!nameToGraphNode.containsKey(node.processorNodeName())) {
+            nameToGraphNode.put(node.processorNodeName(), node);
+        }
+
+        previousNode = node;
+    }
+
+    /**
+     * Records {@code node} under its nearest ancestor that triggers repartitioning, if any.
+     * Every ancestor is visited in turn (parent, grandparent, ...). The walk ends when it passes
+     * the root, whose parent is {@code null}.
+     */
+    private void registerWithRepartitioningAncestor(final StreamsGraphNode node) {
+        StreamsGraphNode ancestor = node.parentNode();
+        while (ancestor != null) {
+            if (ancestor.triggersRepartitioning()) {
+                Set<StreamsGraphNode> repartitioned = repartitioningNodeToRepartitioned.get(ancestor);
+                if (repartitioned == null) {
+                    // the flag may have been set after the ancestor was added
+                    repartitioned = new HashSet<>();
+                    repartitioningNodeToRepartitioned.put(ancestor, repartitioned);
+                }
+                repartitioned.add(node);
+                return;
+            }
+            ancestor = ancestor.parentNode();
+        }
+    }
+
+    public StreamsGraphNode getRoot() {
+        return root;
+    }
+
+    /**
+     * Used for hints when a node in the topology triggers a repartition and the repartition flag
+     * is propagated down through the descendant nodes of the topology. This can be used to help
+     * make an optimization where the triggering node does an eager "through" operation and the
+     * child nodes can ignore the need to repartition.
+     *
+     * @return an unmodifiable snapshot (including unmodifiable value sets) of
+     *         Map&lt;StreamsGraphNode, Set&lt;StreamsGraphNode&gt;&gt;
+     */
+    public Map<StreamsGraphNode, Set<StreamsGraphNode>> getRepartitioningNodeToRepartitioned() {
+        final Map<StreamsGraphNode, Set<StreamsGraphNode>> copy = new HashMap<>();
+        for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : repartitioningNodeToRepartitioned.entrySet()) {
+            // copy each value set too, so callers cannot mutate the graph's internal state
+            copy.put(entry.getKey(), Collections.unmodifiableSet(new HashSet<>(entry.getValue())));
+        }
+        return Collections.unmodifiableMap(copy);
+    }
+
+    /**
+     * Used for hints when an Aggregation operation is directly output to a Sink topic.
+     * This map can be used to help optimize this case and use the Sink topic as the changelog
+     * topic for the state store of the aggregation.
+     *
+     * @return an unmodifiable snapshot of Map&lt;StreamsGraphNode, StreamSinkNode&gt;
+     */
+    public Map<StreamsGraphNode, StreamSinkNode> getStateStoreNodeToSinkNodes() {
+        final Map<StreamsGraphNode, StreamSinkNode> copy = new HashMap<>(stateStoreNodeToSinkNodes);
+        return Collections.unmodifiableMap(copy);
+    }
+
+    /**
+     * Used for tracking the Streams-generated names back to the original StreamsGraphNode
+     * to enable the predecessor-descendant relationship.
+     *
+     * @return an unmodifiable snapshot of Map&lt;String, StreamsGraphNode&gt;
+     */
+    public Map<String, StreamsGraphNode> getNameToGraphNode() {
+        final Map<String, StreamsGraphNode> copy = new HashMap<>(nameToGraphNode);
+        return Collections.unmodifiableMap(copy);
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce Kafka Streams Footprint
> ------------------------------
>
>                 Key: KAFKA-6761
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6761
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 2.0.0
>
>
> The persistent storage footprint of a Kafka Streams application contains the 
> following aspects:
>  # The internal topics created on the Kafka cluster side.
>  # The materialized state stores on the Kafka Streams application instances 
> side.
> There have been some questions about reducing these footprints, especially
> since many of them are not necessary. For example, there are redundant
> internal topics, as well as unnecessary state stores that take up space and
> also affect performance. When people push Streams to production with high
> traffic, this issue becomes more common and severe. Reducing the footprint of
> Streams has clear benefits: it reduces the resource utilization of Kafka
> Streams applications and avoids putting pressure on the brokers' capacity.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to