Repository: kafka
Updated Branches:
  refs/heads/0.10.2 80ceb75d2 -> 0f87991d5


http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index b25fcad..81f4302 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -49,6 +49,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+
 /**
  * A component that is used to build a {@link ProcessorTopology}. A topology 
contains an acyclic graph of sources, processors,
  * and sinks. A {@link SourceNode source} is a node in the graph that consumes 
one or more Kafka topics and forwards them to
@@ -81,7 +82,7 @@ public class TopologyBuilder {
     private final List<Set<String>> copartitionSourceGroups = new 
ArrayList<>();
 
     // map from source processor names to subscribed topics (without 
application-id prefix for internal topics)
-    private final HashMap<String, String[]> nodeToSourceTopics = new 
HashMap<>();
+    private final HashMap<String, List<String>> nodeToSourceTopics = new 
HashMap<>();
 
     // map from source processor names to regex subscription patterns
     private final HashMap<String, Pattern> nodeToSourcePatterns = new 
LinkedHashMap<>();
@@ -146,11 +147,11 @@ public class TopologyBuilder {
     }
 
     private static class ProcessorNodeFactory extends NodeFactory {
-        public final String[] parents;
-        private final ProcessorSupplier supplier;
+        private final String[] parents;
+        private final ProcessorSupplier<?, ?> supplier;
         private final Set<String> stateStoreNames = new HashSet<>();
 
-        public ProcessorNodeFactory(String name, String[] parents, 
ProcessorSupplier supplier) {
+        ProcessorNodeFactory(String name, String[] parents, 
ProcessorSupplier<?, ?> supplier) {
             super(name);
             this.parents = parents.clone();
             this.supplier = supplier;
@@ -160,37 +161,32 @@ public class TopologyBuilder {
             stateStoreNames.add(stateStoreName);
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build() {
-            return new ProcessorNode(name, supplier.get(), stateStoreNames);
+            return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
         }
     }
 
     private class SourceNodeFactory extends NodeFactory {
-        private final String[] topics;
-        public final Pattern pattern;
-        private Deserializer keyDeserializer;
-        private Deserializer valDeserializer;
+        private final List<String> topics;
+        private final Pattern pattern;
+        private final Deserializer<?> keyDeserializer;
+        private final Deserializer<?> valDeserializer;
 
-        private SourceNodeFactory(String name, String[] topics, Pattern 
pattern, Deserializer keyDeserializer, Deserializer valDeserializer) {
+        private SourceNodeFactory(String name, String[] topics, Pattern 
pattern, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
             super(name);
-            this.topics = topics != null ? topics.clone() : null;
+            this.topics = topics != null ? Arrays.asList(topics) : null;
             this.pattern = pattern;
             this.keyDeserializer = keyDeserializer;
             this.valDeserializer = valDeserializer;
         }
 
-        String[] getTopics() {
-            return topics;
-        }
-
-        String[] getTopics(Collection<String> subscribedTopics) {
+        List<String> getTopics(Collection<String> subscribedTopics) {
             // if it is subscribed via patterns, it is possible that the topic 
metadata has not been updated
             // yet and hence the map from source node to topics is stale, in 
this case we put the pattern as a place holder;
             // this should only happen for debugging since during runtime this 
function should always be called after the metadata has updated.
             if (subscribedTopics.isEmpty())
-                return new String[] {"Pattern[" + pattern + "]"};
+                return Collections.singletonList("Pattern[" + pattern + "]");
 
             List<String> matchedTopics = new ArrayList<>();
             for (String update : subscribedTopics) {
@@ -207,21 +203,20 @@ public class TopologyBuilder {
                     matchedTopics.add(update);
                 }
             }
-            return matchedTopics.toArray(new String[matchedTopics.size()]);
+            return matchedTopics;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build() {
-            final String[] sourceTopics = nodeToSourceTopics.get(name);
+            final List<String> sourceTopics = nodeToSourceTopics.get(name);
 
             // if it is subscribed via patterns, it is possible that the topic 
metadata has not been updated
             // yet and hence the map from source node to topics is stale, in 
this case we put the pattern as a place holder;
             // this should only happen for debugging since during runtime this 
function should always be called after the metadata has updated.
             if (sourceTopics == null)
-                return new SourceNode(name, new String[] {"Pattern[" + pattern 
+ "]"}, keyDeserializer, valDeserializer);
+                return new SourceNode<>(name, 
Collections.singletonList("Pattern[" + pattern + "]"), keyDeserializer, 
valDeserializer);
             else
-                return new SourceNode(name, 
maybeDecorateInternalSourceTopics(sourceTopics).toArray(new 
String[sourceTopics.length]), keyDeserializer, valDeserializer);
+                return new SourceNode<>(name, 
maybeDecorateInternalSourceTopics(sourceTopics), keyDeserializer, 
valDeserializer);
         }
 
         private boolean isMatch(String topic) {
@@ -229,14 +224,14 @@ public class TopologyBuilder {
         }
     }
 
-    private class SinkNodeFactory extends NodeFactory {
-        public final String[] parents;
-        public final String topic;
-        private Serializer keySerializer;
-        private Serializer valSerializer;
-        private final StreamPartitioner partitioner;
+    private class SinkNodeFactory<K, V> extends NodeFactory {
+        private final String[] parents;
+        private final String topic;
+        private final Serializer<K> keySerializer;
+        private final Serializer<V> valSerializer;
+        private final StreamPartitioner<? super K, ? super V> partitioner;
 
-        private SinkNodeFactory(String name, String[] parents, String topic, 
Serializer keySerializer, Serializer valSerializer, StreamPartitioner 
partitioner) {
+        private SinkNodeFactory(String name, String[] parents, String topic, 
Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? 
super K, ? super V> partitioner) {
             super(name);
             this.parents = parents.clone();
             this.topic = topic;
@@ -245,14 +240,13 @@ public class TopologyBuilder {
             this.partitioner = partitioner;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build() {
             if (internalTopicNames.contains(topic)) {
                 // prefix the internal topic name with the application id
-                return new SinkNode(name, decorateTopic(topic), keySerializer, 
valSerializer, partitioner);
+                return new SinkNode<>(name, decorateTopic(topic), 
keySerializer, valSerializer, partitioner);
             } else {
-                return new SinkNode(name, topic, keySerializer, valSerializer, 
partitioner);
+                return new SinkNode<>(name, topic, keySerializer, 
valSerializer, partitioner);
             }
         }
     }
@@ -263,7 +257,7 @@ public class TopologyBuilder {
         public Map<String, InternalTopicConfig> stateChangelogTopics;
         public Map<String, InternalTopicConfig> repartitionSourceTopics;
 
-        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, 
Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, 
InternalTopicConfig> stateChangelogTopics) {
+        TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, 
Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, 
InternalTopicConfig> stateChangelogTopics) {
             this.sinkTopics = sinkTopics;
             this.sourceTopics = sourceTopics;
             this.stateChangelogTopics = stateChangelogTopics;
@@ -312,8 +306,8 @@ public class TopologyBuilder {
     /**
      * Set the applicationId to be used for auto-generated internal topics.
      *
-     * This is required before calling {@link #sourceTopics}, {@link 
#topicGroups},
-     * {@link #copartitionSources}, {@link #stateStoreNameToSourceTopics} and 
{@link #build(Integer)}.
+     * This is required before calling {@link #topicGroups}, {@link 
#copartitionSources},
+     * {@link #stateStoreNameToSourceTopics} and {@link #build(Integer)}.
      *
      * @param applicationId the streams applicationId. Should be the same as 
set by
      * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
@@ -337,7 +331,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      */
     public synchronized final TopologyBuilder addSource(String name, String... 
topics) {
-        return addSource(null, name, (Deserializer) null, (Deserializer) null, 
topics);
+        return addSource(null, name, null, null, topics);
     }
 
     /**
@@ -353,7 +347,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      */
     public synchronized final TopologyBuilder addSource(AutoOffsetReset 
offsetReset, String name,  String... topics) {
-        return addSource(offsetReset, name, (Deserializer) null, 
(Deserializer) null, topics);
+        return addSource(offsetReset, name, null, null, topics);
     }
 
 
@@ -370,7 +364,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      */
     public synchronized final TopologyBuilder addSource(String name, Pattern 
topicPattern) {
-        return addSource(null, name, (Deserializer) null, (Deserializer) null, 
topicPattern);
+        return addSource(null, name, null, null, topicPattern);
     }
 
     /**
@@ -387,7 +381,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      */
     public synchronized final TopologyBuilder addSource(AutoOffsetReset 
offsetReset, String name,  Pattern topicPattern) {
-        return addSource(offsetReset, name, (Deserializer) null, 
(Deserializer) null, topicPattern);
+        return addSource(offsetReset, name, null, null, topicPattern);
     }
 
 
@@ -445,7 +439,7 @@ public class TopologyBuilder {
         }
 
         nodeFactories.put(name, new SourceNodeFactory(name, topics, null, 
keyDeserializer, valDeserializer));
-        nodeToSourceTopics.put(name, topics.clone());
+        nodeToSourceTopics.put(name, Arrays.asList(topics));
         nodeGrouper.add(name);
 
         return this;
@@ -470,7 +464,7 @@ public class TopologyBuilder {
      * @param topic                 the topic to source the data from
      * @param processorName         the name of the {@link ProcessorSupplier}
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return
+     * @return this builder instance so methods can be chained together; never 
null
      */
     public synchronized TopologyBuilder addGlobalStore(final StateStore store,
                                                        final String sourceName,
@@ -499,7 +493,7 @@ public class TopologyBuilder {
         globalTopics.add(topic);
         final String[] topics = {topic};
         nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, 
topics, null, keyDeserializer, valueDeserializer));
-        nodeToSourceTopics.put(sourceName, topics.clone());
+        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
         nodeGrouper.add(sourceName);
 
         final String[] parents = {sourceName};
@@ -612,7 +606,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
      */
     public synchronized final TopologyBuilder addSink(String name, String 
topic, String... parentNames) {
-        return addSink(name, topic, (Serializer) null, (Serializer) null, 
parentNames);
+        return addSink(name, topic, null, null, parentNames);
     }
 
     /**
@@ -639,7 +633,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
      */
     public synchronized final TopologyBuilder addSink(String name, String 
topic, StreamPartitioner partitioner, String... parentNames) {
-        return addSink(name, topic, (Serializer) null, (Serializer) null, 
partitioner, parentNames);
+        return addSink(name, topic, null, null, partitioner, parentNames);
     }
 
     /**
@@ -662,7 +656,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
      */
     public synchronized final TopologyBuilder addSink(String name, String 
topic, Serializer keySerializer, Serializer valSerializer, String... 
parentNames) {
-        return addSink(name, topic, keySerializer, valSerializer, 
(StreamPartitioner) null, parentNames);
+        return addSink(name, topic, keySerializer, valSerializer, null, 
parentNames);
     }
 
     /**
@@ -703,7 +697,7 @@ public class TopologyBuilder {
             }
         }
 
-        nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, 
keySerializer, valSerializer, partitioner));
+        nodeFactories.put(name, new SinkNodeFactory<>(name, parentNames, 
topic, keySerializer, valSerializer, partitioner));
         nodeToSinkTopic.put(name, topic);
         nodeGrouper.add(name);
         nodeGrouper.unite(name, parentNames);
@@ -876,7 +870,7 @@ public class TopologyBuilder {
         for (String parent : parents) {
             NodeFactory nodeFactory = nodeFactories.get(parent);
             if (nodeFactory instanceof SourceNodeFactory) {
-                sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
nodeFactory).getTopics()));
+                sourceTopics.addAll(((SourceNodeFactory) nodeFactory).topics);
             } else if (nodeFactory instanceof ProcessorNodeFactory) {
                 
sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) 
nodeFactory).parents));
             }
@@ -1013,8 +1007,8 @@ public class TopologyBuilder {
             for (String node : nodes) {
                 final NodeFactory nodeFactory = nodeFactories.get(node);
                 if (nodeFactory instanceof SourceNodeFactory) {
-                    final String[] topics = ((SourceNodeFactory) 
nodeFactory).getTopics();
-                    if (topics != null && topics.length == 1 && 
globalTopics.contains(topics[0])) {
+                    final List<String> topics = ((SourceNodeFactory) 
nodeFactory).topics;
+                    if (topics != null && topics.size() == 1 && 
globalTopics.contains(topics.get(0))) {
                         globalGroups.addAll(nodes);
                     }
                 }
@@ -1023,7 +1017,6 @@ public class TopologyBuilder {
         return globalGroups;
     }
 
-    @SuppressWarnings("unchecked")
     private ProcessorTopology build(Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new 
ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
@@ -1040,7 +1033,8 @@ public class TopologyBuilder {
 
                 if (factory instanceof ProcessorNodeFactory) {
                     for (String parent : ((ProcessorNodeFactory) 
factory).parents) {
-                        processorMap.get(parent).addChild(node);
+                        ProcessorNode<?, ?> parentNode = 
processorMap.get(parent);
+                        parentNode.addChild(node);
                     }
                     for (String stateStoreName : ((ProcessorNodeFactory) 
factory).stateStoreNames) {
                         if (!stateStoreMap.containsKey(stateStoreName)) {
@@ -1063,10 +1057,10 @@ public class TopologyBuilder {
                         }
                     }
                 } else if (factory instanceof SourceNodeFactory) {
-                    SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) 
factory;
-                    String[] topics = (sourceNodeFactory.pattern != null) ?
+                    final SourceNodeFactory sourceNodeFactory = 
(SourceNodeFactory) factory;
+                    final List<String> topics = (sourceNodeFactory.pattern != 
null) ?
                             
sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
-                            sourceNodeFactory.getTopics();
+                            sourceNodeFactory.topics;
 
                     for (String topic : topics) {
                         if (internalTopicNames.contains(topic)) {
@@ -1077,7 +1071,8 @@ public class TopologyBuilder {
                         }
                     }
                 } else if (factory instanceof SinkNodeFactory) {
-                    SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) 
factory;
+                    final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) 
factory;
+
                     for (String parent : sinkNodeFactory.parents) {
                         processorMap.get(parent).addChild(node);
                         if 
(internalTopicNames.contains(sinkNodeFactory.topic)) {
@@ -1105,13 +1100,6 @@ public class TopologyBuilder {
         return Collections.unmodifiableMap(globalStateStores);
     }
 
-    private StateStore getStateStore(final String stateStoreName) {
-        if (stateFactories.containsKey(stateStoreName)) {
-            return stateFactories.get(stateStoreName).supplier.get();
-        }
-        return globalStateStores.get(stateStoreName);
-    }
-
     /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
@@ -1131,7 +1119,7 @@ public class TopologyBuilder {
             Map<String, InternalTopicConfig> stateChangelogTopics = new 
HashMap<>();
             for (String node : entry.getValue()) {
                 // if the node is a source node, add to the source topics
-                String[] topics = nodeToSourceTopics.get(node);
+                List<String> topics = nodeToSourceTopics.get(node);
                 if (topics != null) {
                     // if some of the topics are internal, add them to the 
internal topics
                     for (String topic : topics) {
@@ -1196,7 +1184,7 @@ public class TopologyBuilder {
         }
     }
 
-    private InternalTopicConfig createInternalTopicConfig(final 
StateStoreSupplier supplier, final String name) {
+    private InternalTopicConfig createInternalTopicConfig(final 
StateStoreSupplier<?> supplier, final String name) {
         if (!(supplier instanceof WindowStoreSupplier)) {
             return new InternalTopicConfig(name, 
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), 
supplier.logConfig());
         }
@@ -1210,27 +1198,13 @@ public class TopologyBuilder {
         return config;
     }
 
-
-    /**
-     * Get the names of topics that are to be consumed by the source nodes 
created by this builder.
-     * @return the unmodifiable set of topic names used by source nodes, which 
changes as new sources are added; never null
-     */
-    public synchronized Set<String> sourceTopics() {
-        Set<String> topics = 
maybeDecorateInternalSourceTopics(sourceTopicNames);
-        return Collections.unmodifiableSet(topics);
-    }
-
     /**
      * Get the Pattern to match all topics requiring to start reading from 
earliest available offset
      * @return the Pattern for matching all topics reading from earliest 
offset, never null
      */
     public synchronized Pattern earliestResetTopicsPattern() {
-        Set<String> topics = 
maybeDecorateInternalSourceTopics(earliestResetTopics);
-
-        String[] sourceTopicNames = topics.toArray(new String[topics.size()]);
-        Pattern[] sourceTopicPatterns = earliestResetPatterns.toArray(new 
Pattern[earliestResetPatterns.size()]);
-
-        Pattern earliestPattern =  
buildPatternForOffsetResetTopics(sourceTopicNames, sourceTopicPatterns);
+        final List<String> topics = 
maybeDecorateInternalSourceTopics(earliestResetTopics);
+        final Pattern earliestPattern =  
buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
 
         ensureNoRegexOverlap(earliestPattern, latestResetPatterns, 
latestResetTopics);
 
@@ -1242,12 +1216,8 @@ public class TopologyBuilder {
      * @return the Pattern for matching all topics reading from latest offset, 
never null
      */
     public synchronized Pattern latestResetTopicsPattern() {
-        Set<String> topics = 
maybeDecorateInternalSourceTopics(latestResetTopics);
-
-        String[] sourceTopicNames = topics.toArray(new String[topics.size()]);
-        Pattern[] sourceTopicPatterns = latestResetPatterns.toArray(new 
Pattern[latestResetPatterns.size()]);
-
-        Pattern latestPattern = 
buildPatternForOffsetResetTopics(sourceTopicNames, sourceTopicPatterns);
+        final List<String> topics = 
maybeDecorateInternalSourceTopics(latestResetTopics);
+        final Pattern latestPattern = buildPatternForOffsetResetTopics(topics, 
latestResetPatterns);
 
         ensureNoRegexOverlap(latestPattern, earliestResetPatterns, 
earliestResetTopics);
 
@@ -1267,10 +1237,8 @@ public class TopologyBuilder {
                 throw new TopologyBuilderException(String.format("Found 
overlapping regex [%s] matching topic [%s] for a KStream with auto offset 
resets", builtPattern.pattern(), otherTopic));
             }
         }
-
     }
 
-
     /**
      * Builds a composite pattern out of topic names and Pattern object for 
matching topic names.  If the provided
      * arrays are empty a Pattern.compile("") instance is returned.
@@ -1279,7 +1247,7 @@ public class TopologyBuilder {
      * @param sourcePatterns Patterns for matching source topics to add to a 
composite pattern
      * @return a Pattern that is composed of the literal source topic names 
and any Patterns for matching source topics
      */
-    private static synchronized Pattern 
buildPatternForOffsetResetTopics(String[] sourceTopics, Pattern[] 
sourcePatterns) {
+    private static synchronized Pattern 
buildPatternForOffsetResetTopics(Collection<String> sourceTopics, 
Collection<Pattern> sourcePatterns) {
         StringBuilder builder = new StringBuilder();
 
         for (String topic : sourceTopics) {
@@ -1301,11 +1269,10 @@ public class TopologyBuilder {
     /**
      * @return a mapping from state store name to a Set of source Topics.
      */
-    public Map<String, Set<String>> stateStoreNameToSourceTopics() {
-        final Map<String, Set<String>> results = new HashMap<>();
+    public Map<String, List<String>> stateStoreNameToSourceTopics() {
+        final Map<String, List<String>> results = new HashMap<>();
         for (Map.Entry<String, Set<String>> entry : 
stateStoreNameToSourceTopics.entrySet()) {
             results.put(entry.getKey(), 
maybeDecorateInternalSourceTopics(entry.getValue()));
-
         }
         return results;
     }
@@ -1321,7 +1288,7 @@ public class TopologyBuilder {
         for (Set<String> nodeNames : copartitionSourceGroups) {
             Set<String> copartitionGroup = new HashSet<>();
             for (String node : nodeNames) {
-                String[] topics = nodeToSourceTopics.get(node);
+                final List<String> topics = nodeToSourceTopics.get(node);
                 if (topics != null)
                     
copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
             }
@@ -1330,12 +1297,8 @@ public class TopologyBuilder {
         return Collections.unmodifiableList(list);
     }
 
-    private Set<String> maybeDecorateInternalSourceTopics(final Set<String> 
sourceTopics) {
-        return maybeDecorateInternalSourceTopics(sourceTopics.toArray(new 
String[sourceTopics.size()]));
-    }
-
-    private Set<String> maybeDecorateInternalSourceTopics(String ... 
sourceTopics) {
-        final Set<String> decoratedTopics = new HashSet<>();
+    private List<String> maybeDecorateInternalSourceTopics(final 
Collection<String> sourceTopics) {
+        final List<String> decoratedTopics = new ArrayList<>();
         for (String topic : sourceTopics) {
             if (internalTopicNames.contains(topic)) {
                 decoratedTopics.add(decorateTopic(topic));
@@ -1357,23 +1320,18 @@ public class TopologyBuilder {
     }
 
     public synchronized Pattern sourceTopicPattern() {
-        if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) {
-
-            List<String> allNodeToSourceTopics = new ArrayList<>();
+        if (this.topicPattern == null) {
+            List<String> allSourceTopics = new ArrayList<>();
             if (!nodeToSourceTopics.isEmpty()) {
-                for (String[] topics : nodeToSourceTopics.values()) {
-                    allNodeToSourceTopics.addAll(Arrays.asList(topics));
-
+                for (List<String> topics : nodeToSourceTopics.values()) {
+                    
allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics));
                 }
             }
-            int numPatterns = nodeToSourcePatterns.values().size();
-            int numTopics = allNodeToSourceTopics.size();
-
-            Pattern[] patterns = nodeToSourcePatterns.values().toArray(new 
Pattern[numPatterns]);
-            String[] allTopics = allNodeToSourceTopics.toArray(new 
String[numTopics]);
+            Collections.sort(allSourceTopics);
 
-            this.topicPattern = buildPatternForOffsetResetTopics(allTopics, 
patterns);
+            this.topicPattern = 
buildPatternForOffsetResetTopics(allSourceTopics, 
nodeToSourcePatterns.values());
         }
+
         return this.topicPattern;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 0ebfda7..c4db740 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -29,11 +29,11 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     private final String topic;
     private Serializer<K> keySerializer;
     private Serializer<V> valSerializer;
-    private final StreamPartitioner<K, V> partitioner;
+    private final StreamPartitioner<? super K, ? super V> partitioner;
 
     private ProcessorContext context;
 
-    public SinkNode(String name, String topic, Serializer<K> keySerializer, 
Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
+    public SinkNode(String name, String topic, Serializer<K> keySerializer, 
Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> 
partitioner) {
         super(name);
 
         this.topic = topic;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 771f504..3406606 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -21,14 +21,17 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
+import java.util.List;
+
 public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
+    private final List<String> topics;
+
+    private ProcessorContext context;
     private Deserializer<K> keyDeserializer;
     private Deserializer<V> valDeserializer;
-    private ProcessorContext context;
-    private String[] topics;
 
-    public SourceNode(String name, String[] topics, Deserializer<K> 
keyDeserializer, Deserializer<V> valDeserializer) {
+    public SourceNode(String name, List<String> topics, Deserializer<K> 
keyDeserializer, Deserializer<V> valDeserializer) {
         super(name);
         this.topics = topics;
         this.keyDeserializer = keyDeserializer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 38961f2..b445a51 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -180,14 +180,13 @@ public class StreamThread extends Thread {
 
     protected final StreamsConfig config;
     protected final TopologyBuilder builder;
-    protected final Set<String> sourceTopics;
-    protected final Pattern topicPattern;
     protected final Producer<byte[], byte[]> producer;
     protected final Consumer<byte[], byte[]> consumer;
     protected final Consumer<byte[], byte[]> restoreConsumer;
 
     private final String logPrefix;
     private final String threadClientId;
+    private final Pattern sourceTopicPattern;
     private final Map<TaskId, StreamTask> activeTasks;
     private final Map<TaskId, StandbyTask> standbyTasks;
     private final Map<TopicPartition, StreamTask> activeTasksByPartition;
@@ -200,6 +199,7 @@ public class StreamThread extends Thread {
     private final long cleanTimeMs;
     private final long commitTimeMs;
     private final StreamsMetricsThreadImpl streamsMetrics;
+    // TODO: this is not private only for tests, should be better refactored
     final StateDirectory stateDirectory;
     private String originalReset;
     private StreamPartitionAssignor partitionAssignor = null;
@@ -291,8 +291,7 @@ public class StreamThread extends Thread {
         String threadName = getName();
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics();
-        this.topicPattern = builder.sourceTopicPattern();
+        this.sourceTopicPattern = builder.sourceTopicPattern();
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = 
config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, 
PartitionGrouper.class);
@@ -566,11 +565,7 @@ public class StreamThread extends Thread {
         boolean requiresPoll = true;
         boolean polledRecords = false;
 
-        if (topicPattern != null) {
-            consumer.subscribe(topicPattern, rebalanceListener);
-        } else {
-            consumer.subscribe(new ArrayList<>(sourceTopics), 
rebalanceListener);
-        }
+        consumer.subscribe(sourceTopicPattern, rebalanceListener);
 
         while (stillRunning()) {
             this.timerStartedMs = time.milliseconds();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index a59eb5f..6fb6e06 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -84,7 +84,7 @@ public class StreamsMetadataState {
             return allMetadata;
         }
 
-        final Set<String> sourceTopics = 
builder.stateStoreNameToSourceTopics().get(storeName);
+        final List<String> sourceTopics = 
builder.stateStoreNameToSourceTopics().get(storeName);
         if (sourceTopics == null) {
             return Collections.emptyList();
         }
@@ -201,7 +201,7 @@ public class StreamsMetadataState {
         rebuildMetadata(currentState);
     }
 
-    private boolean hasPartitionsForAnyTopics(final Set<String> topicNames, 
final Set<TopicPartition> partitionForHost) {
+    private boolean hasPartitionsForAnyTopics(final List<String> topicNames, 
final Set<TopicPartition> partitionForHost) {
         for (TopicPartition topicPartition : partitionForHost) {
             if (topicNames.contains(topicPartition.topic())) {
                 return true;
@@ -215,13 +215,13 @@ public class StreamsMetadataState {
         if (currentState.isEmpty()) {
             return;
         }
-        final Map<String, Set<String>> stores = 
builder.stateStoreNameToSourceTopics();
+        final Map<String, List<String>> stores = 
builder.stateStoreNameToSourceTopics();
         for (Map.Entry<HostInfo, Set<TopicPartition>> entry : 
currentState.entrySet()) {
             final HostInfo key = entry.getKey();
             final Set<TopicPartition> partitionsForHost = new 
HashSet<>(entry.getValue());
             final Set<String> storesOnHost = new HashSet<>();
-            for (Map.Entry<String, Set<String>> storeTopicEntry : 
stores.entrySet()) {
-                final Set<String> topicsForStore = storeTopicEntry.getValue();
+            for (Map.Entry<String, List<String>> storeTopicEntry : 
stores.entrySet()) {
+                final List<String> topicsForStore = storeTopicEntry.getValue();
                 if (hasPartitionsForAnyTopics(topicsForStore, 
partitionsForHost)) {
                     storesOnHost.add(storeTopicEntry.getKey());
                 }
@@ -259,7 +259,7 @@ public class StreamsMetadataState {
     }
 
     private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
-        final Set<String> sourceTopics = 
builder.stateStoreNameToSourceTopics().get(storeName);
+        final List<String> sourceTopics = 
builder.stateStoreNameToSourceTopics().get(storeName);
         if (sourceTopics == null || sourceTopics.isEmpty()) {
             return null;
         }
@@ -271,11 +271,11 @@ public class StreamsMetadataState {
     }
 
     private class SourceTopicsInfo {
-        private final Set<String> sourceTopics;
+        private final List<String> sourceTopics;
         private int maxPartitions;
         private String topicWithMostPartitions;
 
-        private SourceTopicsInfo(final Set<String> sourceTopics) {
+        private SourceTopicsInfo(final List<String> sourceTopics) {
             this.sourceTopics = sourceTopics;
             for (String topic : sourceTopics) {
                 final List<PartitionInfo> partitions = 
clusterMetadata.partitionsForTopic(topic);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 41277c7..5dae8dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
@@ -201,6 +202,7 @@ public class KafkaStreamsTest {
             final Properties props = new Properties();
             props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
             props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
 
             final KStreamBuilder builder = new KStreamBuilder();
             final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 6f3c95a..f0fb0a2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -354,9 +354,9 @@ public class KStreamRepartitionJoinTest {
         streamOneInput = "stream-one-" + testNo;
         streamTwoInput = "stream-two-" + testNo;
         streamFourInput = "stream-four-" + testNo;
-        CLUSTER.createTopic(streamOneInput);
-        CLUSTER.createTopic(streamTwoInput);
-        CLUSTER.createTopic(streamFourInput);
+        CLUSTER.createTopic(streamOneInput, 2, 1);
+        CLUSTER.createTopic(streamTwoInput, 2, 1);
+        CLUSTER.createTopic(streamFourInput, 2, 1);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 619b6b5..fe7bebc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -68,7 +68,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
         putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
         putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
-        putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+        putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
 
         for (int i = 0; i < brokers.length; i++) {
             brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 875c359..3e7c41b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -214,7 +214,7 @@ public class IntegrationTestUtils {
             }
         };
 
-        final String conditionDetails = "Did not receive " + 
expectedNumRecords + " number of records";
+        final String conditionDetails = "Expecting " + expectedNumRecords + " 
records but only received " + accumData.size() + ": " + accumData;
 
         TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
 
@@ -254,7 +254,7 @@ public class IntegrationTestUtils {
             }
         };
 
-        final String conditionDetails = "Did not receive " + 
expectedNumRecords + " number of records";
+        final String conditionDetails = "Expecting " + expectedNumRecords + " 
records but only received " + accumData.size() + ": " + accumData;
 
         TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 5f126c3..a469f25 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -133,9 +133,8 @@ public class KStreamBuilderTest {
 
         final KStream<String, String> merged = builder.merge(processedSource1, 
processedSource2, source3);
         merged.groupByKey().count("my-table");
-        final Map<String, Set<String>> actual = 
builder.stateStoreNameToSourceTopics();
-
-        assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), 
actual.get("my-table"));
+        final Map<String, List<String>> actual = 
builder.stateStoreNameToSourceTopics();
+        assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), 
actual.get("my-table"));
     }
 
     @Test(expected = TopologyBuilderException.class)
@@ -227,12 +226,11 @@ public class KStreamBuilderTest {
         final KStream<String, String> playEvents = builder.stream("events");
 
         final KTable<String, String> table = builder.table("table-topic", 
"table-store");
-        assertEquals(Collections.singleton("table-topic"), 
builder.stateStoreNameToSourceTopics().get("table-store"));
+        assertEquals(Collections.singletonList("table-topic"), 
builder.stateStoreNameToSourceTopics().get("table-store"));
 
         final KStream<String, String> mapped = 
playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
         mapped.leftJoin(table, 
MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
-
-        assertEquals(Collections.singleton("table-topic"), 
builder.stateStoreNameToSourceTopics().get("table-store"));
-        assertEquals(Collections.singleton(APP_ID + 
"-KSTREAM-MAP-0000000003-repartition"), 
builder.stateStoreNameToSourceTopics().get("count"));
+        assertEquals(Collections.singletonList("table-topic"), 
builder.stateStoreNameToSourceTopics().get("table-store"));
+        assertEquals(Collections.singletonList(APP_ID + 
"-KSTREAM-MAP-0000000003-repartition"), 
builder.stateStoreNameToSourceTopics().get("count"));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 4712320..2f3a450 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -156,12 +156,9 @@ public class TopologyBuilderTest {
         builder.addSource("source-3", "topic-3");
         builder.addInternalTopic("topic-3");
 
-        Set<String> expected = new HashSet<>();
-        expected.add("topic-1");
-        expected.add("topic-2");
-        expected.add("X-topic-3");
+        Pattern expectedPattern = Pattern.compile("X-topic-3|topic-1|topic-2");
 
-        assertEquals(expected, builder.sourceTopics());
+        assertEquals(expectedPattern.pattern(), 
builder.sourceTopicPattern().pattern());
     }
 
     @Test
@@ -184,7 +181,7 @@ public class TopologyBuilderTest {
     @Test
     public void testSubscribeTopicNameAndPattern() {
         final TopologyBuilder builder = new TopologyBuilder();
-        Pattern expectedPattern = 
Pattern.compile("topic-foo|topic-bar|.*-\\d");
+        Pattern expectedPattern = 
Pattern.compile("topic-bar|topic-foo|.*-\\d");
         builder.addSource("source-1", "topic-foo", "topic-bar");
         builder.addSource("source-2", Pattern.compile(".*-\\d"));
         assertEquals(expectedPattern.pattern(), 
builder.sourceTopicPattern().pattern());
@@ -441,9 +438,9 @@ public class TopologyBuilderTest {
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
         builder.addStateStore(new MockStateStoreSupplier("store", false), 
"processor");
-        final Map<String, Set<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
+        final Map<String, List<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singleton("topic"), 
stateStoreNameToSourceTopic.get("store"));
+        assertEquals(Collections.singletonList("topic"), 
stateStoreNameToSourceTopic.get("store"));
     }
 
     @Test
@@ -452,9 +449,9 @@ public class TopologyBuilderTest {
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
         builder.addStateStore(new MockStateStoreSupplier("store", false), 
"processor");
-        final Map<String, Set<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
+        final Map<String, List<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singleton("topic"), 
stateStoreNameToSourceTopic.get("store"));
+        assertEquals(Collections.singletonList("topic"), 
stateStoreNameToSourceTopic.get("store"));
     }
 
     @Test
@@ -465,9 +462,9 @@ public class TopologyBuilderTest {
         builder.addSource("source", "internal-topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
         builder.addStateStore(new MockStateStoreSupplier("store", false), 
"processor");
-        final Map<String, Set<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
+        final Map<String, List<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singleton("appId-internal-topic"), 
stateStoreNameToSourceTopic.get("store"));
+        assertEquals(Collections.singletonList("appId-internal-topic"), 
stateStoreNameToSourceTopic.get("store"));
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
index fdd9127..cf328ee 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.junit.Test;
 
+import java.util.Collections;
+
 import static org.junit.Assert.assertEquals;
 
 public class SourceNodeRecordDeserializerTest {
@@ -82,7 +84,7 @@ public class SourceNodeRecordDeserializerTest {
                       final boolean valueThrowsException,
                       final Object key,
                       final Object value) {
-            super("", new String[0], null, null);
+            super("", Collections.<String>emptyList(), null, null);
             this.keyThrowsException = keyThrowsException;
             this.valueThrowsException = valueThrowsException;
             this.key = key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f87991d/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java 
b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 096c64a..4e0d21a 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockSourceNode<K, V> extends SourceNode<K, V> {
@@ -36,7 +37,7 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> {
     public boolean initialized;
 
     public MockSourceNode(String[] topics, Deserializer<K> keyDeserializer, 
Deserializer<V> valDeserializer) {
-        super(NAME + INDEX.getAndIncrement(), topics, keyDeserializer, 
valDeserializer);
+        super(NAME + INDEX.getAndIncrement(), Arrays.asList(topics), 
keyDeserializer, valDeserializer);
     }
 
     @Override

Reply via email to