http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java deleted file mode 100644 index 0949bf5..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyDescription.java +++ /dev/null @@ -1,476 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor; - -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.processor.internals.StreamTask; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Objects; -import java.util.Set; - -/** - * A meta representation of a {@link Topology topology}. - * <p> - * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected. - * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one - * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology - * {@link Topology#addSource(String, String...) reads} from the same topic. - * <p> - * For {@link KafkaStreams#start() execution} sub-topologies are translated into {@link StreamTask tasks}. - */ -// TODO make public (hide until KIP-120 if fully implemented) -final class TopologyDescription { - private final Set<Subtopology> subtopologies = new HashSet<>(); - private final Set<GlobalStore> globalStores = new HashSet<>(); - - /** - * A connected sub-graph of a {@link Topology}. - * <p> - * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, ProcessorSupplier, String...) - * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores} - * (i.e., if multiple processors share the same state). - */ - public final static class Subtopology { - private final int id; - private final Set<Node> nodes; - - Subtopology(final int id, - final Set<Node> nodes) { - this.id = id; - this.nodes = nodes; - } - - /** - * Internally assigned unique ID. - * @return the ID of the sub-topology - */ - public int id() { - return id; - } - - /** - * All nodes of this sub-topology. - * @return set of all nodes within the sub-topology - */ - public Set<Node> nodes() { - return Collections.unmodifiableSet(nodes); - } - - @Override - public String toString() { - return "Sub-topology: " + id + "\n" + nodesAsString(); - } - - private String nodesAsString() { - final StringBuilder sb = new StringBuilder(); - for (final Node node : nodes) { - sb.append(" "); - sb.append(node); - sb.append('\n'); - } - return sb.toString(); - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final Subtopology that = (Subtopology) o; - return id == that.id - && nodes.equals(that.nodes); - } - - @Override - public int hashCode() { - return Objects.hash(id, nodes); - } - } - - /** - * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String, - * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String, - * String, ProcessorSupplier)} global store}. - * Adding a global store results in adding a source node and one stateful processor node. - * Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different - * global stores are not connected to each other. - * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global - * stores will never be part of any {@link Subtopology}. - */ - public final static class GlobalStore { - private final Source source; - private final Processor processor; - - GlobalStore(final String sourceName, - final String processorName, - final String storeName, - final String topicName) { - source = new Source(sourceName, topicName); - processor = new Processor(processorName, Collections.singleton(storeName)); - source.successors.add(processor); - processor.predecessors.add(source); - } - - /** - * The source node reading from a "global" topic. - * @return the "global" source node - */ - public Source source() { - return source; - } - - /** - * The processor node maintaining the global store. - * @return the "global" processor node - */ - public Processor processor() { - return processor; - } - - @Override - public String toString() { - return "GlobalStore: " + source.name + "(topic: " + source.topics + ") -> " - + processor.name + "(store: " + processor.stores.iterator().next() + ")\n"; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final GlobalStore that = (GlobalStore) o; - return source.equals(that.source) - && processor.equals(that.processor); - } - - @Override - public int hashCode() { - return Objects.hash(source, processor); - } - } - - /** - * A node of a topology. Can be a source, sink, or processor node. - */ - public interface Node { - /** - * The name of the node. Will never be {@code null}. - * @return the name of the node - */ - String name(); - /** - * The predecessors of this node within a sub-topology. - * Note, sources do not have any predecessors. - * Will never be {@code null}. - * @return set of all predecessors - */ - Set<Node> predecessors(); - /** - * The successor of this node within a sub-topology. - * Note, sinks do not have any successors. - * Will never be {@code null}. - * @return set of all successor - */ - Set<Node> successors(); - } - - abstract static class AbstractNode implements Node { - final String name; - final Set<Node> predecessors = new HashSet<>(); - final Set<Node> successors = new HashSet<>(); - - AbstractNode(final String name) { - this.name = name; - } - - @Override - public String name() { - return name; - } - - @Override - public Set<Node> predecessors() { - return Collections.unmodifiableSet(predecessors); - } - - @Override - public Set<Node> successors() { - return Collections.unmodifiableSet(successors); - } - - void addPredecessor(final Node predecessor) { - predecessors.add(predecessor); - } - - void addSuccessor(final Node successor) { - successors.add(successor); - } - } - - /** - * A source node of a topology. - */ - public final static class Source extends AbstractNode { - private final String topics; - - Source(final String name, - final String topics) { - super(name); - this.topics = topics; - } - - /** - * The topic names this source node is reading from. - * @return comma separated list of topic names or pattern (as String) - */ - public String topics() { - return topics; - } - - @Override - void addPredecessor(final Node predecessor) { - throw new UnsupportedOperationException("Sources don't have predecessors."); - } - - @Override - public String toString() { - return "Source: " + name + "(topics: " + topics + ") --> " + nodeNames(successors); - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final Source source = (Source) o; - // omit successor to avoid infinite loops - return name.equals(source.name) - && topics.equals(source.topics); - } - - @Override - public int hashCode() { - // omit successor as it might change and alter the hash code - return Objects.hash(name, topics); - } - } - - /** - * A processor node of a topology. - */ - public final static class Processor extends AbstractNode { - private final Set<String> stores; - - Processor(final String name, - final Set<String> stores) { - super(name); - this.stores = stores; - } - - /** - * The names of all connected stores. - * @return set of store names - */ - public Set<String> stores() { - return Collections.unmodifiableSet(stores); - } - - @Override - public String toString() { - return "Processor: " + name + "(stores: " + stores + ") --> " + nodeNames(successors) + " <-- " + nodeNames(predecessors); - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final Processor processor = (Processor) o; - // omit successor to avoid infinite loops - return name.equals(processor.name) - && stores.equals(processor.stores) - && predecessors.equals(processor.predecessors); - } - - @Override - public int hashCode() { - // omit successor as it might change and alter the hash code - return Objects.hash(name, stores); - } - } - - /** - * A sink node of a topology. - */ - public final static class Sink extends AbstractNode { - private final String topic; - - Sink(final String name, - final String topic) { - super(name); - this.topic = topic; - } - - /** - * The topic name this sink node is writing to. - * @return a topic name - */ - public String topic() { - return topic; - } - - @Override - void addSuccessor(final Node successor) { - throw new UnsupportedOperationException("Sinks don't have successors."); - } - - @Override - public String toString() { - return "Sink: " + name + "(topic: " + topic + ") <-- " + nodeNames(predecessors); - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final Sink sink = (Sink) o; - return name.equals(sink.name) - && topic.equals(sink.topic) - && predecessors.equals(sink.predecessors); - } - - @Override - public int hashCode() { - // omit predecessors as it might change and alter the hash code - return Objects.hash(name, topic); - } - } - - void addSubtopology(final Subtopology subtopology) { - subtopologies.add(subtopology); - } - - void addGlobalStore(final GlobalStore globalStore) { - globalStores.add(globalStore); - } - - /** - * All sub-topologies of the represented topology. - * @return set of all sub-topologies - */ - public Set<Subtopology> subtopologies() { - return Collections.unmodifiableSet(subtopologies); - } - - /** - * All global stores of the represented topology. - * @return set of all global stores - */ - public Set<GlobalStore> globalStores() { - return Collections.unmodifiableSet(globalStores); - } - - @Override - public String toString() { - return subtopologiesAsString() + globalStoresAsString(); - } - - private static String nodeNames(final Set<Node> nodes) { - final StringBuilder sb = new StringBuilder(); - if (!nodes.isEmpty()) { - for (final Node n : nodes) { - sb.append(n.name()); - sb.append(", "); - } - sb.deleteCharAt(sb.length() - 1); - sb.deleteCharAt(sb.length() - 1); - } - return sb.toString(); - } - - private String subtopologiesAsString() { - final StringBuilder sb = new StringBuilder(); - sb.append("Sub-topologies: \n"); - if (subtopologies.isEmpty()) { - sb.append(" none\n"); - } else { - for (final Subtopology st : subtopologies) { - sb.append(" "); - sb.append(st); - } - } - return sb.toString(); - } - - private String globalStoresAsString() { - final StringBuilder sb = new StringBuilder(); - sb.append("Global Stores:\n"); - if (globalStores.isEmpty()) { - sb.append(" none\n"); - } else { - for (final GlobalStore gs : globalStores) { - sb.append(" "); - sb.append(gs); - } - } - return sb.toString(); - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final TopologyDescription that = (TopologyDescription) o; - return subtopologies.equals(that.subtopologies) - && globalStores.equals(that.globalStores); - } - - @Override - public int hashCode() { - return Objects.hash(subtopologies, globalStores); - } - -} -
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java new file mode 100644 index 0000000..ff65d31 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -0,0 +1,1491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.WindowStoreSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Pattern; + + +public class InternalTopologyBuilder { + + private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class); + + private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + + private static final String[] NO_PREDECESSORS = {}; + + // node factories in a topological order + private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>(); + + // state factories + private final Map<String, StateStoreFactory> stateFactories = new HashMap<>(); + + // global state factories + private final Map<String, StateStore> globalStateStores = new LinkedHashMap<>(); + + // all topics subscribed from source processors (without application-id prefix for internal topics) + private final Set<String> sourceTopicNames = new HashSet<>(); + + // all internal topics auto-created by the topology builder and used in source / sink processors + private final Set<String> internalTopicNames = new HashSet<>(); + + // groups of source processors that need to be copartitioned + private final List<Set<String>> copartitionSourceGroups = new ArrayList<>(); + + // map from source processor names to subscribed topics (without application-id prefix for internal topics) + private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap<>(); + + // map from source processor names to regex subscription patterns + private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>(); + + // map from sink processor names to subscribed topic (without application-id prefix for internal topics) + private final HashMap<String, String> nodeToSinkTopic = new HashMap<>(); + + // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node + // even if it can be matched by multiple regex patterns + private final HashMap<String, Pattern> topicToPatterns = new HashMap<>(); + + // map from state store names to all the topics subscribed from source processors that + // are connected to these state stores + private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>(); + + // map from state store names to all the regex subscribed topics from source processors that + // are connected to these state stores + private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>(); + + // map from state store names to this state store's corresponding changelog topic if possible, + // this is used in the extended KStreamBuilder. + private final Map<String, String> storeToChangelogTopic = new HashMap<>(); + + // all global topics + private final Set<String> globalTopics = new HashSet<>(); + + private final Set<String> earliestResetTopics = new HashSet<>(); + + private final Set<String> latestResetTopics = new HashSet<>(); + + private final Set<Pattern> earliestResetPatterns = new HashSet<>(); + + private final Set<Pattern> latestResetPatterns = new HashSet<>(); + + private final QuickUnion<String> nodeGrouper = new QuickUnion<>(); + + private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); + + private String applicationId = null; + + private Pattern topicPattern = null; + + private Map<Integer, Set<String>> nodeGroups = null; + + private static class StateStoreFactory { + public final Set<String> users; + + public final StateStoreSupplier supplier; + + StateStoreFactory(final StateStoreSupplier supplier) { + this.supplier = supplier; + users = new HashSet<>(); + } + } + + private static abstract class NodeFactory { + final String name; + final String[] predecessors; + + NodeFactory(final String name, + final String[] predecessors) { + this.name = name; + this.predecessors = predecessors; + } + + public abstract ProcessorNode build(); + + abstract AbstractNode describe(); + } + + private static class ProcessorNodeFactory extends NodeFactory { + private final ProcessorSupplier<?, ?> supplier; + private final Set<String> stateStoreNames = new HashSet<>(); + + ProcessorNodeFactory(final String name, + final String[] predecessors, + final ProcessorSupplier<?, ?> supplier) { + super(name, predecessors.clone()); + this.supplier = supplier; + } + + public void addStateStore(final String stateStoreName) { + stateStoreNames.add(stateStoreName); + } + + @Override + public ProcessorNode build() { + return new ProcessorNode<>(name, supplier.get(), stateStoreNames); + } + + @Override + Processor describe() { + return new Processor(name, new HashSet<>(stateStoreNames)); + } + } + + private class SourceNodeFactory extends NodeFactory { + private final List<String> topics; + private final Pattern pattern; + private final Deserializer<?> keyDeserializer; + private final Deserializer<?> valDeserializer; + private final TimestampExtractor timestampExtractor; + + private SourceNodeFactory(final String name, + final String[] topics, + final Pattern pattern, + final TimestampExtractor timestampExtractor, + final Deserializer<?> keyDeserializer, + final Deserializer<?> valDeserializer) { + super(name, NO_PREDECESSORS); + this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>(); + this.pattern = pattern; + this.keyDeserializer = keyDeserializer; + this.valDeserializer = valDeserializer; + this.timestampExtractor = timestampExtractor; + } + + List<String> getTopics(final Collection<String> subscribedTopics) { + // if it is subscribed via patterns, it is possible that the topic metadata has not been updated + // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder; + // this should only happen for debugging since during runtime this function should always be called after the metadata has updated. + if (subscribedTopics.isEmpty()) { + return Collections.singletonList("" + pattern + ""); + } + + final List<String> matchedTopics = new ArrayList<>(); + for (final String update : subscribedTopics) { + if (pattern == topicToPatterns.get(update)) { + matchedTopics.add(update); + } else if (topicToPatterns.containsKey(update) && isMatch(update)) { + // the same topic cannot be matched to more than one pattern + // TODO: we should lift this requirement in the future + throw new TopologyBuilderException("Topic " + update + + " is already matched for another regex pattern " + topicToPatterns.get(update) + + " and hence cannot be matched to this regex pattern " + pattern + " any more."); + } else if (isMatch(update)) { + topicToPatterns.put(update, pattern); + matchedTopics.add(update); + } + } + return matchedTopics; + } + + @Override + public ProcessorNode build() { + final List<String> sourceTopics = nodeToSourceTopics.get(name); + + // if it is subscribed via patterns, it is possible that the topic metadata has not been updated + // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder; + // this should only happen for debugging since during runtime this function should always be called after the metadata has updated. + if (sourceTopics == null) { + return new SourceNode<>(name, Collections.singletonList("" + pattern + ""), timestampExtractor, keyDeserializer, valDeserializer); + } else { + return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer); + } + } + + private boolean isMatch(final String topic) { + return pattern.matcher(topic).matches(); + } + + @Override + Source describe() { + String sourceTopics; + + if (pattern == null) { + sourceTopics = topics.toString(); + sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last, ie. [] + } else { + sourceTopics = pattern.toString(); + } + + return new Source(name, sourceTopics); + } + } + + private class SinkNodeFactory<K, V> extends NodeFactory { + private final String topic; + private final Serializer<K> keySerializer; + private final Serializer<V> valSerializer; + private final StreamPartitioner<? super K, ? super V> partitioner; + + private SinkNodeFactory(final String name, + final String[] predecessors, + final String topic, + final Serializer<K> keySerializer, + final Serializer<V> valSerializer, + final StreamPartitioner<? super K, ? super V> partitioner) { + super(name, predecessors.clone()); + this.topic = topic; + this.keySerializer = keySerializer; + this.valSerializer = valSerializer; + this.partitioner = partitioner; + } + + @Override + public ProcessorNode build() { + if (internalTopicNames.contains(topic)) { + // prefix the internal topic name with the application id + return new SinkNode<>(name, decorateTopic(topic), keySerializer, valSerializer, partitioner); + } else { + return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner); + } + } + + @Override + Sink describe() { + return new Sink(name, topic); + } + } + + public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) { + Objects.requireNonNull(applicationId, "applicationId can't be null"); + this.applicationId = applicationId; + + return this; + } + + public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valDeserializer, + final String... topics) { + if (topics.length == 0) { + throw new TopologyBuilderException("You must provide at least one topic"); + } + Objects.requireNonNull(name, "name must not be null"); + if (nodeFactories.containsKey(name)) { + throw new TopologyBuilderException("Processor " + name + " is already added."); + } + + for (final String topic : topics) { + Objects.requireNonNull(topic, "topic names cannot be null"); + validateTopicNotAlreadyRegistered(topic); + maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic); + sourceTopicNames.add(topic); + } + + nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer)); + nodeToSourceTopics.put(name, Arrays.asList(topics)); + nodeGrouper.add(name); + } + + public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, + final String sourceName, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String topic, + final String processorName, + final ProcessorSupplier stateUpdateSupplier) { + Objects.requireNonNull(storeSupplier, "store supplier must not be null"); + Objects.requireNonNull(sourceName, "sourceName must not be null"); + Objects.requireNonNull(topic, "topic must not be null"); + Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null"); + Objects.requireNonNull(processorName, "processorName must not be null"); + if (nodeFactories.containsKey(sourceName)) { + throw new TopologyBuilderException("Processor " + sourceName + " is already added."); + } + if (nodeFactories.containsKey(processorName)) { + throw new TopologyBuilderException("Processor " + processorName + " is already added."); + } + if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) { + throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added."); + } + if (storeSupplier.loggingEnabled()) { + throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled."); + } + if (sourceName.equals(processorName)) { + throw new TopologyBuilderException("sourceName and processorName must be different."); + } + + validateTopicNotAlreadyRegistered(topic); + + globalTopics.add(topic); + final String[] topics = {topic}; + nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer)); + nodeToSourceTopics.put(sourceName, Arrays.asList(topics)); + nodeGrouper.add(sourceName); + + final String[] predecessors = {sourceName}; + final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier); + nodeFactory.addStateStore(storeSupplier.name()); + nodeFactories.put(processorName, nodeFactory); + nodeGrouper.add(processorName); + nodeGrouper.unite(processorName, predecessors); + + globalStateStores.put(storeSupplier.name(), storeSupplier.get()); + connectSourceStoreAndTopic(storeSupplier.name(), topic); + } + + private void validateTopicNotAlreadyRegistered(final String topic) { + if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) { + throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source."); + } + + for (final Pattern pattern : nodeToSourcePatterns.values()) { + if (pattern.matcher(topic).matches()) { + throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source."); + } + } + } + + public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valDeserializer, + final Pattern topicPattern) { + Objects.requireNonNull(topicPattern, "topicPattern can't be null"); + Objects.requireNonNull(name, "name can't be null"); + + if (nodeFactories.containsKey(name)) { + throw new TopologyBuilderException("Processor " + name + " is already added."); + } + + for (final String sourceTopicName : sourceTopicNames) { + if (topicPattern.matcher(sourceTopicName).matches()) { + throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source."); + } + } + + maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern); + + nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer)); + nodeToSourcePatterns.put(name, topicPattern); + nodeGrouper.add(name); + } + + public final <K, V> void addSink(final String name, + final String topic, + final Serializer<K> keySerializer, + final Serializer<V> valSerializer, + final StreamPartitioner<? super K, ? super V> partitioner, + final String... predecessorNames) { + Objects.requireNonNull(name, "name must not be null"); + Objects.requireNonNull(topic, "topic must not be null"); + if (nodeFactories.containsKey(name)) { + throw new TopologyBuilderException("Processor " + name + " is already added."); + } + + for (final String predecessor : predecessorNames) { + if (predecessor.equals(name)) { + throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself."); + } + if (!nodeFactories.containsKey(predecessor)) { + throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet."); + } + } + + nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topic, keySerializer, valSerializer, partitioner)); + nodeToSinkTopic.put(name, topic); + nodeGrouper.add(name); + nodeGrouper.unite(name, predecessorNames); + } + + public final void addProcessor(final String name, + final ProcessorSupplier supplier, + final String... predecessorNames) { + Objects.requireNonNull(name, "name must not be null"); + Objects.requireNonNull(supplier, "supplier must not be null"); + if (nodeFactories.containsKey(name)) { + throw new TopologyBuilderException("Processor " + name + " is already added."); + } + + for (final String predecessor : predecessorNames) { + if (predecessor.equals(name)) { + throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself."); + } + if (!nodeFactories.containsKey(predecessor)) { + throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet."); + } + } + + nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, supplier)); + nodeGrouper.add(name); + nodeGrouper.unite(name, predecessorNames); + } + + public final void addStateStore(final StateStoreSupplier supplier, + final String... processorNames) { + Objects.requireNonNull(supplier, "supplier can't be null"); + if (stateFactories.containsKey(supplier.name())) { + throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added."); + } + + stateFactories.put(supplier.name(), new StateStoreFactory(supplier)); + + if (processorNames != null) { + for (final String processorName : processorNames) { + connectProcessorAndStateStore(processorName, supplier.name()); + } + } + } + + public final void connectProcessorAndStateStores(final String processorName, + final String... stateStoreNames) { + Objects.requireNonNull(processorName, "processorName can't be null"); + if (stateStoreNames != null) { + for (final String stateStoreName : stateStoreNames) { + connectProcessorAndStateStore(processorName, stateStoreName); + } + } + } + + public final void connectSourceStoreAndTopic(final String sourceStoreName, + final String topic) { + if (storeToChangelogTopic.containsKey(sourceStoreName)) { + throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added."); + } + storeToChangelogTopic.put(sourceStoreName, topic); + } + + public final void connectProcessors(final String... processorNames) { + if (processorNames.length < 2) { + throw new TopologyBuilderException("At least two processors need to participate in the connection."); + } + + for (final String processorName : processorNames) { + if (!nodeFactories.containsKey(processorName)) { + throw new TopologyBuilderException("Processor " + processorName + " is not added yet."); + } + } + + nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length)); + } + + public final void addInternalTopic(final String topicName) { + Objects.requireNonNull(topicName, "topicName can't be null"); + internalTopicNames.add(topicName); + } + + public final void copartitionSources(final Collection<String> sourceNodes) { + copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes))); + } + + private void connectProcessorAndStateStore(final String processorName, + final String stateStoreName) { + if (!stateFactories.containsKey(stateStoreName)) { + throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet."); + } + if (!nodeFactories.containsKey(processorName)) { + throw new TopologyBuilderException("Processor " + processorName + " is not added yet."); + } + + final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName); + final Iterator<String> iter = stateStoreFactory.users.iterator(); + if (iter.hasNext()) { + final String user = iter.next(); + nodeGrouper.unite(user, processorName); + } + stateStoreFactory.users.add(processorName); + + final NodeFactory nodeFactory = nodeFactories.get(processorName); + if (nodeFactory instanceof ProcessorNodeFactory) { + final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory; + processorNodeFactory.addStateStore(stateStoreName); + connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory); + } else { + throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node."); + } + } + + private Set<SourceNodeFactory> findSourcesForProcessorPredecessors(final String[] predecessors) { + final Set<SourceNodeFactory> sourceNodes = new HashSet<>(); + for (final String predecessor : predecessors) { + final NodeFactory nodeFactory = nodeFactories.get(predecessor); + if (nodeFactory instanceof SourceNodeFactory) { + sourceNodes.add((SourceNodeFactory) nodeFactory); + } else if (nodeFactory instanceof ProcessorNodeFactory) { + sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory) nodeFactory).predecessors)); + } + } + return sourceNodes; + } + + private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName, + final ProcessorNodeFactory processorNodeFactory) { + // we should never update the mapping from state store names to source topics if the store name already exists + // in the map; this scenario is possible, for example, that a state store underlying a source KTable is + // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic. + + if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) { + return; + } + + final Set<String> sourceTopics = new HashSet<>(); + final Set<Pattern> sourcePatterns = new HashSet<>(); + final Set<SourceNodeFactory> sourceNodesForPredecessor = findSourcesForProcessorPredecessors(processorNodeFactory.predecessors); + + for (final SourceNodeFactory sourceNodeFactory : sourceNodesForPredecessor) { + if (sourceNodeFactory.pattern != null) { + sourcePatterns.add(sourceNodeFactory.pattern); + } else { + sourceTopics.addAll(sourceNodeFactory.topics); + } + } + + if (!sourceTopics.isEmpty()) { + stateStoreNameToSourceTopics.put(stateStoreName, + Collections.unmodifiableSet(sourceTopics)); + } + + if (!sourcePatterns.isEmpty()) { + stateStoreNameToSourceRegex.put(stateStoreName, + Collections.unmodifiableSet(sourcePatterns)); + } + + } + + private <T> void maybeAddToResetList(final Collection<T> earliestResets, + final Collection<T> latestResets, + final TopologyBuilder.AutoOffsetReset offsetReset, + final T item) { + if (offsetReset != null) { + switch (offsetReset) { + case EARLIEST: + earliestResets.add(item); + break; + case LATEST: + latestResets.add(item); + break; + default: + throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset)); + } + } + } + + public synchronized Map<Integer, Set<String>> nodeGroups() { + if (nodeGroups == null) { + nodeGroups = makeNodeGroups(); + } + return nodeGroups; + } + + private Map<Integer, Set<String>> makeNodeGroups() { + final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>(); + final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>(); + + int nodeGroupId = 0; + + // Go through source nodes first. This makes the group id assignment easy to predict in tests + final HashSet<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet()); + allSourceNodes.addAll(nodeToSourcePatterns.keySet()); + + for (final String nodeName : Utils.sorted(allSourceNodes)) { + final String root = nodeGrouper.root(nodeName); + Set<String> nodeGroup = rootToNodeGroup.get(root); + if (nodeGroup == null) { + nodeGroup = new HashSet<>(); + rootToNodeGroup.put(root, nodeGroup); + nodeGroups.put(nodeGroupId++, nodeGroup); + } + nodeGroup.add(nodeName); + } + + // Go through non-source nodes + for (final String nodeName : Utils.sorted(nodeFactories.keySet())) { + if (!nodeToSourceTopics.containsKey(nodeName)) { + final String root = nodeGrouper.root(nodeName); + Set<String> nodeGroup = rootToNodeGroup.get(root); + if (nodeGroup == null) { + nodeGroup = new HashSet<>(); + rootToNodeGroup.put(root, nodeGroup); + nodeGroups.put(nodeGroupId++, nodeGroup); + } + nodeGroup.add(nodeName); + } + } + + return nodeGroups; + } + + public synchronized ProcessorTopology build(final Integer topicGroupId) { + final Set<String> nodeGroup; + if (topicGroupId != null) { + nodeGroup = nodeGroups().get(topicGroupId); + } else { + // when topicGroupId is null, we build the full topology minus the global groups + final Set<String> globalNodeGroups = globalNodeGroups(); + final Collection<Set<String>> values = nodeGroups().values(); + nodeGroup = new HashSet<>(); + for (final Set<String> value : values) { + nodeGroup.addAll(value); + } + nodeGroup.removeAll(globalNodeGroups); + + + } + return build(nodeGroup); + } + + /** + * Builds the topology for any global state stores + * @return ProcessorTopology + */ + public synchronized ProcessorTopology buildGlobalStateTopology() { + final Set<String> globalGroups = globalNodeGroups(); + if (globalGroups.isEmpty()) { + return null; + } + return build(globalGroups); + } + + private Set<String> globalNodeGroups() { + final Set<String> globalGroups = new HashSet<>(); + for (final Map.Entry<Integer, Set<String>> nodeGroup : nodeGroups().entrySet()) { + final Set<String> nodes = nodeGroup.getValue(); + for (final String node : nodes) { + if (isGlobalSource(node)) { + globalGroups.addAll(nodes); + } + } + } + return globalGroups; + } + + private ProcessorTopology build(final Set<String> nodeGroup) { + final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size()); + final Map<String, ProcessorNode> processorMap = new HashMap<>(); + final Map<String, SourceNode> topicSourceMap = new HashMap<>(); + final Map<String, SinkNode> topicSinkMap = new HashMap<>(); + final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>(); + + // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) + for (final NodeFactory factory : nodeFactories.values()) { + if (nodeGroup == null || nodeGroup.contains(factory.name)) { + final ProcessorNode node = factory.build(); + processorNodes.add(node); + processorMap.put(node.name(), node); + + if (factory instanceof ProcessorNodeFactory) { + for (final String predecessor : ((ProcessorNodeFactory) factory).predecessors) { + final ProcessorNode<?, ?> predecessorNode = processorMap.get(predecessor); + predecessorNode.addChild(node); + } + for (final String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) { + if (!stateStoreMap.containsKey(stateStoreName)) { + final StateStore stateStore; + + if (stateFactories.containsKey(stateStoreName)) { + final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier; + stateStore = supplier.get(); + + // remember the changelog topic if this state store is change-logging enabled + if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) { + final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName); + storeToChangelogTopic.put(stateStoreName, changelogTopic); + } + } else { + stateStore = globalStateStores.get(stateStoreName); + } + + stateStoreMap.put(stateStoreName, stateStore); + } + } + } else if (factory instanceof SourceNodeFactory) { + final SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory; + final List<String> topics = (sourceNodeFactory.pattern != null) ? + sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : + sourceNodeFactory.topics; + + for (final String topic : topics) { + if (internalTopicNames.contains(topic)) { + // prefix the internal topic name with the application id + topicSourceMap.put(decorateTopic(topic), (SourceNode) node); + } else { + topicSourceMap.put(topic, (SourceNode) node); + } + } + } else if (factory instanceof SinkNodeFactory) { + final SinkNodeFactory sinkNodeFactory = (SinkNodeFactory) factory; + + for (final String predecessor : sinkNodeFactory.predecessors) { + processorMap.get(predecessor).addChild(node); + if (internalTopicNames.contains(sinkNodeFactory.topic)) { + // prefix the internal topic name with the application id + topicSinkMap.put(decorateTopic(sinkNodeFactory.topic), (SinkNode) node); + } else { + topicSinkMap.put(sinkNodeFactory.topic, (SinkNode) node); + } + } + } else { + throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName()); + } + } + } + + return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values())); + } + + /** + * Get any global {@link StateStore}s that are part of the + * topology + * @return map containing all global {@link StateStore}s + */ + public Map<String, StateStore> globalStateStores() { + return Collections.unmodifiableMap(globalStateStores); + } + + /** + * Returns the map of topic groups keyed by the group id. + * A topic group is a group of topics in the same task. + * + * @return groups of topic names + */ + public synchronized Map<Integer, TopologyBuilder.TopicsInfo> topicGroups() { + final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = new LinkedHashMap<>(); + + if (nodeGroups == null) { + nodeGroups = makeNodeGroups(); + } + + for (final Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) { + final Set<String> sinkTopics = new HashSet<>(); + final Set<String> sourceTopics = new HashSet<>(); + final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>(); + final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>(); + for (final String node : entry.getValue()) { + // if the node is a source node, add to the source topics + final List<String> topics = nodeToSourceTopics.get(node); + if (topics != null) { + // if some of the topics are internal, add them to the internal topics + for (final String topic : topics) { + // skip global topic as they don't need partition assignment + if (globalTopics.contains(topic)) { + continue; + } + if (internalTopicNames.contains(topic)) { + // prefix the internal topic name with the application id + final String internalTopic = decorateTopic(topic); + internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic, + Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), + Collections.<String, String>emptyMap())); + sourceTopics.add(internalTopic); + } else { + sourceTopics.add(topic); + } + } + } + + // if the node is a sink node, add to the sink topics + final String topic = nodeToSinkTopic.get(node); + if (topic != null) { + if (internalTopicNames.contains(topic)) { + // prefix the change log topic name with the application id + sinkTopics.add(decorateTopic(topic)); + } else { + sinkTopics.add(topic); + } + } + + // if the node is connected to a state, add to the state topics + for (final StateStoreFactory stateFactory : stateFactories.values()) { + final StateStoreSupplier supplier = stateFactory.supplier; + if (supplier.loggingEnabled() && stateFactory.users.contains(node)) { + final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name()); + final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name); + stateChangelogTopics.put(name, internalTopicConfig); + } + } + } + if (!sourceTopics.isEmpty()) { + topicGroups.put(entry.getKey(), new TopologyBuilder.TopicsInfo( + Collections.unmodifiableSet(sinkTopics), + Collections.unmodifiableSet(sourceTopics), + Collections.unmodifiableMap(internalSourceTopics), + Collections.unmodifiableMap(stateChangelogTopics))); + } + } + + return Collections.unmodifiableMap(topicGroups); + } + + private void setRegexMatchedTopicsToSourceNodes() { + if (subscriptionUpdates.hasUpdates()) { + for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) { + final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); + //need to update nodeToSourceTopics with topics matched from given regex + nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates())); + log.debug("nodeToSourceTopics {}", nodeToSourceTopics); + } + } + } + + private void setRegexMatchedTopicToStateStore() { + if (subscriptionUpdates.hasUpdates()) { + for (final Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) { + final Set<String> updatedTopicsForStateStore = new HashSet<>(); + for (final String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) { + for (final Pattern pattern : storePattern.getValue()) { + if (pattern.matcher(subscriptionUpdateTopic).matches()) { + updatedTopicsForStateStore.add(subscriptionUpdateTopic); + } + } + } + if (!updatedTopicsForStateStore.isEmpty()) { + final Collection<String> storeTopics = stateStoreNameToSourceTopics.get(storePattern.getKey()); + if (storeTopics != null) { + updatedTopicsForStateStore.addAll(storeTopics); + } + stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore)); + } + } + } + } + + private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier, + final String name) { + if (!(supplier instanceof WindowStoreSupplier)) { + return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig()); + } + + final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier; + final InternalTopicConfig config = new InternalTopicConfig(name, + Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, + InternalTopicConfig.CleanupPolicy.delete), + supplier.logConfig()); + config.setRetentionMs(windowStoreSupplier.retentionPeriod()); + return config; + } + + public synchronized Pattern earliestResetTopicsPattern() { + final List<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics); + final Pattern earliestPattern = buildPatternForOffsetResetTopics(topics, earliestResetPatterns); + + ensureNoRegexOverlap(earliestPattern, latestResetPatterns, latestResetTopics); + + return earliestPattern; + } + + public synchronized Pattern latestResetTopicsPattern() { + final List<String> topics = maybeDecorateInternalSourceTopics(latestResetTopics); + final Pattern latestPattern = buildPatternForOffsetResetTopics(topics, latestResetPatterns); + + ensureNoRegexOverlap(latestPattern, earliestResetPatterns, earliestResetTopics); + + return latestPattern; + } + + private void ensureNoRegexOverlap(final Pattern builtPattern, + final Set<Pattern> otherPatterns, + final Set<String> otherTopics) { + for (final Pattern otherPattern : otherPatterns) { + if (builtPattern.pattern().contains(otherPattern.pattern())) { + throw new TopologyBuilderException( + String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets", + otherPattern.pattern(), + builtPattern.pattern())); + } + } + + for (final String otherTopic : otherTopics) { + if (builtPattern.matcher(otherTopic).matches()) { + throw new TopologyBuilderException( + String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets", + builtPattern.pattern(), + otherTopic)); + } + } + } + + private static Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics, + final Collection<Pattern> sourcePatterns) { + final StringBuilder builder = new StringBuilder(); + + for (final String topic : sourceTopics) { + builder.append(topic).append("|"); + } + + for (final Pattern sourcePattern : sourcePatterns) { + builder.append(sourcePattern.pattern()).append("|"); + } + + if (builder.length() > 0) { + builder.setLength(builder.length() - 1); + return Pattern.compile(builder.toString()); + } + + return EMPTY_ZERO_LENGTH_PATTERN; + } + + public Map<String, List<String>> stateStoreNameToSourceTopics() { + final Map<String, List<String>> results = new HashMap<>(); + for (final Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) { + results.put(entry.getKey(), maybeDecorateInternalSourceTopics(entry.getValue())); + } + return results; + } + + public synchronized Collection<Set<String>> copartitionGroups() { + final List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size()); + for (final Set<String> nodeNames : copartitionSourceGroups) { + final Set<String> copartitionGroup = new HashSet<>(); + for (final String node : nodeNames) { + final List<String> topics = nodeToSourceTopics.get(node); + if (topics != null) { + copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics)); + } + } + list.add(Collections.unmodifiableSet(copartitionGroup)); + } + return Collections.unmodifiableList(list); + } + + private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) { + final List<String> decoratedTopics = new ArrayList<>(); + for (final String topic : sourceTopics) { + if (internalTopicNames.contains(topic)) { + decoratedTopics.add(decorateTopic(topic)); + } else { + decoratedTopics.add(topic); + } + } + return decoratedTopics; + } + + private String decorateTopic(final String topic) { + if (applicationId == null) { + throw new TopologyBuilderException("there are internal topics and " + + "applicationId hasn't been set. Call " + + "setApplicationId first"); + } + + return applicationId + "-" + topic; + } + + public SubscriptionUpdates subscriptionUpdates() { + return subscriptionUpdates; + } + + public synchronized Pattern sourceTopicPattern() { + if (topicPattern == null) { + final List<String> allSourceTopics = new ArrayList<>(); + if (!nodeToSourceTopics.isEmpty()) { + for (final List<String> topics : nodeToSourceTopics.values()) { + allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics)); + } + } + Collections.sort(allSourceTopics); + + topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values()); + } + + return topicPattern; + } + + public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, + final String threadId) { + log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", + threadId, subscriptionUpdates); + this.subscriptionUpdates = subscriptionUpdates; + setRegexMatchedTopicsToSourceNodes(); + setRegexMatchedTopicToStateStore(); + } + + private boolean isGlobalSource(final String nodeName) { + final NodeFactory nodeFactory = nodeFactories.get(nodeName); + + if (nodeFactory instanceof SourceNodeFactory) { + final List<String> topics = ((SourceNodeFactory) nodeFactory).topics; + if (topics != null && topics.size() == 1 && globalTopics.contains(topics.get(0))) { + return true; + } + } + + return false; + } + + public TopologyDescription describe() { + final TopologyDescription description = new TopologyDescription(); + + describeSubtopologies(description); + describeGlobalStores(description); + + return description; + } + + private void describeSubtopologies(final TopologyDescription description) { + for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) { + + final Set<String> allNodesOfGroups = nodeGroup.getValue(); + final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(allNodesOfGroups); + + if (!isNodeGroupOfGlobalStores) { + describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups); + } + } + } + + private boolean nodeGroupContainsGlobalSourceNode(final Set<String> allNodesOfGroups) { + for (final String node : allNodesOfGroups) { + if (isGlobalSource(node)) { + return true; + } + } + return false; + } + + private void describeSubtopology(final TopologyDescription description, + final Integer subtopologyId, + final Set<String> nodeNames) { + + final HashMap<String, AbstractNode> nodesByName = new HashMap<>(); + + // add all nodes + for (final String nodeName : nodeNames) { + nodesByName.put(nodeName, nodeFactories.get(nodeName).describe()); + } + + // connect each node to its predecessors and successors + for (final AbstractNode node : nodesByName.values()) { + for (final String predecessorName : nodeFactories.get(node.name()).predecessors) { + final AbstractNode predecessor = nodesByName.get(predecessorName); + node.addPredecessor(predecessor); + predecessor.addSuccessor(node); + } + } + + description.addSubtopology(new Subtopology( + subtopologyId, + new HashSet<TopologyDescription.Node>(nodesByName.values()))); + } + + private void describeGlobalStores(final TopologyDescription description) { + for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) { + final Set<String> nodes = nodeGroup.getValue(); + + final Iterator<String> it = nodes.iterator(); + while (it.hasNext()) { + final String node = it.next(); + + if (isGlobalSource(node)) { + // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode} + it.remove(); // remove sourceNode from group + final String processorNode = nodes.iterator().next(); // get remaining processorNode + + description.addGlobalStore(new GlobalStore( + node, + processorNode, + ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(), + nodeToSourceTopics.get(node).get(0) + )); + break; + } + } + } + } + + public final static class GlobalStore implements TopologyDescription.GlobalStore { + private final Source source; + private final Processor processor; + + public GlobalStore(final String sourceName, + final String processorName, + final String storeName, + final String topicName) { + source = new Source(sourceName, topicName); + processor = new Processor(processorName, Collections.singleton(storeName)); + source.successors.add(processor); + processor.predecessors.add(source); + } + + @Override + public TopologyDescription.Source source() { + return source; + } + + @Override + public TopologyDescription.Processor processor() { + return processor; + } + + @Override + public String toString() { + return "GlobalStore: " + source.name + "(topic: " + source.topics + ") -> " + + processor.name + "(store: " + processor.stores.iterator().next() + ")\n"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final GlobalStore that = (GlobalStore) o; + return source.equals(that.source) + && processor.equals(that.processor); + } + + @Override + public int hashCode() { + return Objects.hash(source, processor); + } + } + + public abstract static class AbstractNode implements TopologyDescription.Node { + final String name; + final Set<TopologyDescription.Node> predecessors = new HashSet<>(); + final Set<TopologyDescription.Node> successors = new HashSet<>(); + + AbstractNode(final String name) { + this.name = name; + } + + @Override + public String name() { + return name; + } + + @Override + public Set<TopologyDescription.Node> predecessors() { + return Collections.unmodifiableSet(predecessors); + } + + @Override + public Set<TopologyDescription.Node> successors() { + return Collections.unmodifiableSet(successors); + } + + public void addPredecessor(final TopologyDescription.Node predecessor) { + predecessors.add(predecessor); + } + + public void addSuccessor(final TopologyDescription.Node successor) { + successors.add(successor); + } + } + + public final static class Source extends AbstractNode implements TopologyDescription.Source { + private final String topics; + + public Source(final String name, + final String topics) { + super(name); + this.topics = topics; + } + + @Override + public String topics() { + return topics; + } + + @Override + public void addPredecessor(final TopologyDescription.Node predecessor) { + throw new UnsupportedOperationException("Sources don't have predecessors."); + } + + @Override + public String toString() { + return "Source: " + name + "(topics: " + topics + ") --> " + nodeNames(successors); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final Source source = (Source) o; + // omit successor to avoid infinite loops + return name.equals(source.name) + && topics.equals(source.topics); + } + + @Override + public int hashCode() { + // omit successor as it might change and alter the hash code + return Objects.hash(name, topics); + } + } + + public final static class Processor extends AbstractNode implements TopologyDescription.Processor { + private final Set<String> stores; + + public Processor(final String name, + final Set<String> stores) { + super(name); + this.stores = stores; + } + + @Override + public Set<String> stores() { + return Collections.unmodifiableSet(stores); + } + + @Override + public String toString() { + return "Processor: " + name + "(stores: " + stores + ") --> " + nodeNames(successors) + " <-- " + nodeNames(predecessors); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final Processor processor = (Processor) o; + // omit successor to avoid infinite loops + return name.equals(processor.name) + && stores.equals(processor.stores) + && predecessors.equals(processor.predecessors); + } + + @Override + public int hashCode() { + // omit successor as it might change and alter the hash code + return Objects.hash(name, stores); + } + } + + public final static class Sink extends AbstractNode implements TopologyDescription.Sink { + private final String topic; + + public Sink(final String name, + final String topic) { + super(name); + this.topic = topic; + } + + @Override + public String topic() { + return topic; + } + + @Override + public void addSuccessor(final TopologyDescription.Node successor) { + throw new UnsupportedOperationException("Sinks don't have successors."); + } + + @Override + public String toString() { + return "Sink: " + name + "(topic: " + topic + ") <-- " + nodeNames(predecessors); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final Sink sink = (Sink) o; + return name.equals(sink.name) + && topic.equals(sink.topic) + && predecessors.equals(sink.predecessors); + } + + @Override + public int hashCode() { + // omit predecessors as it might change and alter the hash code + return Objects.hash(name, topic); + } + } + + public final static class Subtopology implements org.apache.kafka.streams.TopologyDescription.Subtopology { + private final int id; + private final Set<org.apache.kafka.streams.TopologyDescription.Node> nodes; + + public Subtopology(final int id, + final Set<org.apache.kafka.streams.TopologyDescription.Node> nodes) { + this.id = id; + this.nodes = nodes; + } + + @Override + public int id() { + return id; + } + + @Override + public Set<org.apache.kafka.streams.TopologyDescription.Node> nodes() { + return Collections.unmodifiableSet(nodes); + } + + @Override + public String toString() { + return "Sub-topology: " + id + "\n" + nodesAsString(); + } + + private String nodesAsString() { + final StringBuilder sb = new StringBuilder(); + for (final org.apache.kafka.streams.TopologyDescription.Node node : nodes) { + sb.append(" "); + sb.append(node); + sb.append('\n'); + } + return sb.toString(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final Subtopology that = (Subtopology) o; + return id == that.id + && nodes.equals(that.nodes); + } + + @Override + public int hashCode() { + return Objects.hash(id, nodes); + } + } + + public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription { + private final Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = new HashSet<>(); + private final Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = new HashSet<>(); + + public void addSubtopology(final org.apache.kafka.streams.TopologyDescription.Subtopology subtopology) { + subtopologies.add(subtopology); + } + + public void addGlobalStore(final org.apache.kafka.streams.TopologyDescription.GlobalStore globalStore) { + globalStores.add(globalStore); + } + + @Override + public Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies() { + return Collections.unmodifiableSet(subtopologies); + } + + @Override + public Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores() { + return Collections.unmodifiableSet(globalStores); + } + + @Override + public String toString() { + return subtopologiesAsString() + globalStoresAsString(); + } + + private String subtopologiesAsString() { + final StringBuilder sb = new StringBuilder(); + sb.append("Sub-topologies: \n"); + if (subtopologies.isEmpty()) { + sb.append(" none\n"); + } else { + for (final org.apache.kafka.streams.TopologyDescription.Subtopology st : subtopologies) { + sb.append(" "); + sb.append(st); + } + } + return sb.toString(); + } + + private String globalStoresAsString() { + final StringBuilder sb = new StringBuilder(); + sb.append("Global Stores:\n"); + if (globalStores.isEmpty()) { + sb.append(" none\n"); + } else { + for (final org.apache.kafka.streams.TopologyDescription.GlobalStore gs : globalStores) { + sb.append(" "); + sb.append(gs); + } + } + return sb.toString(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final TopologyDescription that = (TopologyDescription) o; + return subtopologies.equals(that.subtopologies) + && globalStores.equals(that.globalStores); + } + + @Override + public int hashCode() { + return Objects.hash(subtopologies, globalStores); + } + + } + + private static String nodeNames(final Set<TopologyDescription.Node> nodes) { + final StringBuilder sb = new StringBuilder(); + if (!nodes.isEmpty()) { + for (final TopologyDescription.Node n : nodes) { + sb.append(n.name()); + sb.append(", "); + } + sb.deleteCharAt(sb.length() - 1); + sb.deleteCharAt(sb.length() - 1); + } else { + return "none"; + } + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 91856b0..e8b6a1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -356,8 +356,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable if (otherSinkTopics.contains(topicName)) { // if this topic is one of the sink topics of this topology, // use the maximum of all its source topic partitions as the number of partitions - for (String sourceTopicName : otherTopicsInfo.sourceTopics) { - Integer numPartitionsCandidate; + for (final String sourceTopicName : otherTopicsInfo.sourceTopics) { + final Integer numPartitionsCandidate; // It is possible the sourceTopic is another internal topic, i.e, // map().join().join(map()) if (repartitionTopicMetadata.containsKey(sourceTopicName)) { @@ -377,10 +377,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } // if we still have not find the right number of partitions, // another iteration is needed - if (numPartitions == UNKNOWN) + if (numPartitions == UNKNOWN) { numPartitionsNeeded = true; - else + } else { repartitionTopicMetadata.get(topicName).numPartitions = numPartitions; + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index eb75b14..f10bf41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -34,9 +34,9 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; -import org.apache.kafka.common.metrics.stats.Sum; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Sum; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.StreamsConfig; @@ -45,7 +45,6 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskIdFormatException; import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; @@ -398,7 +397,7 @@ public class StreamThread extends Thread { public final UUID processId; protected final StreamsConfig config; - protected final TopologyBuilder builder; + protected final InternalTopologyBuilder builder; Producer<byte[], byte[]> threadProducer; private final KafkaClientSupplier clientSupplier; protected final Consumer<byte[], byte[]> consumer; @@ -441,7 +440,7 @@ public class StreamThread extends Thread { final ConsumerRebalanceListener rebalanceListener; private final static int UNLIMITED_RECORDS = -1; - public StreamThread(final TopologyBuilder builder, + public StreamThread(final InternalTopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index bb74b48..3fd1613 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.StreamsMetadata; @@ -44,14 +43,14 @@ import java.util.Set; */ public class StreamsMetadataState { public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1); - private final TopologyBuilder builder; + private final InternalTopologyBuilder builder; private final List<StreamsMetadata> allMetadata = new ArrayList<>(); private final Set<String> globalStores; private final HostInfo thisHost; private Cluster clusterMetadata; private StreamsMetadata myMetadata; - public StreamsMetadataState(final TopologyBuilder builder, final HostInfo thisHost) { + public StreamsMetadataState(final InternalTopologyBuilder builder, final HostInfo thisHost) { this.builder = builder; this.globalStores = builder.globalStateStores().keySet(); this.thisHost = thisHost;
