Repository: storm Updated Branches: refs/heads/master 399e35f10 -> cd6ca3ef0
STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7b7b896 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7b7b896 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7b7b896 Branch: refs/heads/master Commit: c7b7b896bb0101a1b1b56677b647550b615d515a Parents: 399e35f Author: Stig Rohde Døssing <stigdoess...@gmail.com> Authored: Mon Jun 5 14:59:19 2017 +0200 Committer: Stig Rohde Døssing <s...@apache.org> Committed: Tue Jul 18 22:36:46 2017 +0200 ---------------------------------------------------------------------- .../spout/ManualPartitionNamedSubscription.java | 78 -------------------- .../ManualPartitionPatternSubscription.java | 76 ------------------- .../spout/ManualPartitionSubscription.java | 71 ++++++++++++++++++ .../storm/kafka/spout/NamedTopicFilter.java | 67 +++++++++++++++++ .../storm/kafka/spout/PatternTopicFilter.java | 69 +++++++++++++++++ .../apache/storm/kafka/spout/Subscription.java | 2 +- .../apache/storm/kafka/spout/TopicFilter.java | 38 ++++++++++ .../storm/kafka/spout/NamedTopicFilterTest.java | 69 +++++++++++++++++ .../kafka/spout/PatternTopicFilterTest.java | 73 ++++++++++++++++++ 9 files changed, 388 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java deleted file mode 100644 index 926fdf0..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.task.TopologyContext; - -public class ManualPartitionNamedSubscription extends NamedSubscription { - private static final long serialVersionUID = 5633018073527583826L; - private final ManualPartitioner partitioner; - private Set<TopicPartition> currentAssignment = null; - private KafkaConsumer<?, ?> consumer = null; - private ConsumerRebalanceListener listener = null; - private TopologyContext context = null; - - public ManualPartitionNamedSubscription(ManualPartitioner parter, Collection<String> topics) { - super(topics); - this.partitioner = parter; - } - - public ManualPartitionNamedSubscription(ManualPartitioner parter, String ... topics) { - this(parter, Arrays.asList(topics)); - } - - @Override - public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) { - this.consumer = consumer; - this.listener = listener; - this.context = context; - refreshAssignment(); - } - - @Override - public void refreshAssignment() { - List<TopicPartition> allPartitions = new ArrayList<>(); - for (String topic : topics) { - for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - } - Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); - Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); - if (!newAssignment.equals(currentAssignment)) { - if (currentAssignment != null) { - listener.onPartitionsRevoked(currentAssignment); - listener.onPartitionsAssigned(newAssignment); - } - currentAssignment = newAssignment; - consumer.assign(currentAssignment); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java deleted file mode 100644 index 2344477..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.task.TopologyContext; - -public class ManualPartitionPatternSubscription extends PatternSubscription { - private static final long serialVersionUID = 5633018073527583826L; - private final ManualPartitioner parter; - private Set<TopicPartition> currentAssignment = null; - private KafkaConsumer<?, ?> consumer = null; - private ConsumerRebalanceListener listener = null; - private TopologyContext context = null; - - public ManualPartitionPatternSubscription(ManualPartitioner parter, Pattern pattern) { - super(pattern); - this.parter = parter; - } - - @Override - public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) { - this.consumer = consumer; - this.listener = listener; - this.context = context; - refreshAssignment(); - } - - @Override - public void refreshAssignment() { - List<TopicPartition> allPartitions = new ArrayList<>(); - for (Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) { - if (pattern.matcher(entry.getKey()).matches()) { - for (PartitionInfo partitionInfo: entry.getValue()) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - } - } - Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); - Set<TopicPartition> newAssignment = new HashSet<>(parter.partition(allPartitions, context)); - if (!newAssignment.equals(currentAssignment)) { - if (currentAssignment != null) { - listener.onPartitionsRevoked(currentAssignment); - listener.onPartitionsAssigned(newAssignment); - } - currentAssignment = newAssignment; - consumer.assign(currentAssignment); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java new file mode 100644 index 0000000..2c65d6d --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +public class ManualPartitionSubscription extends Subscription { + private static final long serialVersionUID = 5633018073527583826L; + private final ManualPartitioner partitioner; + private final TopicFilter partitionFilter; + private Set<TopicPartition> currentAssignment = null; + private KafkaConsumer<?, ?> consumer = null; + private ConsumerRebalanceListener listener = null; + private TopologyContext context = null; + + public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) { + this.partitionFilter = partitionFilter; + this.partitioner = parter; + } + + @Override + public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) { + this.consumer = consumer; + this.listener = listener; + this.context = context; + refreshAssignment(); + } + + @Override + public void refreshAssignment() { + List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer); + Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); + Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); + if (!newAssignment.equals(currentAssignment)) { + consumer.assign(newAssignment); + if (currentAssignment != null) { + listener.onPartitionsRevoked(currentAssignment); + } + currentAssignment = newAssignment; + listener.onPartitionsAssigned(newAssignment); + } + } + + @Override + public String getTopicsString() { + return partitionFilter.getTopicsString(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java new file mode 100644 index 0000000..982828d --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for the specified topics. + */ +public class NamedTopicFilter implements TopicFilter { + + private final Set<String> topics; + + /** + * Create filter based on a set of topic names. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(Set<String> topics) { + this.topics = Collections.unmodifiableSet(topics); + } + + /** + * Convenience constructor. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(String... topics) { + this(new HashSet<>(Arrays.asList(topics))); + } + + @Override + public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) { + List<TopicPartition> allPartitions = new ArrayList<>(); + for (String topic : topics) { + for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return String.join(",", topics); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java new file mode 100644 index 0000000..2964874 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for topics matching the given {@link Pattern}. + */ +public class PatternTopicFilter implements TopicFilter { + + private final Pattern pattern; + private final Set<String> topics = new HashSet<>(); + + /** + * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter. + * + * @param pattern The Pattern to use. + */ + public PatternTopicFilter(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) { + topics.clear(); + List<TopicPartition> allPartitions = new ArrayList<>(); + for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) { + if (pattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo partitionInfo : entry.getValue()) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + topics.add(partitionInfo.topic()); + } + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return String.join(",", topics); + } + + public String getTopicsPattern() { + return pattern.pattern(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java index 53e825a..9c5a8c4 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java @@ -37,7 +37,7 @@ public abstract class Subscription implements Serializable { public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context); /** - * @return a string representing the subscribed topics. + * @return A human-readable string representing the subscribed topics. */ public abstract String getTopicsString(); http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java new file mode 100644 index 0000000..7631c8a --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +public interface TopicFilter extends Serializable { + + /** + * Get the Kafka TopicPartitions passed by this filter. + * @param consumer The Kafka consumer to use to read the list of existing partitions + * @return The Kafka partitions passed by this filter. + */ + List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer); + + /** + * @return A human-readable string representing the topics that pass the filter. + */ + String getTopicsString(); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java new file mode 100644 index 0000000..e97c7e1 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; + +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +public class NamedTopicFilterTest { + + private KafkaConsumer<?, ?> consumerMock; + + @Before + public void setUp() { + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo); + + when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List<PartitionInfo> partitionTwoPartitions = new ArrayList<>(); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions); + when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java new file mode 100644 index 0000000..877efdc --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +public class PatternTopicFilterTest { + + private KafkaConsumer<?, ?> consumerMock; + + @Before + public void setUp(){ + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + Pattern pattern = Pattern.compile("test-\\d+"); + PatternTopicFilter filter = new PatternTopicFilter(pattern); + + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + Map<String, List<PartitionInfo>> allTopics = new HashMap<>(); + allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List<PartitionInfo> testTwoPartitions = new ArrayList<>(); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + allTopics.put(matchingTopicTwo, testTwoPartitions); + allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + when(consumerMock.listTopics()).thenReturn(allTopics); + + List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } +}