Repository: storm
Updated Branches:
  refs/heads/1.x-branch d128cd9a8 -> 8efd09e48
STORM-2640: Deprecate KafkaConsumer.subscribe API option, make KafkaConsumer.assign the default

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2ebf2268
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2ebf2268
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2ebf2268

Branch: refs/heads/1.x-branch
Commit: 2ebf22685c0fc1afc3e6fe7707aa43aaa0fe62e5
Parents: 2fe66dc
Author: Stig Rohde Døssing <[email protected]>
Authored: Tue Jul 18 23:46:09 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Tue Jul 25 10:39:17 2017 +0200

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  7 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 29 +++----
 .../spout/ManualPartitionSubscription.java      |  1 +
 .../storm/kafka/spout/ManualPartitioner.java    |  1 +
 .../storm/kafka/spout/NamedSubscription.java    |  4 +-
 .../storm/kafka/spout/PatternSubscription.java  |  4 +-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java | 41 +++------
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   | 53 ++++--------
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 37 +++++----
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   | 79 +++++++-----------
 .../kafka/spout/MaxUncommittedOffsetTest.java   |  7 +-
 .../storm/kafka/spout/NamedTopicFilterTest.java |  3 +-
 .../kafka/spout/PatternTopicFilterTest.java     |  2 +
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 10 ++-
 .../SpoutWithMockedConsumerSetupHelper.java     | 87 ++++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     | 46 +++++++----
 16 files changed, 238 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 9166cb9..93d622e 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -247,12 +247,9 @@ streams. If you are doing this for Trident a value must be in the List returned
 otherwise trident can throw exceptions.
 
-### Manual Partition Control (ADVANCED)
+### Manual Partition Assignment (ADVANCED)
 
-By default Kafka will automatically assign partitions to the current set of spouts. It handles lots of things, but in some cases you may want to manually assign the partitions.
-This can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right. This can all be handled by subclassing
-Subscription and we have a few implementations that you can look at for examples on how to do this. ManualPartitionNamedSubscription and ManualPartitionPatternSubscription. Again
-please be careful when using these or implementing your own.
+By default the KafkaSpout instances will be assigned partitions using a round robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Please take care when supplying a custom implementation, since an incorrect `ManualPartitioner` implementation could leave some partitions unread, or concurrently read by multiple spout instances. See the `RoundRobinManualPartitioner` for an example of how to implement this functionality.
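For illustration, here is a minimal sketch of the two configuration paths described above. It uses only constructors that appear in this commit; the bootstrap servers, topic names, and pattern are placeholders.

```java
import java.util.regex.Pattern;

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.ManualPartitionSubscription;
import org.apache.storm.kafka.spout.PatternTopicFilter;
import org.apache.storm.kafka.spout.RoundRobinManualPartitioner;
import org.apache.storm.kafka.spout.Subscription;

public class ManualAssignmentExample {

    public static KafkaSpout<String, String> defaultSpout(String bootstrapServers) {
        // The plain builder now defaults to manual, round robin partition assignment.
        return new KafkaSpout<>(KafkaSpoutConfig.builder(bootstrapServers, "my-topic").build());
    }

    public static KafkaSpout<String, String> customSubscriptionSpout(String bootstrapServers) {
        // Build the Subscription explicitly. A custom ManualPartitioner implementation
        // could be passed here instead of RoundRobinManualPartitioner to control assignment.
        Subscription subscription = new ManualPartitionSubscription(
            new RoundRobinManualPartitioner(),
            new PatternTopicFilter(Pattern.compile("my-topic-.*")));
        return new KafkaSpout<>(
            new KafkaSpoutConfig.Builder<String, String>(bootstrapServers, subscription).build());
    }
}
```

Both paths go through `ManualPartitionSubscription`, so partitions are distributed with `KafkaConsumer.assign` rather than through Kafka's consumer-group rebalancing.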
## Use the Maven Shade Plugin to Build the Uber Jar http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 2b5a81a..833ce4a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -28,6 +28,7 @@ import org.apache.storm.tuple.Fields; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -55,12 +56,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable { public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); - /** - * Retry in a tight loop (keep unit tests fasts) do not use in production. - */ - public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0), - DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0)); // Kafka consumer configuration private final Map<String, Object> kafkaProps; @@ -128,7 +123,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { public static class Builder<K, V> { private final Map<String, Object> kafkaProps; - private Subscription subscription; + private final Subscription subscription; private final SerializableDeserializer<K> keyDes; private final Class<? extends Deserializer<K>> keyDesClazz; private final SerializableDeserializer<V> valueDes; @@ -143,15 +138,16 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private boolean emitNullTuples = false; public Builder(String bootstrapServers, String... topics) { - this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics)); + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); } public Builder(String bootstrapServers, Collection<String> topics) { - this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics)); + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), + new NamedTopicFilter(new HashSet<String>(topics)))); } public Builder(String bootstrapServers, Pattern topics) { - this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new PatternSubscription(topics)); + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); } /** @@ -161,7 +157,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String... 
topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); } /** @@ -171,7 +167,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics)))); } /** @@ -181,7 +177,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) { - this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); } /** @@ -199,7 +195,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String... topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); } /** @@ -207,7 +203,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics)))); } /** @@ -215,7 +211,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? 
extends Deserializer<V>> valDes, Pattern topics) { - this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); } /** @@ -479,7 +475,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable { * documentation in {@link FirstPollOffsetStrategy} * * @param firstPollOffsetStrategy Offset used by Kafka spout first poll - * */ public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) { this.firstPollOffsetStrategy = firstPollOffsetStrategy; http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java index 61b98a8..6bc4bea 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.TopicPartitionComparator; import org.apache.storm.task.TopologyContext; public class ManualPartitionSubscription extends Subscription { http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java index 0abd6c8..f9a6869 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.kafka.spout; import java.util.List; http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java index 3409184..3bb7152 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java @@ -30,8 +30,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Subscribe to all topics that follow a given list of values + * Subscribe to all topics that follow a given list of values. 
+ * @deprecated Please use {@link ManualPartitionSubscription} with {@link NamedTopicFilter} instead */ +@Deprecated public class NamedSubscription extends Subscription { private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class); private static final long serialVersionUID = 3438543305215813839L; http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java index 9a8de0f..dc9f9e3 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java @@ -26,8 +26,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Subscribe to all topics that match a given pattern + * Subscribe to all topics that match a given pattern. + * @deprecated Please use {@link ManualPartitionSubscription} with {@link PatternTopicFilter} instead */ +@Deprecated public class PatternSubscription extends Subscription { private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class); private static final long serialVersionUID = 3438543305215813839L; http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index b7737c7..17ba378 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -26,16 +26,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -50,58 +49,38 @@ public class KafkaSpoutCommitTest { private final Map<String, Object> conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer<String, String> consumerMock; - private KafkaSpout<String, String> spout; - private KafkaSpoutConfig spoutConfig; + private KafkaSpoutConfig<String, String> spoutConfig; @Captor private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; - private void setupSpout(Set<TopicPartition> assignedPartitions) { + @Before + public void setUp() { MockitoAnnotations.initMocks(this); spoutConfig = getKafkaSpoutConfigBuilder(-1) - 
.setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .build(); - + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() { - @Override - public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) { - return consumerMock; - } - }; - - //Set up a spout listening to 1 topic partition - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } @Test public void testCommitSuccessWithOffsetVoids() { //Verify that the commit logic can handle offset voids try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); // Offsets emitted are 0,1,2,3,4,<void>,8,9 for (int i = 0; i < 5; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } for (int i = 8; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 447f8c4..e8e93b0 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -17,7 +17,6 @@ package org.apache.storm.kafka.spout; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; @@ -31,14 +30,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.junit.Test; @@ -53,6 +49,7 @@ import static org.mockito.Mockito.never; import java.util.HashSet; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.mockito.InOrder; public class KafkaSpoutEmitTest { @@ -63,50 +60,30 @@ public class KafkaSpoutEmitTest { private final Map<String, Object> conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer<String, String> consumerMock; - private KafkaSpout<String, String> spout; - private KafkaSpoutConfig spoutConfig; + private KafkaSpoutConfig<String, String> spoutConfig; - private void setupSpout(Set<TopicPartition> assignedPartitions) { + @Before + public void setUp() { spoutConfig = getKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); - consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() { - @Override - public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) { - return consumerMock; - } - }; - - //Set up a spout listening to 1 topic partition - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } @Test public void testNextTupleEmitsAtMostOneTuple() { //The spout should emit at most one message per call to nextTuple //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending - setupSpout(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); for (int i = 0; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); spout.nextTuple(); @@ -119,17 +96,17 @@ public class KafkaSpoutEmitTest { //Emit maxUncommittedOffsets messages, and fail all of them. 
Then ensure that the spout will retry them when the retry backoff has passed try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); @@ -184,13 +161,13 @@ public class KafkaSpoutEmitTest { //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>(); List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>(); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls - firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + firstPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } firstPollRecords.put(partition, firstPollRecordsForPartition); @@ -198,13 +175,13 @@ public class KafkaSpoutEmitTest { Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>(); List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>(); for(int i = 0; i < maxPollRecords; i++) { - secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value")); + secondPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value")); } secondPollRecords.put(partition, secondPollRecordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPollRecords)) - .thenReturn(new ConsumerRecords(secondPollRecords)); + .thenReturn(new ConsumerRecords<>(firstPollRecords)) + .thenReturn(new ConsumerRecords<>(secondPollRecords)); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) { spout.nextTuple(); http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java 
---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index bd6e582..8996190 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -17,11 +17,11 @@ package org.apache.storm.kafka.spout; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -81,14 +81,11 @@ public class KafkaSpoutRebalanceTest { } //Returns messageIds in order of emission - private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) { - //Setup spout with mock consumer so we can get at the rebalance listener + private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) { + //Setup spout with mock consumer so we can get at the rebalance listener spout.open(conf, contextMock, collectorMock); spout.activate(); - ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - //Assign partitions to the spout ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); List<TopicPartition> assignedPartitions = new ArrayList<>(); @@ -102,9 +99,9 @@ public class KafkaSpoutRebalanceTest { Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>(); secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value"))); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPartitionRecords)) - .thenReturn(new ConsumerRecords(secondPartitionRecords)) - .thenReturn(new ConsumerRecords(Collections.emptyMap())); + .thenReturn(new ConsumerRecords<>(firstPartitionRecords)) + .thenReturn(new ConsumerRecords<>(secondPartitionRecords)) + .thenReturn(new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>())); //Emit the messages spout.nextTuple(); @@ -129,7 +126,12 @@ public class KafkaSpoutRebalanceTest { public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class)); + KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(), consumerFactoryMock); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; @@ -137,7 +139,8 @@ public class KafkaSpoutRebalanceTest { TopicPartition assignedPartition = new TopicPartition(topic, 2); //Emit a message on each partition and revoke the first partition - List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( + spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); //Ack both emitted tuples spout.ack(emittedMessageIds.get(0)); @@ -159,8 +162,13 @@ public class KafkaSpoutRebalanceTest { @Test public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass + ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class)); KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); - KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(10) .setRetry(retryServiceMock) .build(), consumerFactoryMock); @@ -173,7 +181,8 @@ public class KafkaSpoutRebalanceTest { .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0)); //Emit a message on each partition and revoke the first partition - List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( + spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); //Check that only two message ids were generated verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class)); http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 831e383..79f7398 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -30,85 +30,71 @@ 
import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; public class KafkaSpoutRetryLimitTest { - + private final long offsetCommitPeriodMs = 2_000; private final TopologyContext contextMock = mock(TopologyContext.class); private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); private final Map<String, Object> conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer<String, String> consumerMock; - private KafkaSpout<String, String> spout; - private KafkaSpoutConfig spoutConfig; - + private KafkaSpoutConfig<String, String> spoutConfig; + public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), - 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); - - private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) { + new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), + 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + + @Captor + private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); spoutConfig = getKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .setRetry(ZERO_RETRIES_RETRY_SERVICE) - .build(); - + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .setRetry(ZERO_RETRIES_RETRY_SERVICE) + .build(); consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() { - @Override - public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) { - return consumerMock; - } - }; - - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } - + @Test public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { //Spout should ack failed messages after they hit the retry limit try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpoutWithNoRetry(Collections.singleton(partition)); + KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); int lastOffset = 3; for (int i = 0; i <= lastOffset; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); - + when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); - + .thenReturn(new ConsumerRecords<>(records)); + for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); } - + ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); - + for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { spout.fail(messageId); } @@ -116,16 +102,15 @@ public class KafkaSpoutRetryLimitTest { // Advance time and then trigger call to kafka consumer commit Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); spout.nextTuple(); - - ArgumentCaptor<Map> committedOffsets=ArgumentCaptor.forClass(Map.class); + InOrder inOrder = inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(committedOffsets.capture()); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); //verify that Offset 3 was committed for the given TopicPartition - assertTrue(committedOffsets.getValue().containsKey(partition)); - assertEquals(lastOffset, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset()); + assertTrue(commitCapture.getValue().containsKey(partition)); + assertEquals(lastOffset, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset()); } } - -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index 00b973c..ccb2a6c 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; @@ -29,8 +30,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ 
-60,7 +63,7 @@ public class MaxUncommittedOffsetTest { private final int maxUncommittedOffsets = 10; private final int maxPollRecords = 5; private final int initialRetryDelaySecs = 60; - private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + private final KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .setMaxUncommittedOffsets(maxUncommittedOffsets) @@ -95,6 +98,8 @@ public class MaxUncommittedOffsetTest { private void initializeSpout(int msgCount) throws Exception { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + when(topologyContext.getThisTaskIndex()).thenReturn(0); + when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); spout.open(conf, topologyContext, collector); spout.activate(); } http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java index e97c7e1..fe3325c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java @@ -16,10 +16,11 @@ package org.apache.storm.kafka.spout; +import org.apache.storm.kafka.spout.NamedTopicFilter; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; - import static org.mockito.Mockito.when; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java index 877efdc..335ab31 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java @@ -16,6 +16,8 @@ package org.apache.storm.kafka.spout; +import org.apache.storm.kafka.spout.PatternTopicFilter; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 6042c80..7759b3c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -36,7 +36,7 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyObject; @@ -48,7 +48,9 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,12 +85,12 @@ public class SingleTopicKafkaSpoutTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) .build(); - this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); + this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig)); this.consumerFactory = new KafkaConsumerFactory<String, String>() { @Override public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) { @@ -112,6 +114,8 @@ public class SingleTopicKafkaSpoutTest { private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + when(topologyContext.getThisTaskIndex()).thenReturn(0); + when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); spout.open(conf, topologyContext, collector); spout.activate(); } http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java new file mode 100644 index 0000000..67b1f2c --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; + +public class SpoutWithMockedConsumerSetupHelper { + + /** + * Creates, opens and activates a KafkaSpout using a mocked consumer. + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param spoutConfig The spout config to use + * @param topoConf The topo conf to pass to the spout + * @param contextMock The topo context to pass to the spout + * @param collectorMock The mocked collector to pass to the spout + * @param consumerMock The mocked consumer + * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it. + * @return The spout + */ + public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf, + TopologyContext contextMock, SpoutOutputCollector collectorMock, final KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) { + + Map<String, List<PartitionInfo>> partitionInfos = new HashMap<>(); + for(TopicPartition tp : assignedPartitions) { + PartitionInfo info = new PartitionInfo(tp.topic(), tp.partition(), null, null, null); + List<PartitionInfo> infos = partitionInfos.get(tp.topic()); + if(infos == null) { + infos = new ArrayList<>(); + partitionInfos.put(tp.topic(), infos); + } + infos.add(info); + } + for(String topic : partitionInfos.keySet()) { + when(consumerMock.partitionsFor(topic)) + .thenReturn(partitionInfos.get(topic)); + } + KafkaConsumerFactory<K, V> consumerFactory = new KafkaConsumerFactory<K, V>() { + @Override + public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { + return consumerMock; + } + }; + + KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory); + + when(contextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); + when(contextMock.getThisTaskIndex()).thenReturn(0); + + spout.open(topoConf, contextMock, collectorMock); + spout.activate(); + + verify(consumerMock).assign(assignedPartitions); + + return spout; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java index a5c364c..1ab4966 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java @@ -17,6 +17,7 @@ */ package org.apache.storm.kafka.spout.builders; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; import java.util.List; @@ -27,7 +28,9 @@ import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.spout.Func; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; import org.apache.storm.kafka.spout.KafkaSpoutRetryService; +import org.apache.storm.kafka.spout.Subscription; import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; @@ -38,6 +41,13 @@ public class SingleTopicKafkaSpoutConfiguration { public static final String STREAM = "test_stream"; public static final String TOPIC = "test"; + /** + * Retry in a tight loop (keep unit tests fasts). + */ + public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), + DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + public static Config getConfig() { Config config = new Config(); config.setDebug(true); @@ -57,21 +67,29 @@ public class SingleTopicKafkaSpoutConfiguration { return new Values(r.topic(), r.key(), r.value()); } }; - - public static KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) { - return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC) - .setRecordTranslator(TOPIC_KEY_VALUE_FUNC, - new Fields("topic", "key", "value"), STREAM) - .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") - .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5) - .setRetry(getRetryService()) - .setOffsetCommitPeriodMs(10_000) - .setFirstPollOffsetStrategy(EARLIEST) - .setMaxUncommittedOffsets(250) - .setPollTimeoutMs(1000); + + public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(int port) { + return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)); + } + + public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(Subscription subscription, int port) { + return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<String, String>("127.0.0.1:" + port, subscription)); } - + + private static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) { + return config + .setRecordTranslator(TOPIC_KEY_VALUE_FUNC, + new Fields("topic", "key", "value"), STREAM) + .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5) + .setRetry(getRetryService()) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .setPollTimeoutMs(1000); + } + protected static KafkaSpoutRetryService getRetryService() { - return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE; + return UNIT_TEST_RETRY_SERVICE; } }
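As a usage note for the deprecations above, a rough migration sketch from the now-deprecated `NamedSubscription` and `PatternSubscription` to the replacements named in their `@deprecated` javadoc follows; topic names and the pattern are placeholders.

```java
import java.util.regex.Pattern;

import org.apache.storm.kafka.spout.ManualPartitionSubscription;
import org.apache.storm.kafka.spout.NamedSubscription;
import org.apache.storm.kafka.spout.NamedTopicFilter;
import org.apache.storm.kafka.spout.PatternSubscription;
import org.apache.storm.kafka.spout.PatternTopicFilter;
import org.apache.storm.kafka.spout.RoundRobinManualPartitioner;
import org.apache.storm.kafka.spout.Subscription;

public class SubscriptionMigrationExample {

    // Deprecated by this commit; these rely on KafkaConsumer.subscribe and group management.
    static Subscription deprecatedNamed() {
        return new NamedSubscription("topic-a", "topic-b");
    }

    static Subscription deprecatedPattern() {
        return new PatternSubscription(Pattern.compile("topic-.*"));
    }

    // Suggested replacements; these assign partitions manually via KafkaConsumer.assign.
    static Subscription namedReplacement() {
        return new ManualPartitionSubscription(
            new RoundRobinManualPartitioner(), new NamedTopicFilter("topic-a", "topic-b"));
    }

    static Subscription patternReplacement() {
        return new ManualPartitionSubscription(
            new RoundRobinManualPartitioner(), new PatternTopicFilter(Pattern.compile("topic-.*")));
    }
}
```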
