This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 9bf815e KAFKA-10102: update ProcessorTopology instead of rebuilding it (#8803) 9bf815e is described below commit 9bf815e86f2edf27501d4dbb06a77dbf2e44ce55 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Mon Jun 8 15:55:16 2020 -0700 KAFKA-10102: update ProcessorTopology instead of rebuilding it (#8803) Reviewers: Boyang Chen <boy...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../streams/processor/internals/AbstractTask.java | 6 +- .../internals/InternalTopologyBuilder.java | 15 ++--- .../processor/internals/ProcessorTopology.java | 72 +++++++++++++++++++--- .../streams/processor/internals/SourceNode.java | 24 +------- .../streams/processor/internals/StreamTask.java | 16 ++--- .../kafka/streams/processor/internals/Task.java | 3 +- .../streams/processor/internals/TaskManager.java | 2 +- .../processor/internals/GlobalStateTaskTest.java | 2 - .../processor/internals/PartitionGroupTest.java | 4 +- .../processor/internals/ProcessorTopologyTest.java | 25 +++++++- .../internals/RecordDeserializerTest.java | 4 +- .../processor/internals/RecordQueueTest.java | 2 +- .../processor/internals/SourceNodeTest.java | 7 +-- .../processor/internals/StreamTaskTest.java | 21 ++++++- .../processor/internals/TaskManagerTest.java | 6 +- .../java/org/apache/kafka/test/MockSourceNode.java | 6 +- 16 files changed, 139 insertions(+), 76 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index adebaf3..f59571b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.List; +import java.util.Map; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -118,8 +120,8 @@ public abstract class AbstractTask implements Task { } @Override - public void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology) { + public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) { this.inputPartitions = topicPartitions; - this.topology = processorTopology; + topology.updateSourceTopics(nodeToSourceTopics); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 6c3934b..7dc2df1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -270,16 +270,7 @@ public class InternalTopologyBuilder { @Override public ProcessorNode<K, V> build() { - final List<String> sourceTopics = nodeToSourceTopics.get(name); - - // if it is subscribed via patterns, it is possible that the topic metadata has not been updated - // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder; - // this should only happen for debugging since during runtime this function should always be called after the metadata has updated. - if (sourceTopics == null) { - return new SourceNode<>(name, Collections.singletonList(String.valueOf(pattern)), timestampExtractor, keyDeserializer, valDeserializer); - } else { - return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer); - } + return new SourceNode<>(name, timestampExtractor, keyDeserializer, valDeserializer); } private boolean isMatch(final String topic) { @@ -1105,6 +1096,10 @@ public class InternalTopologyBuilder { return Collections.unmodifiableMap(topicGroups); } + public Map<String, List<String>> nodeToSourceTopics() { + return Collections.unmodifiableMap(nodeToSourceTopics); + } + private RepartitionTopicConfig buildRepartitionTopicConfig(final String internalTopic, final Optional<Integer> numberOfPartitions) { return numberOfPartitions diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 1497329..51b61b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.ArrayList; +import java.util.HashMap; import org.apache.kafka.streams.processor.StateStore; import java.util.Collections; @@ -23,11 +25,15 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ProcessorTopology { + private final Logger log = LoggerFactory.getLogger(ProcessorTopology.class); private final List<ProcessorNode<?, ?>> processorNodes; - private final Map<String, SourceNode<?, ?>> sourcesByTopic; + private final Map<String, SourceNode<?, ?>> sourceNodesByName; + private final Map<String, SourceNode<?, ?>> sourceNodesByTopic; private final Map<String, SinkNode<?, ?>> sinksByTopic; private final Set<String> terminalNodes; private final List<StateStore> stateStores; @@ -38,14 +44,14 @@ public class ProcessorTopology { private final Map<String, String> storeToChangelogTopic; public ProcessorTopology(final List<ProcessorNode<?, ?>> processorNodes, - final Map<String, SourceNode<?, ?>> sourcesByTopic, + final Map<String, SourceNode<?, ?>> sourceNodesByTopic, final Map<String, SinkNode<?, ?>> sinksByTopic, final List<StateStore> stateStores, final List<StateStore> globalStateStores, final Map<String, String> storeToChangelogTopic, final Set<String> repartitionTopics) { this.processorNodes = Collections.unmodifiableList(processorNodes); - this.sourcesByTopic = Collections.unmodifiableMap(sourcesByTopic); + this.sourceNodesByTopic = new HashMap<>(sourceNodesByTopic); this.sinksByTopic = Collections.unmodifiableMap(sinksByTopic); this.stateStores = Collections.unmodifiableList(stateStores); this.globalStateStores = Collections.unmodifiableList(globalStateStores); @@ -58,18 +64,23 @@ public class ProcessorTopology { terminalNodes.add(node.name()); } } + + this.sourceNodesByName = new HashMap<>(); + for (final SourceNode<?, ?> source : sourceNodesByTopic.values()) { + sourceNodesByName.put(source.name(), source); + } } public Set<String> sourceTopics() { - return sourcesByTopic.keySet(); + return sourceNodesByTopic.keySet(); } public SourceNode<?, ?> source(final String topic) { - return sourcesByTopic.get(topic); + return sourceNodesByTopic.get(topic); } public Set<SourceNode<?, ?>> sources() { - return new HashSet<>(sourcesByTopic.values()); + return new HashSet<>(sourceNodesByTopic.values()); } public Set<String> sinkTopics() { @@ -131,6 +142,27 @@ public class ProcessorTopology { return false; } + public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) { + if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) { + log.error("Set of source nodes do not match: \n" + + "sourceNodesByName = {}\n" + + "sourceTopicsByName = {}", + sourceNodesByName.keySet(), sourceTopicsByName.keySet()); + throw new IllegalStateException("Tried to update source topics but source nodes did not match"); + } + sourceNodesByTopic.clear(); + for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) { + final String nodeName = sourceEntry.getKey(); + for (final String topic : sourceEntry.getValue()) { + if (sourceNodesByTopic.containsKey(topic)) { + throw new IllegalStateException("Topic " + topic + " was already registered to source node " + + sourceNodesByTopic.get(topic).name()); + } + sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName)); + } + } + } + private String childrenToString(final String indent, final List<ProcessorNode<?, ?>> children) { if (children == null || children.isEmpty()) { return ""; @@ -167,12 +199,36 @@ public class ProcessorTopology { * @return A string representation of this instance. */ public String toString(final String indent) { + final Map<SourceNode<?, ?>, List<String>> sourceToTopics = new HashMap<>(); + for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByTopic.entrySet()) { + final String topic = sourceNodeEntry.getKey(); + final SourceNode<?, ?> source = sourceNodeEntry.getValue(); + sourceToTopics.computeIfAbsent(source, s -> new ArrayList<>()); + sourceToTopics.get(source).add(topic); + } + final StringBuilder sb = new StringBuilder(indent + "ProcessorTopology:\n"); // start from sources - for (final SourceNode<?, ?> source : sourcesByTopic.values()) { - sb.append(source.toString(indent + "\t")).append(childrenToString(indent + "\t", source.children())); + for (final Map.Entry<SourceNode<?, ?>, List<String>> sourceNodeEntry : sourceToTopics.entrySet()) { + final SourceNode<?, ?> source = sourceNodeEntry.getKey(); + final List<String> topics = sourceNodeEntry.getValue(); + sb.append(source.toString(indent + "\t")) + .append(topicsToString(indent + "\t", topics)) + .append(childrenToString(indent + "\t", source.children())); + } + return sb.toString(); + } + + private static String topicsToString(final String indent, final List<String> topics) { + final StringBuilder sb = new StringBuilder(); + sb.append(indent).append("\ttopics:\t\t["); + for (final String topic : topics) { + sb.append(topic); + sb.append(", "); } + sb.setLength(sb.length() - 2); // remove the last comma + sb.append("]\n"); return sb.toString(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 717495e..39b8c0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -23,12 +23,8 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; -import java.util.List; - public class SourceNode<K, V> extends ProcessorNode<K, V> { - private final List<String> topics; - private InternalProcessorContext context; private Deserializer<K> keyDeserializer; private Deserializer<V> valDeserializer; @@ -36,22 +32,19 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { private Sensor processAtSourceSensor; public SourceNode(final String name, - final List<String> topics, final TimestampExtractor timestampExtractor, final Deserializer<K> keyDeserializer, final Deserializer<V> valDeserializer) { super(name); - this.topics = topics; this.timestampExtractor = timestampExtractor; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; } public SourceNode(final String name, - final List<String> topics, final Deserializer<K> keyDeserializer, final Deserializer<V> valDeserializer) { - this(name, topics, null, keyDeserializer, valDeserializer); + this(name, null, keyDeserializer, valDeserializer); } K deserializeKey(final String topic, final Headers headers, final byte[] data) { @@ -109,21 +102,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { return toString(""); } - /** - * @return a string representation of this node starting with the given indent, useful for debugging. - */ - public String toString(final String indent) { - final StringBuilder sb = new StringBuilder(super.toString(indent)); - sb.append(indent).append("\ttopics:\t\t["); - for (final String topic : topics) { - sb.append(topic); - sb.append(", "); - } - sb.setLength(sb.length() - 2); // remove the last comma - sb.append("]\n"); - return sb.toString(); - } - public TimestampExtractor getTimestampExtractor() { return timestampExtractor; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index b23a1a7..955adab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.List; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -496,12 +497,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } @Override - public void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology) { - super.update(topicPartitions, processorTopology); + public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) { + super.update(topicPartitions, nodeToSourceTopics); partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue); - if (state() != State.RESTORING) { // if task is RESTORING then topology will be initialized in completeRestoration - initializeTopology(); - } } @Override @@ -512,18 +510,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, switch (state()) { case CREATED: - case RUNNING: case RESTORING: + case RUNNING: case SUSPENDED: stateMgr.recycle(); recordCollector.close(); break; - case CLOSED: - throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id); - + throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id); default: - throw new IllegalStateException("Unknown state " + state() + " while closing active task " + id); + throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id); } partitionGroup.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 9283e86..1318816 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -167,7 +168,7 @@ public interface Task { /** * Updates input partitions and topology after rebalance */ - void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology); + void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics); /** * Attempt a clean close but do not close the underlying state diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 12361d7..c4efacd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -357,7 +357,7 @@ public class TaskManager { for (final TopicPartition topicPartition : topicPartitions) { partitionToTask.put(topicPartition, task); } - task.update(topicPartitions, builder.buildSubtopology(task.id().topicGroupId)); + task.update(topicPartitions, builder.nodeToSourceTopics()); } task.resume(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 0895fa6..2319199 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -56,11 +56,9 @@ public class GlobalStateTaskTest { private final TopicPartition t1 = new TopicPartition(topic1, 1); private final TopicPartition t2 = new TopicPartition(topic2, 1); private final MockSourceNode<String, String> sourceOne = new MockSourceNode<>( - new String[]{topic1}, new StringDeserializer(), new StringDeserializer()); private final MockSourceNode<Integer, Integer> sourceTwo = new MockSourceNode<>( - new String[]{topic2}, new IntegerDeserializer(), new IntegerDeserializer()); private final MockProcessorNode<?, ?> processorOne = new MockProcessorNode<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 7e7f1b9..e358ef2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -100,7 +100,7 @@ public class PartitionGroupTest { private RecordQueue createQueue1() { return new RecordQueue( partition1, - new MockSourceNode<>(topics, intDeserializer, intDeserializer), + new MockSourceNode<>(intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), new InternalMockProcessorContext(), @@ -111,7 +111,7 @@ public class PartitionGroupTest { private RecordQueue createQueue2() { return new RecordQueue( partition2, - new MockSourceNode<>(topics, intDeserializer, intDeserializer), + new MockSourceNode<>(intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), new InternalMockProcessorContext(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index baa59f3..9f852cc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -58,6 +58,7 @@ import java.util.Properties; import java.util.Set; import java.util.function.Supplier; +import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; @@ -88,7 +89,6 @@ public class ProcessorTopologyTest { private TopologyTestDriver driver; private final Properties props = new Properties(); - @Before public void setup() { // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ... @@ -150,6 +150,29 @@ public class ProcessorTopologyTest { } @Test + public void shouldUpdateSourceTopicsWithNewMatchingTopic() { + topology.addSource("source-1", "topic-1"); + final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology(); + + assertNull(processorTopology.source("topic-2")); + processorTopology.updateSourceTopics(Collections.singletonMap("source-1", asList("topic-1", "topic-2"))); + + assertThat(processorTopology.source("topic-2").name(), equalTo("source-1")); + } + + @Test + public void shouldUpdateSourceTopicsWithRemovedTopic() { + topology.addSource("source-1", "topic-1", "topic-2"); + final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology(); + + assertThat(processorTopology.source("topic-2").name(), equalTo("source-1")); + + processorTopology.updateSourceTopics(Collections.singletonMap("source-1", Collections.singletonList("topic-1"))); + + assertNull(processorTopology.source("topic-2")); + } + + @Test public void testDrivingSimpleTopology() { final int partition = 10; driver = new TopologyTestDriver(createSimpleTopology(partition), props); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java index f4b1c7f..7299067 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java @@ -26,8 +26,6 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; import org.junit.Test; -import java.util.Collections; - import static org.junit.Assert.assertEquals; public class RecordDeserializerTest { @@ -80,7 +78,7 @@ public class RecordDeserializerTest { final boolean valueThrowsException, final Object key, final Object value) { - super("", Collections.emptyList(), null, null); + super("", null, null); this.keyThrowsException = keyThrowsException; this.valueThrowsException = valueThrowsException; this.key = key; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 01c1520..6929bb8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -65,7 +65,7 @@ public class RecordQueueTest { new MockRecordCollector() ); private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics - = new MockSourceNode<>(new String[] {"topic"}, intDeserializer, intDeserializer); + = new MockSourceNode<>(intDeserializer, intDeserializer); private final RecordQueue queue = new RecordQueue( new TopicPartition("topic", 1), mockSourceNodeWithMetrics, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index 32ba4fb..028c4fe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; import java.nio.charset.StandardCharsets; -import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; @@ -44,7 +43,7 @@ import static org.junit.Assert.assertTrue; public class SourceNodeTest { @Test public void shouldProvideTopicHeadersAndDataToKeyDeserializer() { - final SourceNode<String, String> sourceNode = new MockSourceNode<>(new String[]{""}, new TheDeserializer(), new TheDeserializer()); + final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer()); final RecordHeaders headers = new RecordHeaders(); final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8)); assertThat(deserializeKey, is("topic" + headers + "data")); @@ -52,7 +51,7 @@ public class SourceNodeTest { @Test public void shouldProvideTopicHeadersAndDataToValueDeserializer() { - final SourceNode<String, String> sourceNode = new MockSourceNode<>(new String[]{""}, new TheDeserializer(), new TheDeserializer()); + final SourceNode<String, String> sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer()); final RecordHeaders headers = new RecordHeaders(); final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8)); assertThat(deserializedValue, is("topic" + headers + "data")); @@ -85,7 +84,7 @@ public class SourceNodeTest { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", builtInMetricsVersion); final InternalMockProcessorContext context = new InternalMockProcessorContext(streamsMetrics); final SourceNode<String, String> node = - new SourceNode<>(context.currentNode().name(), Collections.singletonList("topic"), new TheDeserializer(), new TheDeserializer()); + new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); node.init(context); final String threadId = Thread.currentThread().getName(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 27135f8..d77cbcb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.HashSet; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; @@ -116,9 +117,9 @@ public class StreamTaskTest { private final Serializer<Integer> intSerializer = Serdes.Integer().serializer(); private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer(); - private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(new String[] {topic1}, intDeserializer, intDeserializer); - private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(new String[] {topic2}, intDeserializer, intDeserializer); - private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(new String[] {topic2}, intDeserializer, intDeserializer) { + private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) { @Override public void process(final Integer key, final Integer value) { throw new RuntimeException("KABOOM!"); @@ -1806,6 +1807,20 @@ public class StreamTaskTest { EasyMock.replay(stateManager); } + @Test + public void shouldUpdatePartitions() { + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + final Set<TopicPartition> newPartitions = new HashSet<>(task.inputPartitions()); + newPartitions.add(new TopicPartition("newTopic", 0)); + + task.update(newPartitions, mkMap( + mkEntry(source1.name(), asList(topic1, "newTopic")), + mkEntry(source2.name(), singletonList(topic2))) + ); + + assertThat(task.inputPartitions(), equalTo(newPartitions)); + } + private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final Consumer<byte[], byte[]> consumer) { final StateStore stateStore = new MockKeyValueStore(storeName, true); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 200d841..0354e4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2623,7 +2623,6 @@ public class TaskManagerTest { private Map<TopicPartition, OffsetAndMetadata> committableOffsets = Collections.emptyMap(); private Map<TopicPartition, Long> purgeableOffsets; private Map<TopicPartition, Long> changelogOffsets = Collections.emptyMap(); - private InternalProcessorContext processorContext = mock(InternalProcessorContext.class); private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue = new HashMap<>(); @@ -2723,6 +2722,11 @@ public class TaskManagerTest { transitionTo(State.CLOSED); } + @Override + public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) { + inputPartitions = topicPartitions; + } + void setCommittableOffsetsAndMetadata(final Map<TopicPartition, OffsetAndMetadata> committableOffsets) { if (!active) { throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate for StandbyTasks"); diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java index 89b00c4..f582202 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java @@ -16,13 +16,11 @@ */ package org.apache.kafka.test; - import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.SourceNode; import java.util.ArrayList; -import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; public class MockSourceNode<K, V> extends SourceNode<K, V> { @@ -36,8 +34,8 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> { public boolean initialized; public boolean closed; - public MockSourceNode(final String[] topics, final Deserializer<K> keyDeserializer, final Deserializer<V> valDeserializer) { - super(NAME + INDEX.getAndIncrement(), Arrays.asList(topics), keyDeserializer, valDeserializer); + public MockSourceNode(final Deserializer<K> keyDeserializer, final Deserializer<V> valDeserializer) { + super(NAME + INDEX.getAndIncrement(), keyDeserializer, valDeserializer); } @Override