http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 4508c77..ce6ba7b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -19,39 +19,25 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.QuickUnion;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-
 /**
  * A component that is used to build a {@link ProcessorTopology}. A topology 
contains an acyclic graph of sources, processors,
  * and sinks. A {@link SourceNode source} is a node in the graph that consumes 
one or more Kafka topics and forwards them to
@@ -64,238 +50,30 @@ import java.util.regex.Pattern;
 @InterfaceStability.Evolving
 public class TopologyBuilder {
 
-    private static final Logger log = 
LoggerFactory.getLogger(TopologyBuilder.class);
-
-    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
-
-    // node factories in a topological order
-    private final LinkedHashMap<String, NodeFactory> nodeFactories = new 
LinkedHashMap<>();
-
-    // state factories
-    private final Map<String, StateStoreFactory> stateFactories = new 
HashMap<>();
-
-    // global state factories
-    private final Map<String, StateStore> globalStateStores = new 
LinkedHashMap<>();
-
-    // all topics subscribed from source processors (without application-id 
prefix for internal topics)
-    private final Set<String> sourceTopicNames = new HashSet<>();
-
-    // all internal topics auto-created by the topology builder and used in 
source / sink processors
-    private final Set<String> internalTopicNames = new HashSet<>();
-
-    // groups of source processors that need to be copartitioned
-    private final List<Set<String>> copartitionSourceGroups = new 
ArrayList<>();
-
-    // map from source processor names to subscribed topics (without 
application-id prefix for internal topics)
-    private final HashMap<String, List<String>> nodeToSourceTopics = new 
HashMap<>();
-
-    // map from source processor names to regex subscription patterns
-    private final HashMap<String, Pattern> nodeToSourcePatterns = new 
LinkedHashMap<>();
-
-    // map from sink processor names to subscribed topic (without 
application-id prefix for internal topics)
-    private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
-
-    // map from topics to their matched regex patterns, this is to ensure one 
topic is passed through on source node
-    // even if it can be matched by multiple regex patterns
-    private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
-
-    // map from state store names to all the topics subscribed from source 
processors that
-    // are connected to these state stores
-    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new 
HashMap<>();
-
-    // map from state store names to all the regex subscribed topics from 
source processors that
-    // are connected to these state stores
-    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new 
HashMap<>();
-
-    // map from state store names to this state store's corresponding 
changelog topic if possible,
-    // this is used in the extended KStreamBuilder.
-    private final Map<String, String> storeToChangelogTopic = new HashMap<>();
-
-    // all global topics
-    private final Set<String> globalTopics = new HashSet<>();
-
-    private final Set<String> earliestResetTopics = new HashSet<>();
-
-    private final Set<String> latestResetTopics = new HashSet<>();
-
-    private final Set<Pattern> earliestResetPatterns = new HashSet<>();
-
-    private final Set<Pattern> latestResetPatterns = new HashSet<>();
-
-    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
-
-    private SubscriptionUpdates subscriptionUpdates = new 
SubscriptionUpdates();
-
-    private String applicationId = null;
-
-    private Pattern topicPattern = null;
-
-    private Map<Integer, Set<String>> nodeGroups = null;
-
-    private static class StateStoreFactory {
-        public final Set<String> users;
-
-        public final StateStoreSupplier supplier;
-
-        StateStoreFactory(StateStoreSupplier supplier) {
-            this.supplier = supplier;
-            this.users = new HashSet<>();
-        }
-    }
-
-    private static abstract class NodeFactory {
-        final String name;
-        final String[] parents;
-
-        NodeFactory(final String name, final String[] parents) {
-            this.name = name;
-            this.parents = parents;
-        }
-
-        public abstract ProcessorNode build();
-
-        abstract TopologyDescription.AbstractNode describe();
-    }
-
-    private static class ProcessorNodeFactory extends NodeFactory {
-        private final ProcessorSupplier<?, ?> supplier;
-        private final Set<String> stateStoreNames = new HashSet<>();
-
-        ProcessorNodeFactory(String name, String[] parents, 
ProcessorSupplier<?, ?> supplier) {
-            super(name, parents.clone());
-            this.supplier = supplier;
-        }
-
-        public void addStateStore(String stateStoreName) {
-            stateStoreNames.add(stateStoreName);
-        }
-
-        @Override
-        public ProcessorNode build() {
-            return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
-        }
-
-        @Override
-        TopologyDescription.Processor describe() {
-            return new TopologyDescription.Processor(name, new 
HashSet<>(stateStoreNames));
-        }
-    }
-
-    private class SourceNodeFactory extends NodeFactory {
-        private final List<String> topics;
-        private final Pattern pattern;
-        private final Deserializer<?> keyDeserializer;
-        private final Deserializer<?> valDeserializer;
-        private final TimestampExtractor timestampExtractor;
-
-        private SourceNodeFactory(final String name,
-                                  final String[] topics,
-                                  final Pattern pattern,
-                                  final TimestampExtractor timestampExtractor,
-                                  final Deserializer<?> keyDeserializer,
-                                  final Deserializer<?> valDeserializer) {
-            super(name, new String[0]);
-            this.topics = topics != null ? Arrays.asList(topics) : new 
ArrayList<String>();
-            this.pattern = pattern;
-            this.keyDeserializer = keyDeserializer;
-            this.valDeserializer = valDeserializer;
-            this.timestampExtractor = timestampExtractor;
-        }
-
-        List<String> getTopics(Collection<String> subscribedTopics) {
-            // if it is subscribed via patterns, it is possible that the topic 
metadata has not been updated
-            // yet and hence the map from source node to topics is stale, in 
this case we put the pattern as a place holder;
-            // this should only happen for debugging since during runtime this 
function should always be called after the metadata has updated.
-            if (subscribedTopics.isEmpty())
-                return Collections.singletonList("" + pattern + "");
-
-            List<String> matchedTopics = new ArrayList<>();
-            for (String update : subscribedTopics) {
-                if (this.pattern == topicToPatterns.get(update)) {
-                    matchedTopics.add(update);
-                } else if (topicToPatterns.containsKey(update) && 
isMatch(update)) {
-                    // the same topic cannot be matched to more than one 
pattern
-                    // TODO: we should lift this requirement in the future
-                    throw new TopologyBuilderException("Topic " + update +
-                            " is already matched for another regex pattern " + 
topicToPatterns.get(update) +
-                            " and hence cannot be matched to this regex 
pattern " + pattern + " any more.");
-                } else if (isMatch(update)) {
-                    topicToPatterns.put(update, this.pattern);
-                    matchedTopics.add(update);
-                }
-            }
-            return matchedTopics;
-        }
-
-        @Override
-        public ProcessorNode build() {
-            final List<String> sourceTopics = nodeToSourceTopics.get(name);
-
-            // if it is subscribed via patterns, it is possible that the topic 
metadata has not been updated
-            // yet and hence the map from source node to topics is stale, in 
this case we put the pattern as a place holder;
-            // this should only happen for debugging since during runtime this 
function should always be called after the metadata has updated.
-            if (sourceTopics == null)
-                return new SourceNode<>(name, Collections.singletonList("" + 
pattern + ""), timestampExtractor, keyDeserializer, valDeserializer);
-            else
-                return new SourceNode<>(name, 
maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, 
keyDeserializer, valDeserializer);
-        }
-
-        private boolean isMatch(String topic) {
-            return this.pattern.matcher(topic).matches();
-        }
-
-        @Override
-        TopologyDescription.Source describe() {
-            String sourceTopics;
-
-            if (pattern == null) {
-                sourceTopics = topics.toString();
-                sourceTopics = sourceTopics.substring(1, sourceTopics.length() 
- 1); // trim first and last, ie. []
-            } else {
-                sourceTopics = pattern.toString();
-            }
-
-            return new TopologyDescription.Source(name, sourceTopics);
-        }
-    }
-
-    private class SinkNodeFactory<K, V> extends NodeFactory {
-        private final String topic;
-        private final Serializer<K> keySerializer;
-        private final Serializer<V> valSerializer;
-        private final StreamPartitioner<? super K, ? super V> partitioner;
-
-        private SinkNodeFactory(String name, String[] parents, String topic, 
Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? 
super K, ? super V> partitioner) {
-            super(name, parents.clone());
-            this.topic = topic;
-            this.keySerializer = keySerializer;
-            this.valSerializer = valSerializer;
-            this.partitioner = partitioner;
-        }
-
-        @Override
-        public ProcessorNode build() {
-            if (internalTopicNames.contains(topic)) {
-                // prefix the internal topic name with the application id
-                return new SinkNode<>(name, decorateTopic(topic), 
keySerializer, valSerializer, partitioner);
-            } else {
-                return new SinkNode<>(name, topic, keySerializer, 
valSerializer, partitioner);
-            }
-        }
-
-        @Override
-        TopologyDescription.Sink describe() {
-            return new TopologyDescription.Sink(name, topic);
-        }
-    }
+    /**
+     * NOTE this member is not needed by developers working with the 
processor APIs; it is only used
+     * for internal functionality.
+     * @deprecated not part of public API and for internal usage only
+     */
+    @Deprecated
+    public final InternalTopologyBuilder internalTopologyBuilder = new 
InternalTopologyBuilder();
 
+    /**
+     * NOTE this class is not needed by developers working with the 
processor APIs; it is only used
+     * for internal functionality.
+     * @deprecated not part of public API and for internal usage only
+     */
+    @Deprecated
     public static class TopicsInfo {
         public Set<String> sinkTopics;
         public Set<String> sourceTopics;
         public Map<String, InternalTopicConfig> stateChangelogTopics;
         public Map<String, InternalTopicConfig> repartitionSourceTopics;
 
-        TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, 
Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, 
InternalTopicConfig> stateChangelogTopics) {
+        public TopicsInfo(final Set<String> sinkTopics,
+                          final Set<String> sourceTopics,
+                          final Map<String, InternalTopicConfig> 
repartitionSourceTopics,
+                          final Map<String, InternalTopicConfig> 
stateChangelogTopics) {
             this.sinkTopics = sinkTopics;
             this.sourceTopics = sourceTopics;
             this.stateChangelogTopics = stateChangelogTopics;
@@ -303,10 +81,10 @@ public class TopologyBuilder {
         }
 
         @Override
-        public boolean equals(Object o) {
+        public boolean equals(final Object o) {
             if (o instanceof TopicsInfo) {
-                TopicsInfo other = (TopicsInfo) o;
-                return other.sourceTopics.equals(this.sourceTopics) && 
other.stateChangelogTopics.equals(this.stateChangelogTopics);
+                final TopicsInfo other = (TopicsInfo) o;
+                return other.sourceTopics.equals(sourceTopics) && 
other.stateChangelogTopics.equals(stateChangelogTopics);
             } else {
                 return false;
             }
@@ -314,7 +92,7 @@ public class TopologyBuilder {
 
         @Override
         public int hashCode() {
-            long n = ((long) sourceTopics.hashCode() << 32) | (long) 
stateChangelogTopics.hashCode();
+            final long n = ((long) sourceTopics.hashCode() << 32) | (long) 
stateChangelogTopics.hashCode();
             return (int) (n % 0xFFFFFFFFL);
         }
 
@@ -341,19 +119,10 @@ public class TopologyBuilder {
      */
     public TopologyBuilder() {}
 
-    /**
-     * Set the applicationId to be used for auto-generated internal topics.
-     *
-     * This is required before calling {@link #topicGroups}, {@link 
#copartitionSources},
-     * {@link #stateStoreNameToSourceTopics} and {@link #build(Integer)}.
-     *
-     * @param applicationId the streams applicationId. Should be the same as 
set by
-     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
-     */
+    /** @deprecated This method is not part of the public API and should never be 
used by a developer. */
+    @Deprecated
     public synchronized final TopologyBuilder setApplicationId(final String 
applicationId) {
-        Objects.requireNonNull(applicationId, "applicationId can't be null");
-        this.applicationId = applicationId;
-
+        internalTopologyBuilder.setApplicationId(applicationId);
         return this;
     }
 
@@ -369,8 +138,10 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is 
to consume
      * @return this builder instance so methods can be chained together; never 
null
      */
-    public synchronized final TopologyBuilder addSource(final String name, 
final String... topics) {
-        return addSource(null, name, null, null, null, topics);
+    public synchronized final TopologyBuilder addSource(final String name,
+                                                        final String... 
topics) {
+        internalTopologyBuilder.addSource(null, name, null, null, null, 
topics);
+        return this;
     }
 
     /**
@@ -386,9 +157,11 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is 
to consume
      * @return this builder instance so methods can be chained together; never 
null
      */
-
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset, final String name, final String... topics) {
-        return addSource(offsetReset, name, null, null, null, topics);
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset,
+                                                        final String name,
+                                                        final String... 
topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, 
topics);
+        return this;
     }
 
     /**
@@ -404,8 +177,10 @@ public class TopologyBuilder {
      * @param topics             the name of one or more Kafka topics that 
this source is to consume
      * @return this builder instance so methods can be chained together; never 
null
      */
-    public synchronized final TopologyBuilder addSource(final 
TimestampExtractor timestampExtractor, final String name, final String... 
topics) {
-        return addSource(null, name, timestampExtractor, null, null, topics);
+    public synchronized final TopologyBuilder addSource(final 
TimestampExtractor timestampExtractor,
+                                                        final String name, 
final String... topics) {
+        internalTopologyBuilder.addSource(null, name, timestampExtractor, 
null, null, topics);
+        return this;
     }
 
     /**
@@ -423,8 +198,10 @@ public class TopologyBuilder {
      * @param topics             the name of one or more Kafka topics that 
this source is to consume
      * @return this builder instance so methods can be chained together; never 
null
      */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset, final TimestampExtractor timestampExtractor, final String name, 
final String... topics) {
-        return addSource(offsetReset, name, timestampExtractor, null, null, 
topics);
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset,
+                                                        final 
TimestampExtractor timestampExtractor, final String name, final String... 
topics) {
+        internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, null, null, topics);
+        return this;
     }
 
     /**
@@ -440,9 +217,10 @@ public class TopologyBuilder {
      * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
      * @return this builder instance so methods can be chained together; never 
null
      */
-
-    public synchronized final TopologyBuilder addSource(final String name, 
final Pattern topicPattern) {
-        return addSource(null, name, null, null, null, topicPattern);
+    public synchronized final TopologyBuilder addSource(final String name,
+                                                        final Pattern 
topicPattern) {
+        internalTopologyBuilder.addSource(null, name, null, null, null, 
topicPattern);
+        return this;
     }
 
     /**
@@ -459,9 +237,11 @@ public class TopologyBuilder {
      * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
      * @return this builder instance so methods can be chained together; never 
null
      */
-
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset, final String name, final Pattern topicPattern) {
-        return addSource(offsetReset, name, null, null, null, topicPattern);
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset,
+                                                        final String name,
+                                                        final Pattern 
topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, 
topicPattern);
+        return this;
     }
 
 
@@ -479,8 +259,11 @@ public class TopologyBuilder {
      * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
      * @return this builder instance so methods can be chained together; never 
null
      */
-    public synchronized final TopologyBuilder addSource(final 
TimestampExtractor timestampExtractor, final String name, final   Pattern 
topicPattern) {
-        return addSource(null, name, timestampExtractor, null, null, 
topicPattern);
+    public synchronized final TopologyBuilder addSource(final 
TimestampExtractor timestampExtractor,
+                                                        final String name,
+                                                        final Pattern 
topicPattern) {
+        internalTopologyBuilder.addSource(null, name, timestampExtractor, 
null, null, topicPattern);
+        return this;
     }
 
 
@@ -500,8 +283,12 @@ public class TopologyBuilder {
      * @param topicPattern       regular expression pattern to match Kafka 
topics that this source is to consume
      * @return this builder instance so methods can be chained together; never 
null
      */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset, final TimestampExtractor timestampExtractor, final String name, 
final Pattern topicPattern) {
-        return addSource(offsetReset, name, timestampExtractor, null, null, 
topicPattern);
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset,
+                                                        final 
TimestampExtractor timestampExtractor,
+                                                        final String name,
+                                                        final Pattern 
topicPattern) {
+        internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, null, null, topicPattern);
+        return this;
     }
 
 
@@ -521,8 +308,12 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if processor is already added or if 
topics have already been registered by another source
      */
 
-    public synchronized final TopologyBuilder addSource(final String name, 
final Deserializer keyDeserializer, final Deserializer valDeserializer, final 
String... topics) {
-        return addSource(null, name, null, keyDeserializer, valDeserializer, 
topics);
+    public synchronized final TopologyBuilder addSource(final String name,
+                                                        final Deserializer 
keyDeserializer,
+                                                        final Deserializer 
valDeserializer,
+                                                        final String... 
topics) {
+        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, 
valDeserializer, topics);
+        return this;
     }
 
     /**
@@ -550,24 +341,7 @@ public class TopologyBuilder {
                                                         final Deserializer 
keyDeserializer,
                                                         final Deserializer 
valDeserializer,
                                                         final String... 
topics) {
-        if (topics.length == 0) {
-            throw new TopologyBuilderException("You must provide at least one 
topic");
-        }
-        Objects.requireNonNull(name, "name must not be null");
-        if (nodeFactories.containsKey(name))
-            throw new TopologyBuilderException("Processor " + name + " is 
already added.");
-
-        for (String topic : topics) {
-            Objects.requireNonNull(topic, "topic names cannot be null");
-            validateTopicNotAlreadyRegistered(topic);
-            maybeAddToResetList(earliestResetTopics, latestResetTopics, 
offsetReset, topic);
-            sourceTopicNames.add(topic);
-        }
-
-        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, 
timestampExtractor, keyDeserializer, valDeserializer));
-        nodeToSourceTopics.put(name, Arrays.asList(topics));
-        nodeGrouper.add(name);
-
+        internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valDeserializer, topics);
         return this;
     }
 
@@ -600,11 +374,10 @@ public class TopologyBuilder {
                                                        final String topic,
                                                        final String 
processorName,
                                                        final ProcessorSupplier 
stateUpdateSupplier) {
-        return addGlobalStore(storeSupplier, sourceName, null, 
keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, 
null, keyDeserializer, valueDeserializer, topic, processorName, 
stateUpdateSupplier);
+        return this;
     }
 
-
-
     /**
      * Adds a global {@link StateStore} to the topology. The {@link 
StateStore} sources its data
      * from all partitions of the provided input topic. There will be exactly 
one instance of this
@@ -636,58 +409,8 @@ public class TopologyBuilder {
                                                        final String topic,
                                                        final String 
processorName,
                                                        final ProcessorSupplier 
stateUpdateSupplier) {
-        Objects.requireNonNull(storeSupplier, "store supplier must not be 
null");
-        Objects.requireNonNull(sourceName, "sourceName must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be 
null");
-        Objects.requireNonNull(processorName, "processorName must not be 
null");
-        if (nodeFactories.containsKey(sourceName)) {
-            throw new TopologyBuilderException("Processor " + sourceName + " 
is already added.");
-        }
-        if (nodeFactories.containsKey(processorName)) {
-            throw new TopologyBuilderException("Processor " + processorName + 
" is already added.");
-        }
-        if (stateFactories.containsKey(storeSupplier.name()) || 
globalStateStores.containsKey(storeSupplier.name())) {
-            throw new TopologyBuilderException("StateStore " + 
storeSupplier.name() + " is already added.");
-        }
-        if (storeSupplier.loggingEnabled()) {
-            throw new TopologyBuilderException("StateStore " + 
storeSupplier.name() + " for global table must not have logging enabled.");
-        }
-        if (sourceName.equals(processorName)) {
-            throw new TopologyBuilderException("sourceName and processorName 
must be different.");
-        }
-
-        validateTopicNotAlreadyRegistered(topic);
-
-        globalTopics.add(topic);
-        final String[] topics = {topic};
-        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, 
topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
-        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
-        nodeGrouper.add(sourceName);
-
-        final String[] parents = {sourceName};
-        final ProcessorNodeFactory nodeFactory = new 
ProcessorNodeFactory(processorName, parents, stateUpdateSupplier);
-        nodeFactory.addStateStore(storeSupplier.name());
-        nodeFactories.put(processorName, nodeFactory);
-        nodeGrouper.add(processorName);
-        nodeGrouper.unite(processorName, parents);
-
-        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
-        connectSourceStoreAndTopic(storeSupplier.name(), topic);
+        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, 
timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, 
stateUpdateSupplier);
         return this;
-
-    }
-
-    private void validateTopicNotAlreadyRegistered(final String topic) {
-        if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
-            throw new TopologyBuilderException("Topic " + topic + " has 
already been registered by another source.");
-        }
-
-        for (Pattern pattern : nodeToSourcePatterns.values()) {
-            if (pattern.matcher(topic).matches()) {
-                throw new TopologyBuilderException("Topic " + topic + " 
matches a Pattern already registered by another source.");
-            }
-        }
     }
 
     /**
@@ -708,9 +431,12 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      * @throws TopologyBuilderException if processor is already added or if 
topics have already been registered by name
      */
-
-    public synchronized final TopologyBuilder addSource(final String name, 
final Deserializer keyDeserializer, final Deserializer valDeserializer, final 
Pattern topicPattern) {
-        return addSource(null, name, null, keyDeserializer, valDeserializer, 
topicPattern);
+    public synchronized final TopologyBuilder addSource(final String name,
+                                                        final Deserializer 
keyDeserializer,
+                                                        final Deserializer 
valDeserializer,
+                                                        final Pattern 
topicPattern) {
+        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, 
valDeserializer, topicPattern);
+        return this;
     }
 
     /**
@@ -734,36 +460,16 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      * @throws TopologyBuilderException if processor is already added or if 
topics have already been registered by name
      */
-
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset,
                                                         final String name,
                                                         final 
TimestampExtractor timestampExtractor,
                                                         final Deserializer 
keyDeserializer,
                                                         final Deserializer 
valDeserializer,
                                                         final Pattern 
topicPattern) {
-        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
-        Objects.requireNonNull(name, "name can't be null");
-
-        if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is 
already added.");
-        }
-
-        for (String sourceTopicName : sourceTopicNames) {
-            if (topicPattern.matcher(sourceTopicName).matches()) {
-                throw new TopologyBuilderException("Pattern  " + topicPattern 
+ " will match a topic that has already been registered by another source.");
-            }
-        }
-
-        maybeAddToResetList(earliestResetPatterns, latestResetPatterns, 
offsetReset, topicPattern);
-
-        nodeFactories.put(name, new SourceNodeFactory(name, null, 
topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
-        nodeToSourcePatterns.put(name, topicPattern);
-        nodeGrouper.add(name);
-
+        internalTopologyBuilder.addSource(offsetReset, name, 
timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
         return this;
     }
 
-
     /**
      * Add a new source that consumes from topics matching the given pattern
      * and forwards the records to child processor and/or sink nodes.
@@ -783,37 +489,40 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      * @throws TopologyBuilderException if processor is already added or if 
topics have already been registered by name
      */
-
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset 
offsetReset,
                                                         final String name,
                                                         final Deserializer 
keyDeserializer,
                                                         final Deserializer 
valDeserializer,
                                                         final Pattern 
topicPattern) {
-        return addSource(offsetReset, name, null, keyDeserializer, 
valDeserializer, topicPattern);
+        internalTopologyBuilder.addSource(offsetReset, name, null, 
keyDeserializer, valDeserializer, topicPattern);
+        return this;
     }
 
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from predecessor nodes (processors 
and/or sources) to the named Kafka topic.
      * The sink will use the {@link 
org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default 
key serializer} and
      * {@link 
org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default 
value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should 
write its records
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
+     * @param predecessorNames the name of one or more source or processor 
nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never 
null
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(final String name, final 
String topic, final String... parentNames) {
-        return addSink(name, topic, null, null, parentNames);
+    public synchronized final TopologyBuilder addSink(final String name,
+                                                      final String topic,
+                                                      final String... 
predecessorNames) {
+        internalTopologyBuilder.addSink(name, topic, null, null, null, 
predecessorNames);
+        return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic, using
+     * Add a new sink that forwards records from predecessor nodes (processors 
and/or sources) to the named Kafka topic, using
      * the supplied partitioner.
      * The sink will use the {@link 
org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default 
key serializer} and
      * {@link 
org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default 
value serializer} specified in the
@@ -828,19 +537,23 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should 
write its records
      * @param partitioner the function that should be used to determine the 
partition for each record processed by the sink
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
+     * @param predecessorNames the name of one or more source or processor 
nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never 
null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(final String name, final 
String topic, final StreamPartitioner partitioner, final String... parentNames) 
{
-        return addSink(name, topic, null, null, partitioner, parentNames);
+    public synchronized final TopologyBuilder addSink(final String name,
+                                                      final String topic,
+                                                      final StreamPartitioner 
partitioner,
+                                                      final String... 
predecessorNames) {
+        internalTopologyBuilder.addSink(name, topic, null, null, partitioner, 
predecessorNames);
+        return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from predecessor nodes (processors 
and/or sources) to the named Kafka topic.
      * The sink will use the specified key and value serializers.
      *
      * @param name the unique name of the sink
@@ -851,19 +564,24 @@ public class TopologyBuilder {
      * @param valSerializer the {@link Serializer value serializer} used when 
consuming records; may be null if the sink
      * should use the {@link 
org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default 
value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
+     * @param predecessorNames the name of one or more source or processor 
nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never 
null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
      */
-    public synchronized final TopologyBuilder addSink(final String name, final 
String topic, final Serializer keySerializer, final Serializer valSerializer, 
final String... parentNames) {
-        return addSink(name, topic, keySerializer, valSerializer, null, 
parentNames);
+    public synchronized final TopologyBuilder addSink(final String name,
+                                                      final String topic,
+                                                      final Serializer 
keySerializer,
+                                                      final Serializer 
valSerializer,
+                                                      final String... 
predecessorNames) {
+        internalTopologyBuilder.addSink(name, topic, keySerializer, 
valSerializer, null, predecessorNames);
+        return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to the named Kafka topic.
+     * Add a new sink that forwards records from predecessor nodes (processors 
and/or sources) to the named Kafka topic.
      * The sink will use the specified key and value serializers, and the 
supplied partitioner.
      *
      * @param name the unique name of the sink
@@ -875,66 +593,41 @@ public class TopologyBuilder {
      * should use the {@link 
org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default 
value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param partitioner the function that should be used to determine the 
partition for each record processed by the sink
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
+     * @param predecessorNames the name of one or more source or processor 
nodes whose output records this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never 
null
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @throws TopologyBuilderException if parent processor is not added yet, 
or if this processor's name is equal to the parent's name
-     */
-    public synchronized final <K, V> TopologyBuilder addSink(final String 
name, final String topic, final Serializer<K> keySerializer, final 
Serializer<V> valSerializer, final StreamPartitioner<? super K, ? super V> 
partitioner, final String... parentNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        if (nodeFactories.containsKey(name))
-            throw new TopologyBuilderException("Processor " + name + " is 
already added.");
-
-        for (final String parent : parentNames) {
-            if (parent.equals(name)) {
-                throw new TopologyBuilderException("Processor " + name + " 
cannot be a parent of itself.");
-            }
-            if (!nodeFactories.containsKey(parent)) {
-                throw new TopologyBuilderException("Parent processor " + 
parent + " is not added yet.");
-            }
-        }
-
-        nodeFactories.put(name, new SinkNodeFactory<>(name, parentNames, 
topic, keySerializer, valSerializer, partitioner));
-        nodeToSinkTopic.put(name, topic);
-        nodeGrouper.add(name);
-        nodeGrouper.unite(name, parentNames);
+     * @throws TopologyBuilderException if predecessor is not added yet, or if 
this processor's name is equal to the predecessor's name
+     */
+    public synchronized final <K, V> TopologyBuilder addSink(final String name,
+                                                             final String 
topic,
+                                                             final 
Serializer<K> keySerializer,
+                                                             final 
Serializer<V> valSerializer,
+                                                             final 
StreamPartitioner<? super K, ? super V> partitioner,
+                                                             final String... 
predecessorNames) {
+        internalTopologyBuilder.addSink(name, topic, keySerializer, 
valSerializer, partitioner, predecessorNames);
         return this;
     }
 
     /**
-     * Add a new processor node that receives and processes records output by 
one or more parent source or processor node.
+     * Add a new processor node that receives and processes records output by 
one or more predecessor source or processor nodes.
      * Any new record output by this processor will be forwarded to its child 
processor or sink nodes.
      * @param name the unique name of the processor node
      * @param supplier the supplier used to obtain this node's {@link 
Processor} instance
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this processor should receive
+     * @param predecessorNames the name of one or more source or processor 
nodes whose output records this processor should receive
      * and process
      * @return this builder instance so methods can be chained together; never 
null
-     * @throws TopologyBuilderException if parent processor is not added yet, 
or if this processor's name is equal to the parent's name
+     * @throws TopologyBuilderException if predecessor is not added yet, or if 
this processor's name is equal to the predecessor's name
      */
-    public synchronized final TopologyBuilder addProcessor(final String name, 
final ProcessorSupplier supplier, final String... parentNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(supplier, "supplier must not be null");
-        if (nodeFactories.containsKey(name))
-            throw new TopologyBuilderException("Processor " + name + " is 
already added.");
-
-        for (final String parent : parentNames) {
-            if (parent.equals(name)) {
-                throw new TopologyBuilderException("Processor " + name + " 
cannot be a parent of itself.");
-            }
-            if (!nodeFactories.containsKey(parent)) {
-                throw new TopologyBuilderException("Parent processor " + 
parent + " is not added yet.");
-            }
-        }
-
-        nodeFactories.put(name, new ProcessorNodeFactory(name, parentNames, 
supplier));
-        nodeGrouper.add(name);
-        nodeGrouper.unite(name, parentNames);
+    public synchronized final TopologyBuilder addProcessor(final String name,
+                                                           final 
ProcessorSupplier supplier,
+                                                           final String... 
predecessorNames) {
+        internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
         return this;
     }
+
     /**
      * Adds a state store
      *
@@ -942,20 +635,9 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      * @throws TopologyBuilderException if state store supplier is already 
added
      */
-    public synchronized final TopologyBuilder addStateStore(final 
StateStoreSupplier supplier, final String... processorNames) {
-        Objects.requireNonNull(supplier, "supplier can't be null");
-        if (stateFactories.containsKey(supplier.name())) {
-            throw new TopologyBuilderException("StateStore " + supplier.name() 
+ " is already added.");
-        }
-
-        stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
-
-        if (processorNames != null) {
-            for (String processorName : processorNames) {
-                connectProcessorAndStateStore(processorName, supplier.name());
-            }
-        }
-
+    public synchronized final TopologyBuilder addStateStore(final 
StateStoreSupplier supplier,
+                                                            final String... 
processorNames) {
+        internalTopologyBuilder.addStateStore(supplier, processorNames);
         return this;
     }
 
@@ -966,26 +648,25 @@ public class TopologyBuilder {
      * @param stateStoreNames the names of state stores that the processor uses
      * @return this builder instance so methods can be chained together; never 
null
      */
-    public synchronized final TopologyBuilder 
connectProcessorAndStateStores(final String processorName, final String... 
stateStoreNames) {
-        Objects.requireNonNull(processorName, "processorName can't be null");
-        if (stateStoreNames != null) {
-            for (String stateStoreName : stateStoreNames) {
-                connectProcessorAndStateStore(processorName, stateStoreName);
-            }
-        }
-
+    public synchronized final TopologyBuilder 
connectProcessorAndStateStores(final String processorName,
+                                                                             
final String... stateStoreNames) {
+        internalTopologyBuilder.connectProcessorAndStateStores(processorName, 
stateStoreNames);
         return this;
     }
 
     /**
      * This is used only for KStreamBuilder: when adding a KTable from a 
source topic,
      * we need to add the topic as the KTable's materialized state store's 
changelog.
+     *
+     * NOTE: this function is not needed by developers working with the 
processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
+     * @deprecated not part of public API and for internal usage only
      */
-    protected synchronized final TopologyBuilder 
connectSourceStoreAndTopic(final String sourceStoreName, final String topic) {
-        if (storeToChangelogTopic.containsKey(sourceStoreName)) {
-            throw new TopologyBuilderException("Source store " + 
sourceStoreName + " is already added.");
-        }
-        storeToChangelogTopic.put(sourceStoreName, topic);
+    @Deprecated
+    protected synchronized final TopologyBuilder 
connectSourceStoreAndTopic(final String sourceStoreName,
+                                                                            
final String topic) {
+        internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, 
topic);
         return this;
     }
 
@@ -998,678 +679,206 @@ public class TopologyBuilder {
      * @param processorNames the name of the processors
      * @return this builder instance so methods can be chained together; never 
null
      * @throws TopologyBuilderException if less than two processors are 
specified, or if one of the processors is not added yet
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized final TopologyBuilder connectProcessors(final 
String... processorNames) {
-        if (processorNames.length < 2)
-            throw new TopologyBuilderException("At least two processors need 
to participate in the connection.");
-
-        for (String processorName : processorNames) {
-            if (!nodeFactories.containsKey(processorName))
-                throw new TopologyBuilderException("Processor " + 
processorName + " is not added yet.");
-
-        }
-
-        String firstProcessorName = processorNames[0];
-
-        nodeGrouper.unite(firstProcessorName, 
Arrays.copyOfRange(processorNames, 1, processorNames.length));
-
+        internalTopologyBuilder.connectProcessors(processorNames);
         return this;
     }
 
     /**
      * Adds an internal topic
      *
+     * NOTE: this function is not needed by developers working with the 
processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @param topicName the name of the topic
      * @return this builder instance so methods can be chained together; never 
null
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized final TopologyBuilder addInternalTopic(final String 
topicName) {
-        Objects.requireNonNull(topicName, "topicName can't be null");
-        this.internalTopicNames.add(topicName);
-
+        internalTopologyBuilder.addInternalTopic(topicName);
         return this;
     }
 
     /**
      * Asserts that the streams of the specified source nodes must be 
copartitioned.
      *
+     * NOTE: this function is not needed by developers working with the 
processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @param sourceNodes a set of source node names
      * @return this builder instance so methods can be chained together; never 
null
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized final TopologyBuilder copartitionSources(final 
Collection<String> sourceNodes) {
-        copartitionSourceGroups.add(Collections.unmodifiableSet(new 
HashSet<>(sourceNodes)));
+        internalTopologyBuilder.copartitionSources(sourceNodes);
         return this;
     }
 
-    private void connectProcessorAndStateStore(final String processorName, 
final String stateStoreName) {
-        if (!stateFactories.containsKey(stateStoreName))
-            throw new TopologyBuilderException("StateStore " + stateStoreName 
+ " is not added yet.");
-        if (!nodeFactories.containsKey(processorName))
-            throw new TopologyBuilderException("Processor " + processorName + 
" is not added yet.");
-
-        final StateStoreFactory stateStoreFactory = 
stateFactories.get(stateStoreName);
-        final Iterator<String> iter = stateStoreFactory.users.iterator();
-        if (iter.hasNext()) {
-            final String user = iter.next();
-            nodeGrouper.unite(user, processorName);
-        }
-        stateStoreFactory.users.add(processorName);
-
-        NodeFactory nodeFactory = nodeFactories.get(processorName);
-        if (nodeFactory instanceof ProcessorNodeFactory) {
-            final ProcessorNodeFactory processorNodeFactory = 
(ProcessorNodeFactory) nodeFactory;
-            processorNodeFactory.addStateStore(stateStoreName);
-            connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, 
processorNodeFactory);
-        } else {
-            throw new TopologyBuilderException("cannot connect a state store " 
+ stateStoreName + " to a source node or a sink node.");
-        }
-    }
-
-    private Set<SourceNodeFactory> findSourcesForProcessorParents(final 
String[] parents) {
-        final Set<SourceNodeFactory> sourceNodes = new HashSet<>();
-        for (String parent : parents) {
-            final NodeFactory nodeFactory = nodeFactories.get(parent);
-            if (nodeFactory instanceof SourceNodeFactory) {
-                sourceNodes.add((SourceNodeFactory) nodeFactory);
-            } else if (nodeFactory instanceof ProcessorNodeFactory) {
-                
sourceNodes.addAll(findSourcesForProcessorParents(((ProcessorNodeFactory) 
nodeFactory).parents));
-            }
-        }
-        return sourceNodes;
-    }
-
-    private void connectStateStoreNameToSourceTopicsOrPattern(final String 
stateStoreName,
-                                                              final 
ProcessorNodeFactory processorNodeFactory) {
-
-        // we should never update the mapping from state store names to source 
topics if the store name already exists
-        // in the map; this scenario is possible, for example, that a state 
store underlying a source KTable is
-        // connecting to a join operator whose source topic is not the 
original KTable's source topic but an internal repartition topic.
-
-        if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || 
stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
-            return;
-        }
-
-        final Set<String> sourceTopics = new HashSet<>();
-        final Set<Pattern> sourcePatterns = new HashSet<>();
-        final Set<SourceNodeFactory> sourceNodesForParent = 
findSourcesForProcessorParents(processorNodeFactory.parents);
-
-        for (SourceNodeFactory sourceNodeFactory : sourceNodesForParent) {
-            if (sourceNodeFactory.pattern != null) {
-                sourcePatterns.add(sourceNodeFactory.pattern);
-            } else {
-                sourceTopics.addAll(sourceNodeFactory.topics);
-            }
-        }
-        
-        if (!sourceTopics.isEmpty()) {
-            stateStoreNameToSourceTopics.put(stateStoreName,
-                    Collections.unmodifiableSet(sourceTopics));
-        }
-
-        if (!sourcePatterns.isEmpty()) {
-            stateStoreNameToSourceRegex.put(stateStoreName,
-                    Collections.unmodifiableSet(sourcePatterns));
-        }
-
-    }
-
-
-    private <T> void maybeAddToResetList(final Collection<T> earliestResets, 
final Collection<T> latestResets, final AutoOffsetReset offsetReset, final T 
item) {
-        if (offsetReset != null) {
-            switch (offsetReset) {
-                case EARLIEST:
-                    earliestResets.add(item);
-                    break;
-                case LATEST:
-                    latestResets.add(item);
-                    break;
-                default:
-                    throw new 
TopologyBuilderException(String.format("Unrecognized reset format %s", 
offsetReset));
-            }
-        }
-    }
-
     /**
      * Returns the map of node groups keyed by the topic group id.
      *
+     * NOTE: this function is not needed by developers working with the 
processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return groups of node names
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Map<Integer, Set<String>> nodeGroups() {
-        if (nodeGroups == null)
-            nodeGroups = makeNodeGroups();
-
-        return nodeGroups;
-    }
-
-    private Map<Integer, Set<String>> makeNodeGroups() {
-        final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
-        final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
-
-        int nodeGroupId = 0;
-
-        // Go through source nodes first. This makes the group id assignment 
easy to predict in tests
-        final HashSet<String> allSourceNodes = new 
HashSet<>(nodeToSourceTopics.keySet());
-        allSourceNodes.addAll(nodeToSourcePatterns.keySet());
-
-        for (String nodeName : Utils.sorted(allSourceNodes)) {
-            final String root = nodeGrouper.root(nodeName);
-            Set<String> nodeGroup = rootToNodeGroup.get(root);
-            if (nodeGroup == null) {
-                nodeGroup = new HashSet<>();
-                rootToNodeGroup.put(root, nodeGroup);
-                nodeGroups.put(nodeGroupId++, nodeGroup);
-            }
-            nodeGroup.add(nodeName);
-        }
-
-        // Go through non-source nodes
-        for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
-            if (!nodeToSourceTopics.containsKey(nodeName)) {
-                final String root = nodeGrouper.root(nodeName);
-                Set<String> nodeGroup = rootToNodeGroup.get(root);
-                if (nodeGroup == null) {
-                    nodeGroup = new HashSet<>();
-                    rootToNodeGroup.put(root, nodeGroup);
-                    nodeGroups.put(nodeGroupId++, nodeGroup);
-                }
-                nodeGroup.add(nodeName);
-            }
-        }
-
-        return nodeGroups;
+        return internalTopologyBuilder.nodeGroups();
     }
 
     /**
      * Build the topology for the specified topic group. This is called 
automatically when passing this builder into the
      * {@link 
org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, 
org.apache.kafka.streams.StreamsConfig)} constructor.
      *
+     * NOTE: this function is not needed by developers working with the 
processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @see 
org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, 
org.apache.kafka.streams.StreamsConfig)
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized ProcessorTopology build(final Integer topicGroupId) {
-        Set<String> nodeGroup;
-        if (topicGroupId != null) {
-            nodeGroup = nodeGroups().get(topicGroupId);
-        } else {
-            // when topicGroupId is null, we build the full topology minus the 
global groups
-            final Set<String> globalNodeGroups = globalNodeGroups();
-            final Collection<Set<String>> values = nodeGroups().values();
-            nodeGroup = new HashSet<>();
-            for (Set<String> value : values) {
-                nodeGroup.addAll(value);
-            }
-            nodeGroup.removeAll(globalNodeGroups);
-
-
-        }
-        return build(nodeGroup);
+        return internalTopologyBuilder.build(topicGroupId);
     }
 
     /**
      * Builds the topology for any global state stores
+     *
+     * NOTE: this function is not needed by developers working with the 
processor APIs; it is only used
+     * for the high-level DSL parsing functionality.
+     *
      * @return ProcessorTopology
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized ProcessorTopology buildGlobalStateTopology() {
-        final Set<String> globalGroups = globalNodeGroups();
-        if (globalGroups.isEmpty()) {
-            return null;
-        }
-        return build(globalGroups);
-    }
-
-    private Set<String> globalNodeGroups() {
-        final Set<String> globalGroups = new HashSet<>();
-        for (final Map.Entry<Integer, Set<String>> nodeGroup : 
nodeGroups().entrySet()) {
-            final Set<String> nodes = nodeGroup.getValue();
-            for (String node : nodes) {
-                if (isGlobalSource(node)) {
-                    globalGroups.addAll(nodes);
-                }
-            }
-        }
-        return globalGroups;
-    }
-
-    private ProcessorTopology build(final Set<String> nodeGroup) {
-        final List<ProcessorNode> processorNodes = new 
ArrayList<>(nodeFactories.size());
-        final Map<String, ProcessorNode> processorMap = new HashMap<>();
-        final Map<String, SourceNode> topicSourceMap = new HashMap<>();
-        final Map<String, SinkNode> topicSinkMap = new HashMap<>();
-        final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
-
-        // create processor nodes in a topological order ("nodeFactories" is 
already topologically sorted)
-        for (NodeFactory factory : nodeFactories.values()) {
-            if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                final ProcessorNode node = factory.build();
-                processorNodes.add(node);
-                processorMap.put(node.name(), node);
-
-                if (factory instanceof ProcessorNodeFactory) {
-                    for (String parent : ((ProcessorNodeFactory) 
factory).parents) {
-                        final ProcessorNode<?, ?> parentNode = 
processorMap.get(parent);
-                        parentNode.addChild(node);
-                    }
-                    for (String stateStoreName : ((ProcessorNodeFactory) 
factory).stateStoreNames) {
-                        if (!stateStoreMap.containsKey(stateStoreName)) {
-                            StateStore stateStore;
-
-                            if (stateFactories.containsKey(stateStoreName)) {
-                                final StateStoreSupplier supplier = 
stateFactories.get(stateStoreName).supplier;
-                                stateStore = supplier.get();
-
-                                // remember the changelog topic if this state 
store is change-logging enabled
-                                if (supplier.loggingEnabled() && 
!storeToChangelogTopic.containsKey(stateStoreName)) {
-                                    final String changelogTopic = 
ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName);
-                                    storeToChangelogTopic.put(stateStoreName, 
changelogTopic);
-                                }
-                            } else {
-                                stateStore = 
globalStateStores.get(stateStoreName);
-                            }
-
-                            stateStoreMap.put(stateStoreName, stateStore);
-                        }
-                    }
-                } else if (factory instanceof SourceNodeFactory) {
-                    final SourceNodeFactory sourceNodeFactory = 
(SourceNodeFactory) factory;
-                    final List<String> topics = (sourceNodeFactory.pattern != 
null) ?
-                            
sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
-                            sourceNodeFactory.topics;
-
-                    for (String topic : topics) {
-                        if (internalTopicNames.contains(topic)) {
-                            // prefix the internal topic name with the 
application id
-                            topicSourceMap.put(decorateTopic(topic), 
(SourceNode) node);
-                        } else {
-                            topicSourceMap.put(topic, (SourceNode) node);
-                        }
-                    }
-                } else if (factory instanceof SinkNodeFactory) {
-                    final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) 
factory;
-
-                    for (String parent : sinkNodeFactory.parents) {
-                        processorMap.get(parent).addChild(node);
-                        if 
(internalTopicNames.contains(sinkNodeFactory.topic)) {
-                            // prefix the internal topic name with the 
application id
-                            
topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node);
-                        } else {
-                            topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) 
node);
-                        }
-                    }
-                } else {
-                    throw new TopologyBuilderException("Unknown definition 
class: " + factory.getClass().getName());
-                }
-            }
-        }
-
-        return new ProcessorTopology(processorNodes, topicSourceMap, 
topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, 
new ArrayList<>(globalStateStores.values()));
+        return internalTopologyBuilder.buildGlobalStateTopology();
     }
 
     /**
      * Get any global {@link StateStore}s that are part of the
      * topology
+     *
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
+     *
      * @return map containing all global {@link StateStore}s
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public Map<String, StateStore> globalStateStores() {
-        return Collections.unmodifiableMap(globalStateStores);
+        return internalTopologyBuilder.globalStateStores();
     }
 
     /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
      *
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
+     *
      * @return groups of topic names
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Map<Integer, TopicsInfo> topicGroups() {
-        final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
-
-        if (nodeGroups == null)
-            nodeGroups = makeNodeGroups();
-
-        for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
-            final Set<String> sinkTopics = new HashSet<>();
-            final Set<String> sourceTopics = new HashSet<>();
-            final Map<String, InternalTopicConfig> internalSourceTopics = new 
HashMap<>();
-            final Map<String, InternalTopicConfig> stateChangelogTopics = new 
HashMap<>();
-            for (String node : entry.getValue()) {
-                // if the node is a source node, add to the source topics
-                final List<String> topics = nodeToSourceTopics.get(node);
-                if (topics != null) {
-                    // if some of the topics are internal, add them to the 
internal topics
-                    for (String topic : topics) {
-                        // skip global topic as they don't need partition 
assignment
-                        if (globalTopics.contains(topic)) {
-                            continue;
-                        }
-                        if (this.internalTopicNames.contains(topic)) {
-                            // prefix the internal topic name with the 
application id
-                            final String internalTopic = decorateTopic(topic);
-                            internalSourceTopics.put(internalTopic, new 
InternalTopicConfig(internalTopic,
-                                                                               
             Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
-                                                                               
             Collections.<String, String>emptyMap()));
-                            sourceTopics.add(internalTopic);
-                        } else {
-                            sourceTopics.add(topic);
-                        }
-                    }
-                }
-
-                // if the node is a sink node, add to the sink topics
-                final String topic = nodeToSinkTopic.get(node);
-                if (topic != null) {
-                    if (internalTopicNames.contains(topic)) {
-                        // prefix the change log topic name with the 
application id
-                        sinkTopics.add(decorateTopic(topic));
-                    } else {
-                        sinkTopics.add(topic);
-                    }
-                }
-
-                // if the node is connected to a state, add to the state topics
-                for (StateStoreFactory stateFactory : stateFactories.values()) 
{
-                    final StateStoreSupplier supplier = stateFactory.supplier;
-                    if (supplier.loggingEnabled() && 
stateFactory.users.contains(node)) {
-                        final String name = 
ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
-                        final InternalTopicConfig internalTopicConfig = 
createInternalTopicConfig(supplier, name);
-                        stateChangelogTopics.put(name, internalTopicConfig);
-                    }
-                }
-            }
-            if (!sourceTopics.isEmpty()) {
-                topicGroups.put(entry.getKey(), new TopicsInfo(
-                        Collections.unmodifiableSet(sinkTopics),
-                        Collections.unmodifiableSet(sourceTopics),
-                        Collections.unmodifiableMap(internalSourceTopics),
-                        Collections.unmodifiableMap(stateChangelogTopics)));
-            }
-        }
-
-        return Collections.unmodifiableMap(topicGroups);
-    }
-
-    private void setRegexMatchedTopicsToSourceNodes() {
-        if (subscriptionUpdates.hasUpdates()) {
-            for (Map.Entry<String, Pattern> stringPatternEntry : 
nodeToSourcePatterns.entrySet()) {
-                final SourceNodeFactory sourceNode = (SourceNodeFactory) 
nodeFactories.get(stringPatternEntry.getKey());
-                //need to update nodeToSourceTopics with topics matched from 
given regex
-                nodeToSourceTopics.put(stringPatternEntry.getKey(), 
sourceNode.getTopics(subscriptionUpdates.getUpdates()));
-                log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
-            }
-        }
-    }
-
-    private void setRegexMatchedTopicToStateStore() {
-        if (subscriptionUpdates.hasUpdates()) {
-            for (Map.Entry<String, Set<Pattern>> storePattern : 
stateStoreNameToSourceRegex.entrySet()) {
-                final Set<String> updatedTopicsForStateStore = new HashSet<>();
-                for (String subscriptionUpdateTopic : 
subscriptionUpdates.getUpdates()) {
-                    for (Pattern pattern : storePattern.getValue()) {
-                        if 
(pattern.matcher(subscriptionUpdateTopic).matches()) {
-                            
updatedTopicsForStateStore.add(subscriptionUpdateTopic);
-                        }
-                    }
-                }
-                if (!updatedTopicsForStateStore.isEmpty()) {
-                    Collection<String> storeTopics = 
stateStoreNameToSourceTopics.get(storePattern.getKey());
-                    if (storeTopics != null) {
-                        updatedTopicsForStateStore.addAll(storeTopics);
-                    }
-                    stateStoreNameToSourceTopics.put(storePattern.getKey(), 
Collections.unmodifiableSet(updatedTopicsForStateStore));
-                }
-            }
-        }
-    }
-    
-    private InternalTopicConfig createInternalTopicConfig(final 
StateStoreSupplier<?> supplier, final String name) {
-        if (!(supplier instanceof WindowStoreSupplier)) {
-            return new InternalTopicConfig(name, 
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), 
supplier.logConfig());
-        }
-
-        final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) 
supplier;
-        final InternalTopicConfig config = new InternalTopicConfig(name,
-                                                                   
Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
-                                                                           
InternalTopicConfig.CleanupPolicy.delete),
-                                                                   
supplier.logConfig());
-        config.setRetentionMs(windowStoreSupplier.retentionPeriod());
-        return config;
+        return internalTopologyBuilder.topicGroups();
     }
 
     /**
      * Get the Pattern to match all topics requiring to start reading from 
earliest available offset
+     *
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
+     *
      * @return the Pattern for matching all topics reading from earliest 
offset, never null
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Pattern earliestResetTopicsPattern() {
-        final List<String> topics = 
maybeDecorateInternalSourceTopics(earliestResetTopics);
-        final Pattern earliestPattern =  
buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
-
-        ensureNoRegexOverlap(earliestPattern, latestResetPatterns, 
latestResetTopics);
-
-        return earliestPattern;
+        return internalTopologyBuilder.earliestResetTopicsPattern();
     }
 
     /**
      * Get the Pattern to match all topics requiring to start reading from 
latest available offset
+     *
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
+     *
      * @return the Pattern for matching all topics reading from latest offset, 
never null
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Pattern latestResetTopicsPattern() {
-        final List<String> topics = 
maybeDecorateInternalSourceTopics(latestResetTopics);
-        final Pattern latestPattern = buildPatternForOffsetResetTopics(topics, 
latestResetPatterns);
-
-        ensureNoRegexOverlap(latestPattern, earliestResetPatterns, 
earliestResetTopics);
-
-        return  latestPattern;
-    }
-
-    private void ensureNoRegexOverlap(final Pattern builtPattern, final 
Set<Pattern> otherPatterns, final Set<String> otherTopics) {
-
-        for (Pattern otherPattern : otherPatterns) {
-            if (builtPattern.pattern().contains(otherPattern.pattern())) {
-                throw new TopologyBuilderException(String.format("Found 
overlapping regex [%s] against [%s] for a KStream with auto offset resets", 
otherPattern.pattern(), builtPattern.pattern()));
-            }
-        }
-
-        for (String otherTopic : otherTopics) {
-            if (builtPattern.matcher(otherTopic).matches()) {
-                throw new TopologyBuilderException(String.format("Found 
overlapping regex [%s] matching topic [%s] for a KStream with auto offset 
resets", builtPattern.pattern(), otherTopic));
-            }
-        }
+        return internalTopologyBuilder.latestResetTopicsPattern();
     }
 
     /**
-     * Builds a composite pattern out of topic names and Pattern object for 
matching topic names.  If the provided
-     * arrays are empty a Pattern.compile("") instance is returned.
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
      *
-     * @param sourceTopics  the name of source topics to add to a composite 
pattern
-     * @param sourcePatterns Patterns for matching source topics to add to a 
composite pattern
-     * @return a Pattern that is composed of the literal source topic names 
and any Patterns for matching source topics
-     */
-    private static synchronized Pattern buildPatternForOffsetResetTopics(final 
Collection<String> sourceTopics, final Collection<Pattern> sourcePatterns) {
-        final StringBuilder builder = new StringBuilder();
-
-        for (String topic : sourceTopics) {
-            builder.append(topic).append("|");
-        }
-
-        for (Pattern sourcePattern : sourcePatterns) {
-            builder.append(sourcePattern.pattern()).append("|");
-        }
-
-        if (builder.length() > 0) {
-            builder.setLength(builder.length() - 1);
-            return Pattern.compile(builder.toString());
-        }
-
-        return EMPTY_ZERO_LENGTH_PATTERN;
-    }
-
-    /**
      * @return a mapping from state store name to a List of source topics.
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public Map<String, List<String>> stateStoreNameToSourceTopics() {
-        final Map<String, List<String>> results = new HashMap<>();
-        for (Map.Entry<String, Set<String>> entry : 
stateStoreNameToSourceTopics.entrySet()) {
-            results.put(entry.getKey(), 
maybeDecorateInternalSourceTopics(entry.getValue()));
-        }
-        return results;
+        return internalTopologyBuilder.stateStoreNameToSourceTopics();
     }
 
     /**
      * Returns the copartition groups.
      * A copartition group is a group of source topics that are required to be 
copartitioned.
      *
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
+     *
      * @return groups of topic names
+     * @deprecated not part of public API and for internal usage only
      */
+    @Deprecated
     public synchronized Collection<Set<String>> copartitionGroups() {
-        final List<Set<String>> list = new 
ArrayList<>(copartitionSourceGroups.size());
-        for (Set<String> nodeNames : copartitionSourceGroups) {
-            Set<String> copartitionGroup = new HashSet<>();
-            for (String node : nodeNames) {
-                final List<String> topics = nodeToSourceTopics.get(node);
-                if (topics != null)
-                    
copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
-            }
-            list.add(Collections.unmodifiableSet(copartitionGroup));
-        }
-        return Collections.unmodifiableList(list);
-    }
-
-    private List<String> maybeDecorateInternalSourceTopics(final 
Collection<String> sourceTopics) {
-        final List<String> decoratedTopics = new ArrayList<>();
-        for (String topic : sourceTopics) {
-            if (internalTopicNames.contains(topic)) {
-                decoratedTopics.add(decorateTopic(topic));
-            } else {
-                decoratedTopics.add(topic);
-            }
-        }
-        return decoratedTopics;
-    }
-
-    private String decorateTopic(final String topic) {
-        if (applicationId == null) {
-            throw new TopologyBuilderException("there are internal topics and "
-                    + "applicationId hasn't been set. Call "
-                    + "setApplicationId first");
-        }
-
-        return applicationId + "-" + topic;
+        return internalTopologyBuilder.copartitionGroups();
     }
 
+    /**
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
+     *
+     * @deprecated not part of public API and for internal usage only
+     */
+    @Deprecated
     public SubscriptionUpdates subscriptionUpdates() {
-        return subscriptionUpdates;
+        return internalTopologyBuilder.subscriptionUpdates();
     }
 
+    /**
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
+     *
+     * @deprecated not part of public API and for internal usage only
+     */
+    @Deprecated
     public synchronized Pattern sourceTopicPattern() {
-        if (this.topicPattern == null) {
-            final List<String> allSourceTopics = new ArrayList<>();
-            if (!nodeToSourceTopics.isEmpty()) {
-                for (List<String> topics : nodeToSourceTopics.values()) {
-                    
allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
-                }
-            }
-            Collections.sort(allSourceTopics);
-
-            this.topicPattern = 
buildPatternForOffsetResetTopics(allSourceTopics, 
nodeToSourcePatterns.values());
-        }
-
-        return this.topicPattern;
-    }
-
-    public synchronized void updateSubscriptions(final SubscriptionUpdates 
subscriptionUpdates, final String threadId) {
-        log.debug("stream-thread [{}] updating builder with {} topic(s) with 
possible matching regex subscription(s)", threadId, subscriptionUpdates);
-        this.subscriptionUpdates = subscriptionUpdates;
-        setRegexMatchedTopicsToSourceNodes();
-        setRegexMatchedTopicToStateStore();
-    }
-
-    private boolean isGlobalSource(final String nodeName) {
-        final NodeFactory nodeFactory = nodeFactories.get(nodeName);
-
-        if (nodeFactory instanceof SourceNodeFactory) {
-            final List<String> topics = ((SourceNodeFactory) 
nodeFactory).topics;
-            if (topics != null && topics.size() == 1 && 
globalTopics.contains(topics.get(0))) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    TopologyDescription describe() {
-        final TopologyDescription description = new TopologyDescription();
-
-        describeSubtopologies(description);
-        describeGlobalStores(description);
-
-        return description;
-    }
-
-    private void describeSubtopologies(final TopologyDescription description) {
-        for (final Map.Entry<Integer, Set<String>> nodeGroup : 
makeNodeGroups().entrySet()) {
-
-            final Set<String> allNodesOfGroups = nodeGroup.getValue();
-            final boolean isNodeGroupOfGlobalStores = 
nodeGroupContainsGlobalSourceNode(allNodesOfGroups);
-
-            if (!isNodeGroupOfGlobalStores) {
-                describeSubtopology(description, nodeGroup.getKey(), 
allNodesOfGroups);
-            }
-        }
+        return internalTopologyBuilder.sourceTopicPattern();
     }
 
-    private boolean nodeGroupContainsGlobalSourceNode(final Set<String> 
allNodesOfGroups) {
-        for (final String node : allNodesOfGroups) {
-            if (isGlobalSource(node)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private void describeSubtopology(final TopologyDescription description,
-                                     final Integer subtopologyId,
-                                     final Set<String> nodeNames) {
-
-        final HashMap<String, TopologyDescription.AbstractNode> nodesByName = 
new HashMap<>();
-
-        // add all nodes
-        for (final String nodeName : nodeNames) {
-            nodesByName.put(nodeName, nodeFactories.get(nodeName).describe());
-        }
-
-        // connect each node to its predecessors and successors
-        for (final TopologyDescription.AbstractNode node : 
nodesByName.values()) {
-            for (final String predecessorName : 
nodeFactories.get(node.name()).parents) {
-                final TopologyDescription.AbstractNode predecessor = 
nodesByName.get(predecessorName);
-                node.addPredecessor(predecessor);
-                predecessor.addSuccessor(node);
-            }
-        }
-
-        description.addSubtopology(new TopologyDescription.Subtopology(
-            subtopologyId,
-            new HashSet<TopologyDescription.Node>(nodesByName.values())));
-    }
-
-    private void describeGlobalStores(final TopologyDescription description) {
-        for (final Map.Entry<Integer, Set<String>> nodeGroup : 
makeNodeGroups().entrySet()) {
-            final Set<String> nodes = nodeGroup.getValue();
-
-            final Iterator<String> it = nodes.iterator();
-            while (it.hasNext()) {
-                final String node = it.next();
-
-                if (isGlobalSource(node)) {
-                    // we found a GlobalStore node group; those contain 
exactly two node: {sourceNode,processorNode}
-                    it.remove(); // remove sourceNode from group
-                    final String processorNode = nodes.iterator().next(); // 
get remaining processorNode
-
-                    description.addGlobalStore(new 
TopologyDescription.GlobalStore(
-                        node,
-                        processorNode,
-                        ((ProcessorNodeFactory) 
nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
-                        nodeToSourceTopics.get(node).get(0)
-                    ));
-                    break;
-                }
-            }
-        }
+    /**
+     * NOTE this function would not be needed by developers working with the 
processor APIs, but is only used
+     * for the high-level DSL parsing functionalities.
+     *
+     * @deprecated not part of public API and for internal usage only
+     */
+    @Deprecated
+    public synchronized void updateSubscriptions(final SubscriptionUpdates 
subscriptionUpdates,
+                                                 final String threadId) {
+        internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, 
threadId);
     }
 
 }

Reply via email to