Repository: kafka Updated Branches: refs/heads/trunk 4f22705c7 -> a3d3d5379
MINOR: add internal source topic for tracking Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda Closes #775 from guozhangwang/KRepartTopic Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a3d3d537 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a3d3d537 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a3d3d537 Branch: refs/heads/trunk Commit: a3d3d5379df71e7a2c653d06ebf1b30923dde738 Parents: 4f22705 Author: Guozhang Wang <[email protected]> Authored: Thu Jan 14 17:09:33 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Jan 14 17:09:33 2016 -0800 ---------------------------------------------------------------------- .../streams/kstream/internals/KTableImpl.java | 5 ++- .../streams/processor/TopologyBuilder.java | 44 ++++++++++++++++---- .../KafkaStreamingPartitionAssignor.java | 35 +++++++++++----- .../streams/processor/TopologyBuilderTest.java | 13 +++--- 4 files changed, 71 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 7f30f59..9888dff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -46,6 +46,8 @@ import java.util.Set; */ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> { + private static final String REPARTITION_TOPIC_SUFFIX = "-repartition"; + private static final String FILTER_NAME = "KTABLE-FILTER-"; private static final String 
MAPVALUES_NAME = "KTABLE-MAPVALUES-"; @@ -258,7 +260,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); String aggregateName = topology.newName(AGGREGATE_NAME); - String topic = name + "-repartition"; + String topic = name + REPARTITION_TOPIC_SUFFIX; ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer); ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); @@ -278,6 +280,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, this.enableSendingOldValues(); // send the aggregate key-value pairs to the intermediate topic for partitioning + topology.addInternalTopic(topic); topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName); // read the intermediate topic http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 9cd80a4..d6b63d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.QuickUnion; import org.apache.kafka.streams.processor.internals.SinkNode; @@ 
-58,6 +59,8 @@ public class TopologyBuilder { private final Set<String> sourceTopicNames = new HashSet<>(); + private final Set<String> internalTopicNames = new HashSet<>(); + private final QuickUnion<String> nodeGrouper = new QuickUnion<>(); private final List<Set<String>> copartitionSourceGroups = new ArrayList<>(); private final HashMap<String, String[]> nodeToTopics = new HashMap<>(); @@ -152,18 +155,20 @@ public class TopologyBuilder { public static class TopicsInfo { public Set<String> sourceTopics; - public Set<String> stateNames; + public Set<String> interSourceTopics; + public Set<String> stateChangelogTopics; - public TopicsInfo(Set<String> sourceTopics, Set<String> stateNames) { + public TopicsInfo(Set<String> sourceTopics, Set<String> interSourceTopics, Set<String> stateChangelogTopics) { this.sourceTopics = sourceTopics; - this.stateNames = stateNames; + this.interSourceTopics = interSourceTopics; + this.stateChangelogTopics = stateChangelogTopics; } @Override public boolean equals(Object o) { if (o instanceof TopicsInfo) { TopicsInfo other = (TopicsInfo) o; - return other.sourceTopics.equals(this.sourceTopics) && other.stateNames.equals(this.stateNames); + return other.sourceTopics.equals(this.sourceTopics) && other.stateChangelogTopics.equals(this.stateChangelogTopics); } else { return false; } @@ -171,7 +176,7 @@ public class TopologyBuilder { @Override public int hashCode() { - long n = ((long) sourceTopics.hashCode() << 32) | (long) stateNames.hashCode(); + long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode(); return (int) (n % 0xFFFFFFFFL); } } @@ -424,6 +429,18 @@ public class TopologyBuilder { return this; } + /** + * Adds an internal topic + * + * @param topicName the name of the topic + * @return this builder instance so methods can be chained together; never null + */ + public final TopologyBuilder addInternalTopic(String topicName) { + this.internalTopicNames.add(topicName); + + return this; + } + private 
void connectProcessorAndStateStore(String processorName, String stateStoreName) { if (!stateFactories.containsKey(stateStoreName)) throw new TopologyException("StateStore " + stateStoreName + " is not added yet."); @@ -460,24 +477,33 @@ public class TopologyBuilder { for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) { Set<String> sourceTopics = new HashSet<>(); - Set<String> stateNames = new HashSet<>(); + Set<String> internalSourceTopics = new HashSet<>(); + Set<String> stateChangelogTopics = new HashSet<>(); for (String node : entry.getValue()) { // if the node is a source node, add to the source topics String[] topics = nodeToTopics.get(node); - if (topics != null) + if (topics != null) { sourceTopics.addAll(Arrays.asList(topics)); + // if some of the topics are internal, add them to the internal topics + for (String topic : topics) { + if (this.internalTopicNames.contains(topic)) + internalSourceTopics.add(topic); + } + } + // if the node is connected to a state, add to the state topics for (StateStoreFactory stateFactory : stateFactories.values()) { if (stateFactory.isInternal && stateFactory.users.contains(node)) { - stateNames.add(stateFactory.supplier.name()); + stateChangelogTopics.add(stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX); } } } topicGroups.put(entry.getKey(), new TopicsInfo( Collections.unmodifiableSet(sourceTopics), - Collections.unmodifiableSet(stateNames))); + Collections.unmodifiableSet(internalSourceTopics), + Collections.unmodifiableSet(stateChangelogTopics))); } return Collections.unmodifiableMap(topicGroups); http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java index 29c67f2..2734f56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java @@ -66,10 +66,10 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi private int numStandbyReplicas; private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups; private Map<TopicPartition, Set<TaskId>> partitionToTaskIds; - private Map<String, Set<TaskId>> stateNameToTaskIds; + private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds; + private Map<String, Set<TaskId>> internalSourceTopicToTaskIds; private Map<TaskId, Set<TopicPartition>> standbyTasks; - // TODO: the following ZK dependency should be removed after KIP-4 private static final String ZK_TOPIC_PATH = "/brokers/topics"; private static final String ZK_BROKER_PATH = "/brokers/ids"; @@ -296,13 +296,24 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata); // add tasks to state topic subscribers - stateNameToTaskIds = new HashMap<>(); + stateChangelogTopicToTaskIds = new HashMap<>(); + internalSourceTopicToTaskIds = new HashMap<>(); for (TaskId task : partitionsForTask.keySet()) { - for (String stateName : topicGroups.get(task.topicGroupId).stateNames) { - Set<TaskId> tasks = stateNameToTaskIds.get(stateName); + for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) { + Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName); if (tasks == null) { tasks = new HashSet<>(); - stateNameToTaskIds.put(stateName, tasks); + stateChangelogTopicToTaskIds.put(stateName, tasks); + } + + tasks.add(task); + } + + for (String topicName : 
topicGroups.get(task.topicGroupId).interSourceTopics) { + Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName); + if (tasks == null) { + tasks = new HashSet<>(); + internalSourceTopicToTaskIds.put(topicName, tasks); } tasks.add(task); @@ -363,12 +374,16 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi } } - // if ZK is specified, get the tasks for each state topic and validate the topic partitions + // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions if (zkClient != null) { log.debug("Starting to validate changelog topics in partition assignor."); - for (Map.Entry<String, Set<TaskId>> entry : stateNameToTaskIds.entrySet()) { - String topic = streamThread.jobId + "-" + entry.getKey() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX; + Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>(); + topicToTaskIds.putAll(stateChangelogTopicToTaskIds); + topicToTaskIds.putAll(internalSourceTopicToTaskIds); + + for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) { + String topic = streamThread.jobId + "-" + entry.getKey(); // the expected number of partitions is the max value of TaskId.partition + 1 int numPartitions = 0; @@ -455,7 +470,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi /* For Test Only */ public Set<TaskId> tasksForState(String stateName) { - return stateNameToTaskIds.get(stateName); + return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX); } public Set<TaskId> tasksForPartition(TopicPartition partition) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a3d3d537/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index af0b3c9..a2f6ec0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo; import org.apache.kafka.test.MockProcessorSupplier; @@ -213,9 +214,9 @@ public class TopologyBuilderTest { Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet())); - expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet())); - expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet())); + expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); + expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet())); + expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet())); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -251,9 +252,9 @@ public class TopologyBuilderTest { Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), mkSet("store-1"))); - 
expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), mkSet("store-2"))); - expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), mkSet("store-3"))); + expectedTopicGroups.put(0, new TopicsInfo(mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet("store-1" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); + expectedTopicGroups.put(1, new TopicsInfo(mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet("store-2" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); + expectedTopicGroups.put(2, new TopicsInfo(mkSet("topic-5"), Collections.<String>emptySet(), mkSet("store-3" + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups);
