This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 9bf815e  KAFKA-10102: update ProcessorTopology instead of rebuilding it (#8803)
9bf815e is described below

commit 9bf815e86f2edf27501d4dbb06a77dbf2e44ce55
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Mon Jun 8 15:55:16 2020 -0700

    KAFKA-10102: update ProcessorTopology instead of rebuilding it (#8803)
    
    Reviewers: Boyang Chen <boy...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
---
 .../streams/processor/internals/AbstractTask.java  |  6 +-
 .../internals/InternalTopologyBuilder.java         | 15 ++---
 .../processor/internals/ProcessorTopology.java     | 72 +++++++++++++++++++---
 .../streams/processor/internals/SourceNode.java    | 24 +-------
 .../streams/processor/internals/StreamTask.java    | 16 ++---
 .../kafka/streams/processor/internals/Task.java    |  3 +-
 .../streams/processor/internals/TaskManager.java   |  2 +-
 .../processor/internals/GlobalStateTaskTest.java   |  2 -
 .../processor/internals/PartitionGroupTest.java    |  4 +-
 .../processor/internals/ProcessorTopologyTest.java | 25 +++++++-
 .../internals/RecordDeserializerTest.java          |  4 +-
 .../processor/internals/RecordQueueTest.java       |  2 +-
 .../processor/internals/SourceNodeTest.java        |  7 +--
 .../processor/internals/StreamTaskTest.java        | 21 ++++++-
 .../processor/internals/TaskManagerTest.java       |  6 +-
 .../java/org/apache/kafka/test/MockSourceNode.java |  6 +-
 16 files changed, 139 insertions(+), 76 deletions(-)
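
In short: on reassignment, the TaskManager now passes the builder's current
node-name-to-source-topics map into each task, and the task updates its
existing ProcessorTopology in place instead of receiving a freshly rebuilt
one. A condensed sketch of the new flow (not runnable as-is; builder, task,
and topicPartitions stand in for the surrounding code shown in the hunks
below):

    // TaskManager, on reassignment: hand over the node -> source-topics map.
    task.update(topicPartitions, builder.nodeToSourceTopics());

    // AbstractTask then applies it to the topology it already owns:
    //     this.inputPartitions = topicPartitions;
    //     topology.updateSourceTopics(nodeToSourceTopics);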

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index adebaf3..f59571b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.List;
+import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -118,8 +120,8 @@ public abstract class AbstractTask implements Task {
     }
 
     @Override
-    public void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology) {
+    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
         this.inputPartitions = topicPartitions;
-        this.topology = processorTopology;
+        topology.updateSourceTopics(nodeToSourceTopics);
     }
 }
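
The new Map<String, List<String>> argument maps each source node's name to
the full list of topics that node should now consume. A self-contained
sketch of building such an argument (node and topic names are hypothetical,
for illustration only):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class UpdateArgumentSketch {
        public static void main(final String[] args) {
            // Hypothetical: after a pattern subscription also matched
            // "topic-2", source node "source-1" now reads two topics.
            final Map<String, List<String>> nodeToSourceTopics = new HashMap<>();
            nodeToSourceTopics.put("source-1", Arrays.asList("topic-1", "topic-2"));
            System.out.println(nodeToSourceTopics); // {source-1=[topic-1, topic-2]}
        }
    }
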
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 6c3934b..7dc2df1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -270,16 +270,7 @@ public class InternalTopologyBuilder {
 
         @Override
         public ProcessorNode<K, V> build() {
-            final List<String> sourceTopics = nodeToSourceTopics.get(name);
-
-            // if it is subscribed via patterns, it is possible that the topic metadata has not been updated
-            // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder;
-            // this should only happen for debugging since during runtime this function should always be called after the metadata has updated.
-            if (sourceTopics == null) {
-                return new SourceNode<>(name, Collections.singletonList(String.valueOf(pattern)), timestampExtractor, keyDeserializer, valDeserializer);
-            } else {
-                return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer);
-            }
+            return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer);
         }
 
         private boolean isMatch(final String topic) {
@@ -1105,6 +1096,10 @@ public class InternalTopologyBuilder {
         return Collections.unmodifiableMap(topicGroups);
     }
 
+    public Map<String, List<String>> nodeToSourceTopics() {
+        return Collections.unmodifiableMap(nodeToSourceTopics);
+    }
+
     private RepartitionTopicConfig buildRepartitionTopicConfig(final String internalTopic,
                                                                final Optional<Integer> numberOfPartitions) {
         return numberOfPartitions
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 1497329..51b61b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.Collections;
@@ -23,11 +25,15 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ProcessorTopology {
+    private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class);
 
     private final List<ProcessorNode<?, ?>> processorNodes;
-    private final Map<String, SourceNode<?, ?>> sourcesByTopic;
+    private final Map<String, SourceNode<?, ?>> sourceNodesByName;
+    private final Map<String, SourceNode<?, ?>> sourceNodesByTopic;
     private final Map<String, SinkNode<?, ?>> sinksByTopic;
     private final Set<String> terminalNodes;
     private final List<StateStore> stateStores;
@@ -38,14 +44,14 @@ public class ProcessorTopology {
     private final Map<String, String> storeToChangelogTopic;
 
     public ProcessorTopology(final List<ProcessorNode<?, ?>> processorNodes,
-                             final Map<String, SourceNode<?, ?>> sourcesByTopic,
+                             final Map<String, SourceNode<?, ?>> sourceNodesByTopic,
                              final Map<String, SinkNode<?, ?>> sinksByTopic,
                              final List<StateStore> stateStores,
                              final List<StateStore> globalStateStores,
                              final Map<String, String> storeToChangelogTopic,
                              final Set<String> repartitionTopics) {
         this.processorNodes = Collections.unmodifiableList(processorNodes);
-        this.sourcesByTopic = Collections.unmodifiableMap(sourcesByTopic);
+        this.sourceNodesByTopic = new HashMap<>(sourceNodesByTopic);
         this.sinksByTopic = Collections.unmodifiableMap(sinksByTopic);
         this.stateStores = Collections.unmodifiableList(stateStores);
         this.globalStateStores = Collections.unmodifiableList(globalStateStores);
@@ -58,18 +64,23 @@ public class ProcessorTopology {
                 terminalNodes.add(node.name());
             }
         }
+
+        this.sourceNodesByName = new HashMap<>();
+        for (final SourceNode<?, ?> source : sourceNodesByTopic.values()) {
+            sourceNodesByName.put(source.name(), source);
+        }
     }
 
     public Set<String> sourceTopics() {
-        return sourcesByTopic.keySet();
+        return sourceNodesByTopic.keySet();
     }
 
     public SourceNode<?, ?> source(final String topic) {
-        return sourcesByTopic.get(topic);
+        return sourceNodesByTopic.get(topic);
     }
 
     public Set<SourceNode<?, ?>> sources() {
-        return new HashSet<>(sourcesByTopic.values());
+        return new HashSet<>(sourceNodesByTopic.values());
     }
 
     public Set<String> sinkTopics() {
@@ -131,6 +142,27 @@ public class ProcessorTopology {
         return false;
     }
 
+    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
+        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
+            log.error("Set of source nodes do not match: \n" +
+                "sourceNodesByName = {}\n" +
+                "sourceTopicsByName = {}",
+                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
+            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
+        }
+        sourceNodesByTopic.clear();
+        for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) {
+            final String nodeName = sourceEntry.getKey();
+            for (final String topic : sourceEntry.getValue()) {
+                if (sourceNodesByTopic.containsKey(topic)) {
+                    throw new IllegalStateException("Topic " + topic + " was already registered to source node "
+                        + sourceNodesByTopic.get(topic).name());
+                }
+                sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName));
+            }
+        }
+    }
+
     private String childrenToString(final String indent, final List<ProcessorNode<?, ?>> children) {
         if (children == null || children.isEmpty()) {
             return "";
@@ -167,12 +199,36 @@ public class ProcessorTopology {
      * @return A string representation of this instance.
      */
     public String toString(final String indent) {
+        final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>();
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) {
+            final String topic = sourceNodeEntry.getKey();
+            final SourceNode<?, ?> source = sourceNodeEntry.getValue();
+            sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>());
+            sourceToTopics.get(source).add(topic);
+        }
+
         final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n");
 
         // start from sources
-        for (final SourceNode<?, ?> source : sourcesByTopic.values()) {
-            sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children()));
+        for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) {
+            final SourceNode<?, ?> source = sourceNodeEntry.getKey();
+            final List<String> topics = sourceNodeEntry.getValue();
+            sb.append(source.toString(indent + "\t"))
+                .append(topicsToString(indent + "\t", topics))
+                .append(childrenToString(indent + "\t", source.children()));
+        }
+        return sb.toString();
+    }
+
+    private static String topicsToString(final String indent, final List<String> topics) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(indent).append("\ttopics:\t\t[");
+        for (final String topic : topics) {
+            sb.append(topic);
+            sb.append(", ");
         }
+        sb.setLength(sb.length() - 2);  // remove the last comma
+        sb.append("]\n");
         return sb.toString();
     }
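
The contract of the new updateSourceTopics method: the set of source nodes
is fixed at construction, only the topics they read may change, and no topic
may be claimed by two nodes. Below is a self-contained sketch of that
validate-then-rebuild logic, with source nodes reduced to plain names for
illustration (this is not the internal class itself):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class UpdateSourceTopicsSketch {
        private final Set<String> sourceNodeNames;                              // fixed at construction
        private final Map<String, String> sourceNodesByTopic = new HashMap<>(); // topic -> node name

        public UpdateSourceTopicsSketch(final Set<String> sourceNodeNames) {
            this.sourceNodeNames = sourceNodeNames;
        }

        public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
            // The node set must be unchanged; only topic assignments move.
            if (!sourceTopicsByName.keySet().equals(sourceNodeNames)) {
                throw new IllegalStateException("Tried to update source topics but source nodes did not match");
            }
            sourceNodesByTopic.clear();
            for (final Map.Entry<String, List<String>> entry : sourceTopicsByName.entrySet()) {
                for (final String topic : entry.getValue()) {
                    // Reject a topic that two source nodes both claim.
                    final String previous = sourceNodesByTopic.putIfAbsent(topic, entry.getKey());
                    if (previous != null) {
                        throw new IllegalStateException("Topic " + topic + " was already registered to source node " + previous);
                    }
                }
            }
        }
    }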
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 717495e..39b8c0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -23,12 +23,8 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
 
-import java.util.List;
-
 public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
-    private final List<String> topics;
-
     private InternalProcessorContext context;
     private Deserializer<K> keyDeserializer;
     private Deserializer<V> valDeserializer;
@@ -36,22 +32,19 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
     private Sensor processAtSourceSensor;
 
     public SourceNode(final String name,
-                      final List<String> topics,
                       final TimestampExtractor timestampExtractor,
                       final Deserializer<K> keyDeserializer,
                       final Deserializer<V> valDeserializer) {
         super(name);
-        this.topics = topics;
         this.timestampExtractor = timestampExtractor;
         this.keyDeserializer = keyDeserializer;
         this.valDeserializer = valDeserializer;
     }
 
     public SourceNode(final String name,
-                      final List<String> topics,
                       final Deserializer<K> keyDeserializer,
                       final Deserializer<V> valDeserializer) {
-        this(name, topics, null, keyDeserializer, valDeserializer);
+        this(name, null, keyDeserializer, valDeserializer);
     }
 
     K deserializeKey(final String topic, final Headers headers, final byte[] data) {
@@ -109,21 +102,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         return toString("");
     }
 
-    /**
-     * @return a string representation of this node starting with the given indent, useful for debugging.
-     */
-    public String toString(final String indent) {
-        final StringBuilder sb = new StringBuilder(super.toString(indent));
-        sb.append(indent).append("\ttopics:\t\t[");
-        for (final String topic : topics) {
-            sb.append(topic);
-            sb.append(", ");
-        }
-        sb.setLength(sb.length() - 2);  // remove the last comma
-        sb.append("]\n");
-        return sb.toString();
-    }
-
     public TimestampExtractor getTimestampExtractor() {
         return timestampExtractor;
     }
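
With the topics list gone, a SourceNode is described purely by its name,
optional timestamp extractor, and deserializers; the topic-to-node mapping
now lives only in ProcessorTopology. A minimal construction sketch under the
new signature (node name and Serdes are chosen for illustration):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.internals.SourceNode;

    class SourceNodeConstructionSketch {
        // The two-argument overload passes null for the timestamp extractor.
        final SourceNode<String, String> source = new SourceNode<>(
            "source-1",
            Serdes.String().deserializer(),
            Serdes.String().deserializer());
    }
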
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b23a1a7..955adab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.List;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -496,12 +497,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     }
 
     @Override
-    public void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology) {
-        super.update(topicPartitions, processorTopology);
+    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+        super.update(topicPartitions, nodeToSourceTopics);
         partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
-        if (state() != State.RESTORING) { // if task is RESTORING then topology will be initialized in completeRestoration
-            initializeTopology();
-        }
     }
 
     @Override
@@ -512,18 +510,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
         switch (state()) {
             case CREATED:
-            case RUNNING:
             case RESTORING:
+            case RUNNING:
             case SUSPENDED:
                 stateMgr.recycle();
                 recordCollector.close();
                 break;
-
             case CLOSED:
-                throw new IllegalStateException("Illegal state " + state() + " 
while closing active task " + id);
-
+                throw new IllegalStateException("Illegal state " + state() + " 
while recycling active task " + id);
             default:
-                throw new IllegalStateException("Unknown state " + state() + " 
while closing active task " + id);
+                throw new IllegalStateException("Unknown state " + state() + " 
while recycling active task " + id);
         }
 
         partitionGroup.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 9283e86..1318816 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -167,7 +168,7 @@ public interface Task {
     /**
      * Updates input partitions and topology after rebalance
      */
-    void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology);
+    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
 
     /**
      * Attempt a clean close but do not close the underlying state
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 12361d7..c4efacd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -357,7 +357,7 @@ public class TaskManager {
             for (final TopicPartition topicPartition : topicPartitions) {
                 partitionToTask.put(topicPartition, task);
             }
-            task.update(topicPartitions, builder.buildSubtopology(task.id().topicGroupId));
+            task.update(topicPartitions, builder.nodeToSourceTopics());
         }
         task.resume();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 0895fa6..2319199 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -56,11 +56,9 @@ public class GlobalStateTaskTest {
     private final TopicPartition t1 = new TopicPartition(topic1, 1);
     private final TopicPartition t2 = new TopicPartition(topic2, 1);
     private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>(
-        new String[]{topic1},
         new StringDeserializer(),
         new StringDeserializer());
     private final MockSourceNode<Integer, Integer>  sourceTwo = new MockSourceNode<>(
-        new String[]{topic2},
         new IntegerDeserializer(),
         new IntegerDeserializer());
     private final MockProcessorNode<?, ?> processorOne = new MockProcessorNode<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 7e7f1b9..e358ef2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -100,7 +100,7 @@ public class PartitionGroupTest {
     private RecordQueue createQueue1() {
         return new RecordQueue(
                 partition1,
-                new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+                new MockSourceNode<>(intDeserializer, intDeserializer),
                 timestampExtractor,
                 new LogAndContinueExceptionHandler(),
                 new InternalMockProcessorContext(),
@@ -111,7 +111,7 @@ public class PartitionGroupTest {
     private RecordQueue createQueue2() {
         return new RecordQueue(
                 partition2,
-                new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+                new MockSourceNode<>(intDeserializer, intDeserializer),
                 timestampExtractor,
                 new LogAndContinueExceptionHandler(),
                 new InternalMockProcessorContext(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index baa59f3..9f852cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -58,6 +58,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.function.Supplier;
 
+import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -88,7 +89,6 @@ public class ProcessorTopologyTest {
     private TopologyTestDriver driver;
     private final Properties props = new Properties();
 
-
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
@@ -150,6 +150,29 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
+        topology.addSource("source-1", "topic-1");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+        assertNull(processorTopology.source("topic-2"));
+        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", asList("topic-1", "topic-2")));
+
+        assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"));
+    }
+
+    @Test
+    public void shouldUpdateSourceTopicsWithRemovedTopic() {
+        topology.addSource("source-1", "topic-1", "topic-2");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+        assertThat(processorTopology.source("topic-2").name(), equalTo("source-1"));
+
+        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", Collections.singletonList("topic-1")));
+
+        assertNull(processorTopology.source("topic-2"));
+    }
+
+    @Test
     public void testDrivingSimpleTopology() {
         final int partition = 10;
         driver = new TopologyTestDriver(createSimpleTopology(partition), props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index f4b1c7f..7299067 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -26,8 +26,6 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.junit.Test;
 
-import java.util.Collections;
-
 import static org.junit.Assert.assertEquals;
 
 public class RecordDeserializerTest {
@@ -80,7 +78,7 @@ public class RecordDeserializerTest {
                       final boolean valueThrowsException,
                       final Object key,
                       final Object value) {
-            super("", Collections.emptyList(), null, null);
+            super("", null, null);
             this.keyThrowsException = keyThrowsException;
             this.valueThrowsException = valueThrowsException;
             this.key = key;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 01c1520..6929bb8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -65,7 +65,7 @@ public class RecordQueueTest {
         new MockRecordCollector()
     );
     private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics
-        = new MockSourceNode<>(new String[] {"topic"}, intDeserializer, 
intDeserializer);
+        = new MockSourceNode<>(intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
         new TopicPartition("topic", 1),
         mockSourceNodeWithMetrics,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
index 32ba4fb..028c4fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -44,7 +43,7 @@ import static org.junit.Assert.assertTrue;
 public class SourceNodeTest {
     @Test
     public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
-        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new String[]{""}, new TheDeserializer(), new TheDeserializer());
+        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
         final RecordHeaders headers = new RecordHeaders();
         final String deserializeKey = sourceNode.deserializeKey("topic", 
headers, "data".getBytes(StandardCharsets.UTF_8));
         assertThat(deserializeKey, is("topic" + headers + "data"));
@@ -52,7 +51,7 @@ public class SourceNodeTest {
 
     @Test
     public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
-        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new String[]{""}, new TheDeserializer(), new TheDeserializer());
+        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer());
         final RecordHeaders headers = new RecordHeaders();
         final String deserializedValue = sourceNode.deserializeValue("topic", 
headers, "data".getBytes(StandardCharsets.UTF_8));
         assertThat(deserializedValue, is("topic" + headers + "data"));
@@ -85,7 +84,7 @@ public class SourceNodeTest {
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion);
         final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics);
         final SourceNode<String, String> node =
-            new SourceNode<>(context.currentNode().name(), Collections.singletonList("topic"), new TheDeserializer(), new TheDeserializer());
+            new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());
         node.init(context);
 
         final String threadId = Thread.currentThread().getName();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 27135f8..d77cbcb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.HashSet;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -116,9 +117,9 @@ public class StreamTaskTest {
     private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
     private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
 
-    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(new String[] {topic1}, intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(new String[] {topic2}, intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(new String[] {topic2}, intDeserializer, intDeserializer) {
+    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
         @Override
         public void process(final Integer key, final Integer value) {
             throw new RuntimeException("KABOOM!");
@@ -1806,6 +1807,20 @@ public class StreamTaskTest {
         EasyMock.replay(stateManager);
     }
 
+    @Test
+    public void shouldUpdatePartitions() {
+        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+        final Set<TopicPartition> newPartitions = new 
HashSet<>(task.inputPartitions());
+        newPartitions.add(new TopicPartition("newTopic", 0));
+
+        task.update(newPartitions, mkMap(
+            mkEntry(source1.name(), asList(topic1, "newTopic")),
+            mkEntry(source2.name(), singletonList(topic2)))
+        );
+
+        assertThat(task.inputPartitions(), equalTo(newPartitions));
+    }
+
     private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final Consumer<byte[], byte[]> consumer) {
         final StateStore stateStore = new MockKeyValueStore(storeName, true);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 200d841..0354e4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2623,7 +2623,6 @@ public class TaskManagerTest {
         private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap();
         private Map<TopicPartition, Long> purgeableOffsets;
         private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap();
-        private InternalProcessorContext processorContext = mock(InternalProcessorContext.class);
 
         private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue = new HashMap<>();
 
@@ -2723,6 +2722,11 @@ public class TaskManagerTest {
             transitionTo(State.CLOSED);
         }
 
+        @Override
+        public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+            inputPartitions = topicPartitions;
+        }
+
         void setCommittableOffsetsAndMetadata(final Map<TopicPartition, OffsetAndMetadata> committableOffsets) {
             if (!active) {
                 throw new IllegalStateException("Cannot set 
CommittableOffsetsAndMetadate for StandbyTasks");
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 89b00c4..f582202 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -16,13 +16,11 @@
  */
 package org.apache.kafka.test;
 
-
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockSourceNode<K, V> extends SourceNode<K, V> {
@@ -36,8 +34,8 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> {
     public boolean initialized;
     public boolean closed;
 
-    public MockSourceNode(final String[] topics, final Deserializer<K> keyDeserializer, final Deserializer<V> valDeserializer) {
-        super(NAME + INDEX.getAndIncrement(), Arrays.asList(topics), keyDeserializer, valDeserializer);
+    public MockSourceNode(final Deserializer<K> keyDeserializer, final Deserializer<V> valDeserializer) {
+        super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer);
     }
 
     @Override
