Repository: kafka
Updated Branches:
  refs/heads/0.10.0 55af7ec6b -> 801a70612


MINOR: Add application id prefix for copartitionGroups in TopologyBuilder

This is a bugfix that is already in trunk but has not been backported to 0.10.0.

Author: Guozhang Wang <[email protected]>

Reviewers: Damian Guy <[email protected]>, Ewen Cheslack-Postava 
<[email protected]>

Closes #1735 from guozhangwang/Kminor-topology-applicationID-0.10.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/801a7061
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/801a7061
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/801a7061

Branch: refs/heads/0.10.0
Commit: 801a706124af16f605abc6141f38f9eed916ffc2
Parents: 55af7ec
Author: Guozhang Wang <[email protected]>
Authored: Mon Aug 15 23:04:40 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Aug 15 23:04:40 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  2 +
 .../streams/processor/TopologyBuilder.java      | 79 ++++++++++++++------
 .../internals/StreamPartitionAssignor.java      |  2 +-
 .../processor/internals/StreamThread.java       |  6 +-
 .../kstream/internals/KStreamImplTest.java      |  2 +-
 .../streams/processor/TopologyBuilderTest.java  | 25 ++++---
 .../internals/ProcessorTopologyTest.java        |  2 +-
 .../internals/StreamPartitionAssignorTest.java  | 12 +--
 .../processor/internals/StreamThreadTest.java   |  6 +-
 .../apache/kafka/test/KStreamTestDriver.java    |  2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  2 +-
 11 files changed, 91 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 17c760e..3a311a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -141,6 +141,8 @@ public class KafkaStreams {
         // The application ID is a required config and hence should always 
have value
         final String applicationId = 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
+        builder.setApplicationId(applicationId);
+
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + 
STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7161a80..6b57b17 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -64,6 +65,7 @@ public class TopologyBuilder {
     private final HashMap<String, String[]> nodeToSourceTopics = new 
HashMap<>();
     private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
     private Map<Integer, Set<String>> nodeGroups = null;
+    private String applicationId = null;
 
     private static class StateStoreFactory {
         public final Set<String> users;
@@ -85,7 +87,7 @@ public class TopologyBuilder {
             this.name = name;
         }
 
-        public abstract ProcessorNode build(String applicationId);
+        public abstract ProcessorNode build();
     }
 
     private static class ProcessorNodeFactory extends NodeFactory {
@@ -105,7 +107,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String applicationId) {
+        public ProcessorNode build() {
             return new ProcessorNode(name, supplier.get(), stateStoreNames);
         }
     }
@@ -124,7 +126,7 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String applicationId) {
+        public ProcessorNode build() {
             return new SourceNode(name, keyDeserializer, valDeserializer);
         }
     }
@@ -147,10 +149,10 @@ public class TopologyBuilder {
 
         @SuppressWarnings("unchecked")
         @Override
-        public ProcessorNode build(String applicationId) {
+        public ProcessorNode build() {
             if (internalTopicNames.contains(topic)) {
                 // prefix the internal topic name with the application id
-                return new SinkNode(name, applicationId + "-" + topic, 
keySerializer, valSerializer, partitioner);
+                return new SinkNode(name, decorateTopic(topic), keySerializer, 
valSerializer, partitioner);
             } else {
                 return new SinkNode(name, topic, keySerializer, valSerializer, 
partitioner);
             }
@@ -193,6 +195,22 @@ public class TopologyBuilder {
     public TopologyBuilder() {}
 
     /**
+     * Set the applicationId to be used for auto-generated internal topics.
+     *
+     * This is required before calling {@link #sourceTopics}, {@link 
#topicGroups},
+     * {@link #copartitionSources} and {@link #build(Integer)}.
+     *
+     * @param applicationId the streams applicationId. Should be the same as 
set by
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
+     */
+    public synchronized final TopologyBuilder setApplicationId(String 
applicationId) {
+        Objects.requireNonNull(applicationId, "applicationId can't be null");
+        this.applicationId = applicationId;
+
+        return this;
+    }
+
+    /**
      * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
      * The source will use the {@link 
org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key 
deserializer} and
      * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG 
default value deserializer} specified in the
@@ -501,7 +519,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public synchronized Map<Integer, TopicsInfo> topicGroups(String 
applicationId) {
+    public synchronized Map<Integer, TopicsInfo> topicGroups() {
         Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
 
         if (nodeGroups == null)
@@ -520,7 +538,7 @@ public class TopologyBuilder {
                     for (String topic : topics) {
                         if (this.internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the 
application id
-                            String internalTopic = applicationId + "-" + topic;
+                            String internalTopic = decorateTopic(topic);
                             internalSourceTopics.add(internalTopic);
                             sourceTopics.add(internalTopic);
                         } else {
@@ -534,7 +552,7 @@ public class TopologyBuilder {
                 if (topic != null) {
                     if (internalTopicNames.contains(topic)) {
                         // prefix the change log topic name with the 
application id
-                        sinkTopics.add(applicationId + "-" + topic);
+                        sinkTopics.add(decorateTopic(topic));
                     } else {
                         sinkTopics.add(topic);
                     }
@@ -544,7 +562,7 @@ public class TopologyBuilder {
                 for (StateStoreFactory stateFactory : stateFactories.values()) 
{
                     if (stateFactory.isInternal && 
stateFactory.users.contains(node)) {
                         // prefix the change log topic name with the 
application id
-                        stateChangelogTopics.add(applicationId + "-" + 
stateFactory.supplier.name() + 
ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+                        
stateChangelogTopics.add(ProcessorStateManager.storeChangelogTopic(applicationId,
 stateFactory.supplier.name()));
                     }
                 }
             }
@@ -629,7 +647,7 @@ public class TopologyBuilder {
             for (String node : nodeNames) {
                 String[] topics = nodeToSourceTopics.get(node);
                 if (topics != null)
-                    copartitionGroup.addAll(Arrays.asList(topics));
+                    
copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
             }
             list.add(Collections.unmodifiableSet(copartitionGroup));
         }
@@ -642,7 +660,7 @@ public class TopologyBuilder {
      *
      * @see 
org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, 
org.apache.kafka.streams.StreamsConfig)
      */
-    public synchronized ProcessorTopology build(String applicationId, Integer 
topicGroupId) {
+    public synchronized ProcessorTopology build(Integer topicGroupId) {
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -650,11 +668,11 @@ public class TopologyBuilder {
             // when nodeGroup is null, we build the full topology. this is 
used in some tests.
             nodeGroup = null;
         }
-        return build(applicationId, nodeGroup);
+        return build(nodeGroup);
     }
 
     @SuppressWarnings("unchecked")
-    private ProcessorTopology build(String applicationId, Set<String> 
nodeGroup) {
+    private ProcessorTopology build(Set<String> nodeGroup) {
         List<ProcessorNode> processorNodes = new 
ArrayList<>(nodeFactories.size());
         Map<String, ProcessorNode> processorMap = new HashMap<>();
         Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -663,7 +681,7 @@ public class TopologyBuilder {
         // create processor nodes in a topological order ("nodeFactories" is 
already topologically sorted)
         for (NodeFactory factory : nodeFactories.values()) {
             if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                ProcessorNode node = factory.build(applicationId);
+                ProcessorNode node = factory.build();
                 processorNodes.add(node);
                 processorMap.put(node.name(), node);
 
@@ -680,7 +698,7 @@ public class TopologyBuilder {
                     for (String topic : ((SourceNodeFactory) factory).topics) {
                         if (internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the 
application id
-                            topicSourceMap.put(applicationId + "-" + topic, 
(SourceNode) node);
+                            topicSourceMap.put(decorateTopic(topic), 
(SourceNode) node);
                         } else {
                             topicSourceMap.put(topic, (SourceNode) node);
                         }
@@ -702,15 +720,34 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes 
created by this builder.
      * @return the unmodifiable set of topic names used by source nodes, which 
changes as new sources are added; never null
      */
-    public synchronized Set<String> sourceTopics(String applicationId) {
-        Set<String> topics = new HashSet<>();
-        for (String topic : sourceTopicNames) {
+    public synchronized Set<String> sourceTopics() {
+        Set<String> topics = 
maybeDecorateInternalSourceTopics(sourceTopicNames);
+        return Collections.unmodifiableSet(topics);
+    }
+
+    private Set<String> maybeDecorateInternalSourceTopics(final Set<String> 
sourceTopics) {
+        return maybeDecorateInternalSourceTopics(sourceTopics.toArray(new 
String[sourceTopics.size()]));
+    }
+
+    private Set<String> maybeDecorateInternalSourceTopics(String ... 
sourceTopics) {
+        final Set<String> decoratedTopics = new HashSet<>();
+        for (String topic : sourceTopics) {
             if (internalTopicNames.contains(topic)) {
-                topics.add(applicationId + "-" + topic);
+                decoratedTopics.add(decorateTopic(topic));
             } else {
-                topics.add(topic);
+                decoratedTopics.add(topic);
             }
         }
-        return Collections.unmodifiableSet(topics);
+        return decoratedTopics;
+    }
+
+    private String decorateTopic(String topic) {
+        if (applicationId == null) {
+            throw new TopologyBuilderException("there are internal topics and "
+                    + "applicationId hasn't been set. Call "
+                    + "setApplicationId first");
+        }
+
+        return applicationId + "-" + topic;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 085ff94..2ddfe43 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -118,7 +118,7 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
-        this.topicGroups = 
streamThread.builder.topicGroups(streamThread.applicationId);
+        this.topicGroups = streamThread.builder.topicGroups();
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72eeef5..bf88d1b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -159,7 +159,7 @@ public class StreamThread extends Thread {
         this.applicationId = applicationId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics(applicationId);
+        this.sourceTopics = builder.sourceTopics();
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = 
config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, 
PartitionGrouper.class);
@@ -545,7 +545,7 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, 
Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(applicationId, 
id.topicGroupId);
+        ProcessorTopology topology = builder.build(id.topicGroupId);
 
         return new StreamTask(id, applicationId, partitions, topology, 
consumer, producer, restoreConsumer, config, sensors);
     }
@@ -615,7 +615,7 @@ public class StreamThread extends Thread {
     protected StandbyTask createStandbyTask(TaskId id, 
Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(applicationId, 
id.topicGroupId);
+        ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
             return new StandbyTask(id, applicationId, partitions, topology, 
consumer, restoreConsumer, config, sensors);

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index a40c8fb..ff16a79 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -132,7 +132,7 @@ public class KStreamImplTest {
             1 + // to
             2 + // through
             1, // process
-            builder.build("X", null).processors().size());
+            builder.build(null).processors().size());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 9af313a..a67b4a9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -144,12 +144,12 @@ public class TopologyBuilderTest {
         builder.addSource("source-3", "topic-3");
         builder.addInternalTopic("topic-3");
 
-        Set<String> expected = new HashSet<String>();
+        Set<String> expected = new HashSet<>();
         expected.add("topic-1");
         expected.add("topic-2");
         expected.add("X-topic-3");
 
-        assertEquals(expected, builder.sourceTopics("X"));
+        assertEquals(expected, builder.setApplicationId("X").sourceTopics());
     }
 
     @Test(expected = TopologyBuilderException.class)
@@ -190,21 +190,22 @@ public class TopologyBuilderTest {
 
         StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", 
false);
         builder.addStateStore(supplier);
-        suppliers = builder.build("X", null).stateStoreSuppliers();
+        suppliers = builder.build(null).stateStoreSuppliers();
         assertEquals(0, suppliers.size());
 
         builder.addSource("source-1", "topic-1");
         builder.addProcessor("processor-1", new MockProcessorSupplier(), 
"source-1");
         builder.connectProcessorAndStateStores("processor-1", "store-1");
-        suppliers = builder.build("X", null).stateStoreSuppliers();
+        suppliers = builder.build(null).stateStoreSuppliers();
         assertEquals(1, suppliers.size());
         assertEquals(supplier.name(), suppliers.get(0).name());
     }
 
     @Test
     public void testTopicGroups() {
-        final TopologyBuilder builder = new TopologyBuilder();
+        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
 
+        builder.addInternalTopic("topic-1x");
         builder.addSource("source-1", "topic-1", "topic-1x");
         builder.addSource("source-2", "topic-2");
         builder.addSource("source-3", "topic-3");
@@ -218,10 +219,10 @@ public class TopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), 
"source-3", "source-4");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", 
"topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(0, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", 
"topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
         expectedTopicGroups.put(1, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), 
Collections.<String>emptySet(), Collections.<String>emptySet()));
         expectedTopicGroups.put(2, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), 
Collections.<String>emptySet(), Collections.<String>emptySet()));
 
@@ -230,7 +231,7 @@ public class TopologyBuilderTest {
 
         Collection<Set<String>> copartitionGroups = 
builder.copartitionGroups();
 
-        assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new 
HashSet<>(copartitionGroups));
+        assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new 
HashSet<>(copartitionGroups));
     }
 
     @Test
@@ -256,7 +257,7 @@ public class TopologyBuilderTest {
         builder.addStateStore(supplier);
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
+        Map<Integer, TopicsInfo> topicGroups = 
builder.setApplicationId("X").topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
         expectedTopicGroups.put(0, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", 
"topic-2"), Collections.<String>emptySet(), 
mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));
@@ -281,9 +282,9 @@ public class TopologyBuilderTest {
         builder.addProcessor("processor-2", new MockProcessorSupplier(), 
"source-2", "processor-1");
         builder.addProcessor("processor-3", new MockProcessorSupplier(), 
"source-3", "source-4");
 
-        ProcessorTopology topology0 = builder.build("X", 0);
-        ProcessorTopology topology1 = builder.build("X", 1);
-        ProcessorTopology topology2 = builder.build("X", 2);
+        ProcessorTopology topology0 = builder.build(0);
+        ProcessorTopology topology1 = builder.build(1);
+        ProcessorTopology topology2 = builder.build(2);
 
         assertEquals(mkSet("source-1", "source-2", "processor-1", 
"processor-2"), nodeNames(topology0.processors()));
         assertEquals(mkSet("source-3", "source-4", "processor-3"), 
nodeNames(topology1.processors()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 62b283a..382e853 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -96,7 +96,7 @@ public class ProcessorTopologyTest {
         builder.addSink("sink-1", "topic-3", "processor-1");
         builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
 
-        final ProcessorTopology topology = builder.build("X", null);
+        final ProcessorTopology topology = builder.build(null);
 
         assertEquals(6, topology.processors().size());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 17bda54..f743631 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -269,8 +269,9 @@ public class StreamPartitionAssignorTest {
     @Test
     public void testAssignWithStates() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
-
+        String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
 
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
@@ -295,10 +296,10 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new 
MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new 
MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new 
SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new 
StreamPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, 
"test", client1));
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, 
applicationId, client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new 
HashMap<>();
         subscriptions.put("consumer10",
@@ -474,6 +475,7 @@ public class StreamPartitionAssignorTest {
     @Test
     public void testAssignWithInternalTopics() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
+        String applicationId = "test";
 
         TopologyBuilder builder = new TopologyBuilder();
         builder.addInternalTopic("topicX");
@@ -489,10 +491,10 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
-        StreamThread thread10 = new StreamThread(builder, config, 
clientSupplier, "test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new 
StreamThread(builder.setApplicationId(applicationId), config, clientSupplier, 
applicationId, client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new 
StreamPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, 
"test", client1));
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, 
applicationId, client1));
         MockInternalTopicManager internalTopicManager = new 
MockInternalTopicManager(clientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 4ae31e4..b6a6bff 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -160,7 +160,7 @@ public class StreamThreadTest {
         StreamThread thread = new StreamThread(builder, config, new 
MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new 
SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, 
Collection<TopicPartition> partitionsForTask) {
-                ProcessorTopology topology = builder.build("X", 
id.topicGroupId);
+                ProcessorTopology topology = builder.build(id.topicGroupId);
                 return new TestStreamTask(id, applicationId, 
partitionsForTask, topology, consumer, producer, restoreConsumer, config);
             }
         };
@@ -284,7 +284,7 @@ public class StreamThreadTest {
 
                 @Override
                 protected StreamTask createStreamTask(TaskId id, 
Collection<TopicPartition> partitionsForTask) {
-                    ProcessorTopology topology = builder.build("X", 
id.topicGroupId);
+                    ProcessorTopology topology = 
builder.build(id.topicGroupId);
                     return new TestStreamTask(id, applicationId, 
partitionsForTask, topology, consumer, producer, restoreConsumer, config);
                 }
             };
@@ -403,7 +403,7 @@ public class StreamThreadTest {
 
                 @Override
                 protected StreamTask createStreamTask(TaskId id, 
Collection<TopicPartition> partitionsForTask) {
-                    ProcessorTopology topology = builder.build("X", 
id.topicGroupId);
+                    ProcessorTopology topology = 
builder.build(id.topicGroupId);
                     return new TestStreamTask(id, applicationId, 
partitionsForTask, topology, consumer, producer, restoreConsumer, config);
                 }
             };

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 7316804..dfa7f5d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -57,7 +57,7 @@ public class KStreamTestDriver {
                              File stateDir,
                              Serde<?> keySerde,
                              Serde<?> valSerde) {
-        this.topology = builder.build("X", null);
+        this.topology = 
builder.setApplicationId("KStreamTestDriver").build(null);
         this.stateDir = stateDir;
         this.context = new MockProcessorContext(this, stateDir, keySerde, 
valSerde, new MockRecordCollector());
         this.context.setTime(0L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 4ddbc2a..5188f34 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -147,7 +147,7 @@ public class ProcessorTopologyTestDriver {
      */
     public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder 
builder, String... storeNames) {
         id = new TaskId(0, 0);
-        topology = builder.build("X", null);
+        topology = builder.build(null);
 
         // Set up the consumer and producer ...
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

Reply via email to