Repository: kafka Updated Branches: refs/heads/trunk 002b377da -> 2a58ba9fd
KAFKA-3311; Prepare internal source topics before calling partition grouper Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda <[email protected]>, Jun Rao <[email protected]> Closes #990 from guozhangwang/K3311 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a58ba9f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a58ba9f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a58ba9f Branch: refs/heads/trunk Commit: 2a58ba9fd893979f89aec251579b10f5cda41d10 Parents: 002b377 Author: Guozhang Wang <[email protected]> Authored: Wed Mar 2 13:43:48 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Mar 2 13:43:48 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/kafka/common/Cluster.java | 36 ++++++ .../streams/processor/TopologyBuilder.java | 29 +++-- .../internals/StreamPartitionAssignor.java | 113 ++++++++++++++++++- .../processor/internals/StreamThread.java | 30 ----- .../streams/processor/TopologyBuilderTest.java | 12 +- 5 files changed, 169 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/clients/src/main/java/org/apache/kafka/common/Cluster.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 8883e45..d86e3a4 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -118,6 +118,42 @@ public final class Cluster { } /** + * Update the cluster information for specific topic with new partition information + */ + public Cluster update(String topic, Collection<PartitionInfo> partitions) { + + // re-index the partitions by topic/partition for quick lookup + for (PartitionInfo p : partitions) + this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p); + + // re-index the partitions by topic and node respectively + this.partitionsByTopic.put(topic, Collections.unmodifiableList(new ArrayList<>(partitions))); + + List<PartitionInfo> availablePartitions = new ArrayList<>(); + for (PartitionInfo part : partitions) { + if (part.leader() != null) + availablePartitions.add(part); + } + this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions)); + + HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>(); + for (Node n : this.nodes) { + partsForNode.put(n.id(), new ArrayList<PartitionInfo>()); + } + for (PartitionInfo p : partitions) { + if (p.leader() != null) { + List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id())); + psNode.add(p); + } + } + + for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet()) + this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); + + return this; + } + + /** * @return The known set of nodes */ public List<Node> nodes() { http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index be5c728..3ef1b0a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -58,12 +58,11 @@ public class TopologyBuilder { private final Map<String, StateStoreFactory> stateFactories = new HashMap<>(); private final Set<String> sourceTopicNames = new HashSet<>(); - private final Set<String> internalTopicNames = new HashSet<>(); - private final QuickUnion<String> nodeGrouper = new QuickUnion<>(); private final List<Set<String>> copartitionSourceGroups = new ArrayList<>(); - private final HashMap<String, String[]> nodeToTopics = new HashMap<>(); + private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>(); + private final HashMap<String, String> nodeToSinkTopic = new HashMap<>(); private Map<Integer, Set<String>> nodeGroups = null; private static class StateStoreFactory { @@ -154,11 +153,13 @@ public class TopologyBuilder { } public static class TopicsInfo { + public Set<String> sinkTopics; public Set<String> sourceTopics; public Set<String> interSourceTopics; public Set<String> stateChangelogTopics; - public TopicsInfo(Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) { + public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) { + this.sinkTopics = sinkTopics; this.sourceTopics = sourceTopics; this.interSourceTopics = interSourceTopics; this.stateChangelogTopics = stateChangelogTopics; @@ -228,7 +229,7 @@ public class TopologyBuilder { } nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer)); - nodeToTopics.put(name, topics.clone()); + nodeToSourceTopics.put(name, topics.clone()); nodeGrouper.add(name); return this; @@ -345,6 +346,7 @@ public class TopologyBuilder { } nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer, partitioner)); + nodeToSinkTopic.put(name, topic); nodeGrouper.add(name); nodeGrouper.unite(name, parentNames); return this; @@ -502,12 +504,13 @@ public class TopologyBuilder { nodeGroups = makeNodeGroups(); for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) { + Set<String> sinkTopics = new HashSet<>(); Set<String> sourceTopics = new HashSet<>(); Set<String> internalSourceTopics = new HashSet<>(); Set<String> stateChangelogTopics = new HashSet<>(); for (String node : entry.getValue()) { // if the node is a source node, add to the source topics - String[] topics = nodeToTopics.get(node); + String[] topics = nodeToSourceTopics.get(node); if (topics != null) { sourceTopics.addAll(Arrays.asList(topics)); @@ -518,6 +521,11 @@ public class TopologyBuilder { } } + // if the node is a sink node, add to the sink topics + String topic = nodeToSinkTopic.get(node); + if (topic != null) + sinkTopics.add(topic); + // if the node is connected to a state, add to the state topics for (StateStoreFactory stateFactory : stateFactories.values()) { @@ -529,6 +537,7 @@ public class TopologyBuilder { } } topicGroups.put(entry.getKey(), new TopicsInfo( + Collections.unmodifiableSet(sinkTopics), Collections.unmodifiableSet(sourceTopics), Collections.unmodifiableSet(internalSourceTopics), Collections.unmodifiableSet(stateChangelogTopics))); @@ -556,7 +565,7 @@ public class TopologyBuilder { int nodeGroupId = 0; // Go through source nodes first. This makes the group id assignment easy to predict in tests - for (String nodeName : Utils.sorted(nodeToTopics.keySet())) { + for (String nodeName : Utils.sorted(nodeToSourceTopics.keySet())) { String root = nodeGrouper.root(nodeName); Set<String> nodeGroup = rootToNodeGroup.get(root); if (nodeGroup == null) { @@ -569,7 +578,7 @@ public class TopologyBuilder { // Go through non-source nodes for (String nodeName : Utils.sorted(nodeFactories.keySet())) { - if (!nodeToTopics.containsKey(nodeName)) { + if (!nodeToSourceTopics.containsKey(nodeName)) { String root = nodeGrouper.root(nodeName); Set<String> nodeGroup = rootToNodeGroup.get(root); if (nodeGroup == null) { @@ -597,7 +606,7 @@ public class TopologyBuilder { /** * Returns the copartition groups. - * A copartition group is a group of topics that are required to be copartitioned. + * A copartition group is a group of source topics that are required to be copartitioned. * * @return groups of topic names */ @@ -606,7 +615,7 @@ public class TopologyBuilder { for (Set<String> nodeNames : copartitionSourceGroups) { Set<String> copartitionGroup = new HashSet<>(); for (String node : nodeNames) { - String[] topics = nodeToTopics.get(node); + String[] topics = nodeToSourceTopics.get(node); if (topics != null) copartitionGroup.addAll(Arrays.asList(topics)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 1cc5287..55cbb0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -23,18 +23,22 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.TaskAssignmentException; +import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.ClientState; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; -import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; +import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -156,7 +160,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable Map<UUID, Set<String>> consumersByClient = new HashMap<>(); Map<UUID, ClientState<TaskId>> states = new HashMap<>(); - // Decode subscription info + // decode subscription info for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) { String consumerId = entry.getKey(); Subscription subscription = entry.getValue(); @@ -182,11 +186,73 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable state.capacity = state.capacity + 1d; } - // get the tasks as partition groups from the partition grouper + // ensure the co-partitioning topics within the group have the same number of partitions, + // and enforce the number of partitions for those internal topics. Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>(); + Map<Integer, Set<String>> internalSourceTopicGroups = new HashMap<>(); for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics); + internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics); } + Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups(); + + ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups, metadata); + + // for those internal source topics that do not have co-partition enforcement, + // set the number of partitions to the maximum of the depending sub-topologies source topics + for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { + Set<String> internalTopics = entry.getValue().interSourceTopics; + for (String internalTopic : internalTopics) { + Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic); + + if (tasks == null) { + int numPartitions = -1; + for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : topicGroups.entrySet()) { + Set<String> otherSinkTopics = other.getValue().sinkTopics; + + if (otherSinkTopics.contains(internalTopic)) { + for (String topic : other.getValue().sourceTopics) { + List<PartitionInfo> infos = metadata.partitionsForTopic(topic); + + if (infos != null && infos.size() > numPartitions) + numPartitions = infos.size(); + } + } + } + + internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions))); + } + } + } + + // if ZK is specified, prepare the internal source topic before calling partition grouper + if (internalTopicManager != null) { + log.debug("Starting to validate internal source topics in partition assignor."); + + for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) { + String topic = streamThread.jobId + "-" + entry.getKey(); + + // should have size 1 only + int numPartitions = -1; + for (TaskId task : entry.getValue()) { + numPartitions = task.partition; + } + + internalTopicManager.makeReady(topic, numPartitions); + + // wait until the topic metadata has been propagated to all brokers + List<PartitionInfo> partitions; + do { + partitions = streamThread.restoreConsumer.partitionsFor(topic); + } while (partitions == null || partitions.size() != numPartitions); + + metadata.update(topic, partitions); + } + + log.info("Completed validating internal source topics in partition assignor."); + } + + // get the tasks as partition groups from the partition grouper Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata); // add tasks to state topic subscribers @@ -274,7 +340,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } - // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions + // if ZK is specified, validate the internal source topics and the state changelog topics if (internalTopicManager != null) { log.debug("Starting to validate changelog topics in partition assignor."); @@ -339,6 +405,43 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable this.partitionToTaskIds = partitionToTaskIds; } + private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) { + Set<String> internalTopics = new HashSet<>(); + for (Set<String> topics : internalTopicGroups.values()) + internalTopics.addAll(topics); + + for (Set<String> copartitionGroup : copartitionGroups) { + ensureCopartitioning(copartitionGroup, internalTopics, metadata); + } + } + + private void ensureCopartitioning(Set<String> copartitionGroup, Set<String> internalTopics, Cluster metadata) { + int numPartitions = -1; + + for (String topic : copartitionGroup) { + if (!internalTopics.contains(topic)) { + List<PartitionInfo> infos = metadata.partitionsForTopic(topic); + + if (infos == null) + throw new TopologyBuilderException("External source topic not found: " + topic); + + if (numPartitions == -1) { + numPartitions = infos.size(); + } else if (numPartitions != infos.size()) { + String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); + Arrays.sort(topics); + throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]"); + } + } + } + + // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds + for (String topic : internalTopics) { + if (copartitionGroup.contains(topic)) + internalSourceTopicToTaskIds.put(topic, Collections.singleton(new TaskId(-1, numPartitions))); + } + } + /* For Test Only */ public Set<TaskId> tasksForState(String stateName) { return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX); http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index b8ff135..4ce86ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.MeasurableStat; import org.apache.kafka.common.metrics.Metrics; @@ -44,7 +43,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskIdFormatException; -import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -56,7 +54,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.FileLock; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -316,8 +313,6 @@ public class StreamThread extends Thread { long lastPoll = 0L; boolean requiresPoll = true; - ensureCopartitioning(builder.copartitionGroups()); - consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener); while (stillRunning()) { @@ -720,31 +715,6 @@ public class StreamThread extends Thread { } } - private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) { - for (Set<String> copartitionGroup : copartitionGroups) { - ensureCopartitioning(copartitionGroup); - } - } - - private void ensureCopartitioning(Set<String> copartitionGroup) { - int numPartitions = -1; - - for (String topic : copartitionGroup) { - List<PartitionInfo> infos = consumer.partitionsFor(topic); - - if (infos == null) - throw new TopologyBuilderException("Topic not found: " + topic); - - if (numPartitions == -1) { - numPartitions = infos.size(); - } else if (numPartitions != infos.size()) { - String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); - Arrays.sort(topics); - throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]"); - } - } - } - private class StreamsMetricsImpl implements StreamsMetrics { final Metrics metrics; final String metricGrpName; http://git-wip-us.apache.org/repos/asf/kafka/blob/2a58ba9f/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index a93b8ab..0635bd2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -215,9 +215,9 @@ public class TopologyBuilderTest { Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); - expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet())); - expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet())); + expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); + expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet())); + expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet())); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -253,9 +253,9 @@ public class TopologyBuilderTest { Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); - expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); - expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); + expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); + expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); + expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups);
