Repository: kafka Updated Branches: refs/heads/trunk 1ef7b494b -> fb42558e2
KAFKA-3443: support for adding sources to KafkaStreams via Pattern. This PR is the follow on to the closed PR #1410. Author: bbejeck <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1477 from bbejeck/KAFKA-3443_streams_support_for_regex_sources Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb42558e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb42558e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb42558e Branch: refs/heads/trunk Commit: fb42558e2500835722a4e5028896ddae4f407d6f Parents: 1ef7b49 Author: bbejeck <[email protected]> Authored: Wed Jun 15 19:20:43 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Jun 15 19:20:43 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/clients/Metadata.java | 19 +- .../kafka/streams/kstream/KStreamBuilder.java | 39 ++ .../streams/processor/TopologyBuilder.java | 150 +++++++- .../internals/StreamPartitionAssignor.java | 37 +- .../processor/internals/StreamThread.java | 10 +- .../integration/RegexSourceIntegrationTest.java | 365 +++++++++++++++++++ .../utils/EmbeddedSingleNodeKafkaCluster.java | 7 +- .../integration/utils/KafkaEmbedded.java | 15 + .../streams/processor/TopologyBuilderTest.java | 43 ++- 9 files changed, 664 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/clients/src/main/java/org/apache/kafka/clients/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 54b19a3..3934627 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -12,6 +12,13 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -22,13 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.errors.TimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A class encapsulating some of the logic around metadata. 
* <p> @@ -292,7 +292,10 @@ public final class Metadata { unauthorizedTopics.retainAll(this.topics.keySet()); for (String topic : this.topics.keySet()) { - partitionInfos.addAll(cluster.partitionsForTopic(topic)); + List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic); + if (partitionInfoList != null) { + partitionInfos.addAll(partitionInfoList); + } } nodes = cluster.nodes(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 9d90ba0..53b2f4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder; import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; /** * {@link KStreamBuilder} is a subclass of {@link TopologyBuilder} that provides the Kafka Streams DSL @@ -55,6 +56,22 @@ public class KStreamBuilder extends TopologyBuilder { return stream(null, null, topics); } + + /** + * Create a {@link KStream} instance from the specified Pattern. + * The default deserializers specified in the config are used. + * <p> + * If multiple topics are matched by the specified pattern, the created stream will read data from all of them, + * and there is no ordering guarantee between records from different topics. + * + * @param topicPattern the Pattern to match for topic names + * @return a {@link KStream} for topics matching the regex pattern. + */ + public <K, V> KStream<K, V> stream(Pattern topicPattern) { + return stream(null, null, topicPattern); + } + + /** * Create a {@link KStream} instance from the specified topics. * <p> @@ -75,6 +92,28 @@ public class KStreamBuilder extends TopologyBuilder { return new KStreamImpl<>(this, name, Collections.singleton(name)); } + + /** + * Create a {@link KStream} instance from the specified Pattern. + * <p> + * If multiple topics are matched by the specified pattern, the created stream will read data from all of them, + * and there is no ordering guarantee between records from different topics. + * + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topicPattern the Pattern to match for topic names + * @return a {@link KStream} for topics matching the regex pattern + */ + public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, Pattern topicPattern) { + String name = newName(KStreamImpl.SOURCE_NAME); + + addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern); + + return new KStreamImpl<>(this, name, Collections.singleton(name)); + } + /** * Create a {@link KTable} instance for the specified topic. * The default deserializers specified in the config are used.
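Example usage of the new Pattern overloads (a minimal sketch; the class name, topic regexes, and serdes below are illustrative, not part of the patch):

import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PatternSourceExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Consume from every topic matching the regex, using the default serdes
        // from the StreamsConfig; there is no ordering guarantee between records
        // coming from different matched topics.
        KStream<String, String> numbered = builder.stream(Pattern.compile("topic-\\d"));

        // Explicit serdes apply to all matched topics, so every topic the pattern
        // matches must share the same key/value data format.
        KStream<String, String> lettered =
                builder.stream(Serdes.String(), Serdes.String(), Pattern.compile("topic-[A-Z]"));
    }
}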
http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 5425149..1743baf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.QuickUnion; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; +import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; import java.util.ArrayList; import java.util.Arrays; @@ -39,6 +40,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; /** * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors, @@ -62,8 +64,14 @@ public class TopologyBuilder { private final QuickUnion<String> nodeGrouper = new QuickUnion<>(); private final List<Set<String>> copartitionSourceGroups = new ArrayList<>(); private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>(); + private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>(); + private final HashMap<String, Pattern> topicToPatterns = new HashMap<>(); private final HashMap<String, String> nodeToSinkTopic = new HashMap<>(); + private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); private Map<Integer, Set<String>> nodeGroups = null; + private Pattern topicPattern; + + private static class StateStoreFactory { public final Set<String> users; @@ -110,23 +118,49 @@ public class TopologyBuilder { } } - private static class SourceNodeFactory extends NodeFactory { - public final String[] topics; + private class SourceNodeFactory extends NodeFactory { + private final String[] topics; + public final Pattern pattern; private Deserializer keyDeserializer; private Deserializer valDeserializer; - private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) { + private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer keyDeserializer, Deserializer valDeserializer) { super(name); - this.topics = topics.clone(); + this.topics = topics != null ? 
topics.clone() : null; + this.pattern = pattern; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; } + public String[] getTopics() { + return topics; + } + + public String[] getTopics(Collection<String> subscribedTopics) { + List<String> matchedTopics = new ArrayList<>(); + for (String update : subscribedTopics) { + if (this.pattern == topicToPatterns.get(update)) { + matchedTopics.add(update); + // not the same Pattern instance, but the topic still matches another registered pattern; overlapping patterns are not allowed + } else if (topicToPatterns.containsKey(update) && isMatch(update)) { + throw new TopologyBuilderException("Topic " + update + " has already been matched by another pattern; check for overlapping regex patterns"); + } else if (isMatch(update)) { + topicToPatterns.put(update, this.pattern); + matchedTopics.add(update); + } + } + return matchedTopics.toArray(new String[matchedTopics.size()]); + } + @SuppressWarnings("unchecked") @Override public ProcessorNode build(String applicationId) { return new SourceNode(name, keyDeserializer, valDeserializer); } + + private boolean isMatch(String topic) { + return this.pattern.matcher(topic).matches(); + } } private class SinkNodeFactory extends NodeFactory { @@ -193,7 +227,7 @@ public class TopologyBuilder { public TopologyBuilder() {} /** - * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. + * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. @@ -207,6 +241,23 @@ public class TopologyBuilder { return addSource(name, (Deserializer) null, (Deserializer) null, topics); } + + /** + * Add a new source that consumes from topics matching the given pattern + * and forwards the records to child processor and/or sink nodes. + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + */ + public final TopologyBuilder addSource(String name, Pattern topicPattern) { + return addSource(name, (Deserializer) null, (Deserializer) null, topicPattern); + } + /** * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers.
@@ -231,10 +282,16 @@ public class TopologyBuilder { if (sourceTopicNames.contains(topic)) throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source."); + for (Pattern pattern : nodeToSourcePatterns.values()) { + if (pattern.matcher(topic).matches()) { + throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source."); + } + } + sourceTopicNames.add(topic); } - nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer)); + nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer)); nodeToSourceTopics.put(name, topics.clone()); nodeGrouper.add(name); @@ -242,6 +299,49 @@ } /** + * Add a new source that consumes from topics matching the given pattern + * and forwards the records to child processor and/or sink nodes. + * The source will use the specified key and value deserializers. The provided + * deserializers will be used for all matched topics, so care should be taken to specify patterns only for + * topics that share the same key-value data format. + * + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source + * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration} + * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source + * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration} + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + * @throws TopologyBuilderException if a processor with the same name has already been added, or if the pattern matches a topic that has already been registered by name + */ + + public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) { + + if (topicPattern == null) { + throw new TopologyBuilderException("Pattern can't be null"); + } + + if (nodeFactories.containsKey(name)) { + throw new TopologyBuilderException("Processor " + name + " is already added."); + } + + for (String sourceTopicName : sourceTopicNames) { + if (topicPattern.matcher(sourceTopicName).matches()) { + throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source."); + } + } + + nodeToSourcePatterns.put(name, topicPattern); + nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, keyDeserializer, valDeserializer)); + nodeGrouper.add(name); + + return this; + } + + /** * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the @@ -504,9 +604,19 @@ public class TopologyBuilder { public Map<Integer, TopicsInfo> topicGroups(String applicationId) { Map<Integer, TopicsInfo> topicGroups = new HashMap<>(); + + if (subscriptionUpdates.hasUpdates()) { + for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) { + SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); + //need to update nodeToSourceTopics with topics matched from given regex + nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates())); + } + } + if (nodeGroups == null) nodeGroups = makeNodeGroups(); + for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) { Set<String> sinkTopics = new HashSet<>(); Set<String> sourceTopics = new HashSet<>(); @@ -677,7 +787,9 @@ public class TopologyBuilder { } } } else if (factory instanceof SourceNodeFactory) { - for (String topic : ((SourceNodeFactory) factory).topics) { + SourceNodeFactory sourceNodeFactory = (SourceNodeFactory) factory; + String[] topics = (sourceNodeFactory.pattern != null) ? sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : sourceNodeFactory.getTopics(); + for (String topic : topics) { if (internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node); @@ -713,4 +825,28 @@ public class TopologyBuilder { } return Collections.unmodifiableSet(topics); } + + public Pattern sourceTopicPattern() { + if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) { + StringBuilder builder = new StringBuilder(); + for (Pattern pattern : nodeToSourcePatterns.values()) { + builder.append(pattern.pattern()).append("|"); + } + if (!nodeToSourceTopics.isEmpty()) { + for (String[] topics : nodeToSourceTopics.values()) { + for (String topic : topics) { + builder.append(topic).append("|"); + } + } + } + + builder.setLength(builder.length() - 1); + this.topicPattern = Pattern.compile(builder.toString()); + } + return this.topicPattern; + } + + public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) { + this.subscriptionUpdates = subscriptionUpdates; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 085ff94..adefab9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -118,8 +118,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable streamThread = (StreamThread) o; streamThread.partitionAssignor(this); - this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId); - if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) { internalTopicManager = new InternalTopicManager( (String) 
configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), @@ -228,12 +226,17 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable // 2. within each client, tasks are assigned to consumer clients in round-robin manner. Map<UUID, Set<String>> consumersByClient = new HashMap<>(); Map<UUID, ClientState<TaskId>> states = new HashMap<>(); - + SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); // decode subscription info for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) { String consumerId = entry.getKey(); Subscription subscription = entry.getValue(); + if (streamThread.builder.sourceTopicPattern() != null) { + // update the topic groups with the returned subscription list for regex pattern subscriptions + subscriptionUpdates.updateTopics(subscription.topics()); + } + SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); Set<String> consumers = consumersByClient.get(info.processId); @@ -255,6 +258,9 @@ state.capacity = state.capacity + 1d; } + streamThread.builder.updateSubscriptions(subscriptionUpdates); + this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId); + // ensure the co-partitioning topics within the group have the same number of partitions, // and enforce the number of partitions for those internal topics. internalSourceTopicToTaskIds = new HashMap<>(); @@ -486,4 +492,29 @@ public void setInternalTopicManager(InternalTopicManager internalTopicManager) { this.internalTopicManager = internalTopicManager; } + + /** + * Used to capture the topics subscribed to via Patterns, as discovered during the + * partition assignment process.
+ */ + public static class SubscriptionUpdates { + + private final Set<String> updatedTopicSubscriptions = new HashSet<>(); + + + private void updateTopics(Collection<String> topicNames) { + updatedTopicSubscriptions.clear(); + updatedTopicSubscriptions.addAll(topicNames); + } + + public Collection<String> getUpdates() { + return Collections.unmodifiableSet(new HashSet<>(updatedTopicSubscriptions)); + } + + public boolean hasUpdates() { + return !updatedTopicSubscriptions.isEmpty(); + } + + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 72eeef5..64127a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -62,6 +62,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import static java.util.Collections.singleton; @@ -78,6 +79,7 @@ public class StreamThread extends Thread { protected final StreamsConfig config; protected final TopologyBuilder builder; protected final Set<String> sourceTopics; + protected final Pattern topicPattern; protected final Producer<byte[], byte[]> producer; protected final Consumer<byte[], byte[]> consumer; protected final Consumer<byte[], byte[]> restoreConsumer; @@ -160,6 +162,7 @@ public class StreamThread extends Thread { this.config = config; this.builder = builder; this.sourceTopics = builder.sourceTopics(applicationId); + this.topicPattern = builder.sourceTopicPattern(); this.clientId = clientId; this.processId = processId; this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); @@ -283,7 +286,12 @@ public class StreamThread extends Thread { long lastPoll = 0L; boolean requiresPoll = true; - consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener); + if (topicPattern != null) { + consumer.subscribe(topicPattern, rebalanceListener); + } else { + consumer.subscribe(new ArrayList<>(sourceTopics), rebalanceListener); + } + while (stillRunning()) { // try to fetch some records if necessary http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java new file mode 100644 index 0000000..7e18cff --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.StreamThread; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.regex.Pattern; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * End-to-end integration test that creates sources from both regex patterns and named topics, using + * an embedded Kafka cluster.
+ */ + +public class RegexSourceIntegrationTest { + @ClassRule + public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + + private static final String TOPIC_1 = "topic-1"; + private static final String TOPIC_2 = "topic-2"; + private static final String TOPIC_A = "topic-A"; + private static final String TOPIC_C = "topic-C"; + private static final String TOPIC_Y = "topic-Y"; + private static final String TOPIC_Z = "topic-Z"; + private static final String FA_TOPIC = "fa"; + private static final String FOO_TOPIC = "foo"; + + private static final int FIRST_UPDATE = 0; + private static final int SECOND_UPDATE = 1; + + private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; + private Properties streamsConfiguration; + + + @BeforeClass + public static void startKafkaCluster() throws Exception { + CLUSTER.createTopic(TOPIC_1); + CLUSTER.createTopic(TOPIC_2); + CLUSTER.createTopic(TOPIC_A); + CLUSTER.createTopic(TOPIC_C); + CLUSTER.createTopic(TOPIC_Y); + CLUSTER.createTopic(TOPIC_Z); + CLUSTER.createTopic(FA_TOPIC); + CLUSTER.createTopic(FOO_TOPIC); + + } + + @Before + public void setUp() { + streamsConfiguration = getStreamsConfig(); + } + + @After + public void tearDown() throws Exception { + // Remove any state from previous test runs + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @Test + public void testRegexMatchesTopicsAWhenCreated() throws Exception { + + final Serde<String> stringSerde = Serdes.String(); + + StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); + + CLUSTER.createTopic("TEST-TOPIC-1"); + + KStreamBuilder builder = new KStreamBuilder(); + + KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); + + pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + + Field streamThreadsField = streams.getClass().getDeclaredField("threads"); + streamThreadsField.setAccessible(true); + StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams); + StreamThread originalThread = streamThreads[0]; + + TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig, + new DefaultKafkaClientSupplier(), + originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime()); + + streamThreads[0] = testStreamThread; + streams.start(); + testStreamThread.waitUntilTasksUpdated(); + + CLUSTER.createTopic("TEST-TOPIC-2"); + + testStreamThread.waitUntilTasksUpdated(); + + streams.close(); + + List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1"); + List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); + + assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment)); + assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment)); + } + + @Test + public void testRegexMatchesTopicsAWhenDeleted() throws Exception { + + final Serde<String> stringSerde = Serdes.String(); + + StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration); + + CLUSTER.createTopic("TEST-TOPIC-A"); + CLUSTER.createTopic("TEST-TOPIC-B"); + + KStreamBuilder builder = new KStreamBuilder(); + + KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]")); + + pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + + KafkaStreams streams = new 
KafkaStreams(builder, streamsConfiguration); + + Field streamThreadsField = streams.getClass().getDeclaredField("threads"); + streamThreadsField.setAccessible(true); + StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams); + StreamThread originalThread = streamThreads[0]; + + TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig, + new DefaultKafkaClientSupplier(), + originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime()); + + streamThreads[0] = testStreamThread; + streams.start(); + + testStreamThread.waitUntilTasksUpdated(); + + CLUSTER.deleteTopic("TEST-TOPIC-A"); + + testStreamThread.waitUntilTasksUpdated(); + + streams.close(); + + List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B"); + List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B"); + + assertThat(testStreamThread.assignedTopicPartitions.get(FIRST_UPDATE), equalTo(expectedFirstAssignment)); + assertThat(testStreamThread.assignedTopicPartitions.get(SECOND_UPDATE), equalTo(expectedSecondAssignment)); + } + + + @Test + public void testShouldReadFromRegexAndNamedTopics() throws Exception { + + String topic1TestMessage = "topic-1 test"; + String topic2TestMessage = "topic-2 test"; + String topicATestMessage = "topic-A test"; + String topicCTestMessage = "topic-C test"; + String topicYTestMessage = "topic-Y test"; + String topicZTestMessage = "topic-Z test"; + + + final Serde<String> stringSerde = Serdes.String(); + + KStreamBuilder builder = new KStreamBuilder(); + + KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d")); + KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]")); + KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z); + + pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + Properties producerConfig = getProducerConfig(); + + IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig); + IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig); + + Properties consumerConfig = getConsumerConfig(); + + List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage); + List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6); + List<String> actualValues = new ArrayList<>(6); + + for (KeyValue<String, String> receivedKeyValue : receivedKeyValues) { + actualValues.add(receivedKeyValue.value); + } + + streams.close(); + Collections.sort(actualValues); + Collections.sort(expectedReceivedValues); + 
assertThat(actualValues, equalTo(expectedReceivedValues)); + } + + //TODO should be updated to expected = TopologyBuilderException after KAFKA-3708 + @Test(expected = AssertionError.class) + public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception { + + String fooMessage = "fooMessage"; + String fMessage = "fMessage"; + + + final Serde<String> stringSerde = Serdes.String(); + + KStreamBuilder builder = new KStreamBuilder(); + + + // overlapping patterns here, no messages should be sent as TopologyBuilderException + // will be thrown when the processor topology is built. + + KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*")); + KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*")); + + + pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); + + + // Remove any state from previous test runs + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + Properties producerConfig = getProducerConfig(); + + IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig); + IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig); + + Properties consumerConfig = getConsumerConfig(); + + try { + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000); + } finally { + streams.close(); + } + + } + + private Properties getProducerConfig() { + Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return producerConfig; + } + + private Properties getStreamsConfig() { + Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "regex-source-integration-test"); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); + streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); + streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); + return streamsConfiguration; + } + + private Properties getConsumerConfig() { + Properties consumerConfig = new Properties(); + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-source-integration-consumer"); + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return consumerConfig; + } + 
+ private class TestStreamThread extends StreamThread { + + public Map<Integer, List<String>> assignedTopicPartitions = new HashMap<>(); + private int index = 0; + public volatile boolean streamTaskUpdated = false; + + public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) { + super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time); + } + + @Override + public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) { + List<String> assignedTopics = new ArrayList<>(); + for (TopicPartition partition : partitions) { + assignedTopics.add(partition.topic()); + } + Collections.sort(assignedTopics); + streamTaskUpdated = true; + assignedTopicPartitions.put(index++, assignedTopics); + return super.createStreamTask(id, partitions); + } + + + void waitUntilTasksUpdated() { + long maxTimeMillis = 30000; + long startTime = System.currentTimeMillis(); + while (!streamTaskUpdated && ((System.currentTimeMillis() - startTime) < maxTimeMillis)) { + // sleep briefly between checks instead of spinning; createStreamTask flips the flag + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + streamTaskUpdated = false; + } + + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java index 34753ae..d3ba065 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -19,12 +19,12 @@ package org.apache.kafka.streams.integration.utils; import kafka.server.KafkaConfig$; import kafka.zk.EmbeddedZookeeper; +import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Properties; -import org.junit.rules.ExternalResource; /** * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
@@ -48,6 +48,7 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource { log.debug("ZooKeeper instance is running at {}", zKConnectString()); brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT); + brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true); log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp())); broker = new KafkaEmbedded(brokerConfig); @@ -125,4 +126,8 @@ public class EmbeddedSingleNodeKafkaCluster extends ExternalResource { Properties topicConfig) { broker.createTopic(topic, partitions, replication, topicConfig); } + + public void deleteTopic(String topic) { + broker.deleteTopic(topic); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 348b46b..43b82d6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -186,4 +186,19 @@ public class KafkaEmbedded { AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$); zkClient.close(); } + + public void deleteTopic(String topic) { + log.debug("Deleting topic { name: {} }", topic); + + ZkClient zkClient = new ZkClient( + zookeeperConnect(), + DEFAULT_ZK_SESSION_TIMEOUT_MS, + DEFAULT_ZK_CONNECTION_TIMEOUT_MS, + ZKStringSerializer$.MODULE$); + boolean isSecure = false; + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure); + AdminUtils.deleteTopic(zkUtils, topic); + zkClient.close(); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/fb42558e/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 9af313a..28acf09 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -18,10 +18,10 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; -import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.junit.Test; @@ -33,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkSet; 
@@ -152,6 +153,46 @@ public class TopologyBuilderTest { assertEquals(expected, builder.sourceTopics("X")); } + @Test + public void testPatternSourceTopic() { + final TopologyBuilder builder = new TopologyBuilder(); + Pattern expectedPattern = Pattern.compile("topic-\\d"); + builder.addSource("source-1", expectedPattern); + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + } + + @Test + public void testAddMoreThanOnePatternSourceNode() { + final TopologyBuilder builder = new TopologyBuilder(); + Pattern expectedPattern = Pattern.compile("topics[A-Z]|.*-\\d"); + builder.addSource("source-1", Pattern.compile("topics[A-Z]")); + builder.addSource("source-2", Pattern.compile(".*-\\d")); + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + } + + @Test + public void testSubscribeTopicNameAndPattern() { + final TopologyBuilder builder = new TopologyBuilder(); + Pattern expectedPattern = Pattern.compile(".*-\\d|topic-foo|topic-bar"); + builder.addSource("source-1", "topic-foo", "topic-bar"); + builder.addSource("source-2", Pattern.compile(".*-\\d")); + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + } + + @Test(expected = TopologyBuilderException.class) + public void testPatternMatchesAlreadyProvidedTopicSource() { + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source-1", "foo"); + builder.addSource("source-2", Pattern.compile("f.*")); + } + + @Test(expected = TopologyBuilderException.class) + public void testNamedTopicMatchesAlreadyProvidedPattern() { + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source-1", Pattern.compile("f.*")); + builder.addSource("source-2", "foo"); + } + @Test(expected = TopologyBuilderException.class) public void testAddStateStoreWithNonExistingProcessor() { final TopologyBuilder builder = new TopologyBuilder();
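For reference, a minimal sketch of the lower-level TopologyBuilder pattern API exercised by the tests above (the class name is illustrative; the source and topic names mirror the test cases, and the commented-out calls show the overlap checks):

import java.util.regex.Pattern;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class PatternTopologyExample {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // Named and pattern-based sources can coexist in a single topology.
        builder.addSource("source-1", "topic-foo", "topic-bar");
        builder.addSource("source-2", Pattern.compile(".*-\\d"));

        // Overlap between sources fails fast with a TopologyBuilderException:
        // builder.addSource("source-3", "topic-1");                    // topic matches the registered pattern ".*-\\d"
        // builder.addSource("source-4", Pattern.compile("topic-f.*")); // pattern matches the registered topic "topic-foo"

        // The consumer is subscribed with the union of all patterns and named
        // topics, here ".*-\\d|topic-foo|topic-bar" (see sourceTopicPattern()).
        System.out.println(builder.sourceTopicPattern().pattern());
    }
}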
