STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fdb649e3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fdb649e3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fdb649e3 Branch: refs/heads/master Commit: fdb649e352e05fd849cafa312bbd62fc75694579 Parents: cd6ca3e Author: Stig Rohde Døssing <[email protected]> Authored: Mon Jun 5 14:59:19 2017 +0200 Committer: Stig Rohde Døssing <[email protected]> Committed: Wed Jul 19 00:18:03 2017 +0200 ---------------------------------------------------------------------- docs/storm-kafka-client.md | 7 +- .../storm/kafka/spout/KafkaSpoutConfig.java | 175 ++++++++++--------- .../spout/ManualPartitionSubscription.java | 71 -------- .../storm/kafka/spout/ManualPartitioner.java | 40 ----- .../storm/kafka/spout/NamedSubscription.java | 61 ------- .../storm/kafka/spout/NamedTopicFilter.java | 67 ------- .../storm/kafka/spout/PatternSubscription.java | 54 ------ .../storm/kafka/spout/PatternTopicFilter.java | 69 -------- .../spout/RoundRobinManualPartitioner.java | 50 ------ .../apache/storm/kafka/spout/Subscription.java | 53 ------ .../apache/storm/kafka/spout/TopicFilter.java | 38 ---- .../ManualPartitionSubscription.java | 72 ++++++++ .../spout/subscription/ManualPartitioner.java | 40 +++++ .../spout/subscription/NamedTopicFilter.java | 67 +++++++ .../spout/subscription/PatternTopicFilter.java | 69 ++++++++ .../RoundRobinManualPartitioner.java | 50 ++++++ .../kafka/spout/subscription/Subscription.java | 53 ++++++ .../kafka/spout/subscription/TopicFilter.java | 38 ++++ .../storm/kafka/spout/KafkaSpoutCommitTest.java | 36 ++-- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 48 ++--- .../kafka/spout/KafkaSpoutRebalanceTest.java | 37 ++-- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 74 ++++---- .../kafka/spout/MaxUncommittedOffsetTest.java | 7 +- .../storm/kafka/spout/NamedTopicFilterTest.java | 69 -------- 
.../kafka/spout/PatternTopicFilterTest.java | 73 -------- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 9 +- .../SpoutWithMockedConsumerSetupHelper.java | 74 ++++++++ .../SingleTopicKafkaSpoutConfiguration.java | 44 +++-- .../subscription/NamedTopicFilterTest.java | 68 +++++++ .../subscription/PatternTopicFilterTest.java | 73 ++++++++ 30 files changed, 827 insertions(+), 859 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/docs/storm-kafka-client.md ---------------------------------------------------------------------- diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md index ada8619..99b9ae5 100644 --- a/docs/storm-kafka-client.md +++ b/docs/storm-kafka-client.md @@ -240,12 +240,9 @@ streams. If you are doing this for Trident a value must be in the List returned otherwise trident can throw exceptions. -### Manual Partition Control (ADVANCED) +### Manual Partition Assignment (ADVANCED) -By default Kafka will automatically assign partitions to the current set of spouts. It handles lots of things, but in some cases you may want to manually assign the partitions. -This can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right. This can all be handled by subclassing -Subscription and we have a few implementations that you can look at for examples on how to do this. ManualPartitionNamedSubscription and ManualPartitionPatternSubscription. Again -please be careful when using these or implementing your own. +By default the KafkaSpout instances will be assigned partitions using a round robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. 
The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Please take care when supplying a custom implementation, since an incorrect `ManualPartitioner` implementation could leave some partitions unread, or concurrently read by multiple spout instances. See the `RoundRobinManualPartitioner` for an example of how to implement this functionality. ## Use the Maven Shade Plugin to Build the Uber Jar http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 6f09f5f..72fa52e 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -24,39 +24,41 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription; +import org.apache.storm.kafka.spout.subscription.NamedTopicFilter; +import org.apache.storm.kafka.spout.subscription.PatternTopicFilter; +import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner; +import 
org.apache.storm.kafka.spout.subscription.Subscription; import org.apache.storm.tuple.Fields; /** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics. */ public class KafkaSpoutConfig<K, V> implements Serializable { + private static final long serialVersionUID = 141902646130682494L; // 200ms - public static final long DEFAULT_POLL_TIMEOUT_MS = 200; + public static final long DEFAULT_POLL_TIMEOUT_MS = 200; // 30s - public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; + public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // Retry forever - public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; + public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // 10,000,000 records => 80MBs of memory footprint in the worst case - public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; + public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 2s - public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; + public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; - public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), - DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); - /** - * Retry in a tight loop (keep unit tests fasts) do not use in production. 
- */ - public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0), - DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0)); - + public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), + DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); + // Kafka consumer configuration private final Map<String, Object> kafkaProps; private final Subscription subscription; @@ -73,9 +75,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable { /** * Creates a new KafkaSpoutConfig using a Builder. + * * @param builder The Builder to construct the KafkaSpoutConfig from */ - public KafkaSpoutConfig(Builder<K,V> builder) { + public KafkaSpoutConfig(Builder<K, V> builder) { this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); this.subscription = builder.subscription; this.translator = builder.translator; @@ -108,12 +111,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, - UNCOMMITTED_LATEST + UNCOMMITTED_LATEST } - - public static class Builder<K,V> { + + public static class Builder<K, V> { + private final Map<String, Object> kafkaProps; - private Subscription subscription; + private final Subscription subscription; private RecordTranslator<K, V> translator; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; @@ -123,20 +127,22 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; private boolean emitNullTuples = false; - public Builder(String bootstrapServers, String ... topics) { - this(bootstrapServers, new NamedSubscription(topics)); + public Builder(String bootstrapServers, String... 
topics) { + this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); } - - public Builder(String bootstrapServers, Collection<String> topics) { - this(bootstrapServers, new NamedSubscription(topics)); + + public Builder(String bootstrapServers, Set<String> topics) { + this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), + new NamedTopicFilter(topics))); } - + public Builder(String bootstrapServers, Pattern topics) { - this(bootstrapServers, new PatternSubscription(topics)); + this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); } - + /** * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers. + * * @param bootstrapServers The bootstrap servers the consumer will use * @param subscription The subscription defining which topics and partitions each spout instance will read. */ @@ -149,30 +155,30 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.subscription = subscription; this.translator = new DefaultRecordTranslator<>(); } - + /** - * Set a {@link KafkaConsumer} property. + * Set a {@link KafkaConsumer} property. */ - public Builder<K,V> setProp(String key, Object value) { + public Builder<K, V> setProp(String key, Object value) { kafkaProps.put(key, value); return this; } - + /** * Set multiple {@link KafkaConsumer} properties. */ - public Builder<K,V> setProp(Map<String, Object> props) { + public Builder<K, V> setProp(Map<String, Object> props) { kafkaProps.putAll(props); return this; } - + /** * Set multiple {@link KafkaConsumer} properties. 
*/ - public Builder<K,V> setProp(Properties props) { + public Builder<K, V> setProp(Properties props) { props.forEach((key, value) -> { if (key instanceof String) { - kafkaProps.put((String)key, value); + kafkaProps.put((String) key, value); } else { throw new IllegalArgumentException("Kafka Consumer property keys must be Strings"); } @@ -183,46 +189,51 @@ public class KafkaSpoutConfig<K, V> implements Serializable { //Spout Settings /** * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s. + * * @param pollTimeoutMs time in ms */ - public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) { + public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) { this.pollTimeoutMs = pollTimeoutMs; return this; } /** * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s. + * * @param offsetCommitPeriodMs time in ms */ - public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) { + public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) { this.offsetCommitPeriodMs = offsetCommitPeriodMs; return this; } /** - * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. - * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number - * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. - * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1. + * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. Once this + * limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number of pending offsets + * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. 
Note that this limit can in some cases be exceeded, + * but no partition will exceed this limit by more than maxPollRecords - 1. + * * @param maxUncommittedOffsets max number of records that can be be pending commit */ - public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) { + public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) { this.maxUncommittedOffsets = maxUncommittedOffsets; return this; } /** - * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. - * Please refer to to the documentation in {@link FirstPollOffsetStrategy} + * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to the + * documentation in {@link FirstPollOffsetStrategy} + * * @param firstPollOffsetStrategy Offset used by Kafka spout first poll - * */ + */ public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) { this.firstPollOffsetStrategy = firstPollOffsetStrategy; return this; } - + /** * Sets the retry service for the spout to use. + * * @param retryService the new retry service * @return the builder (this). */ @@ -238,9 +249,10 @@ this.translator = translator; return this; } - + /** * Configure a translator with tuples to be emitted on the default stream. + * * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted * @param fields the names of the fields extracted * @return this to be able to chain configuration @@ -248,9 +260,10 @@ public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) { return setRecordTranslator(new SimpleRecordTranslator<>(func, fields)); } - + /** * Configure a translator with tuples to be emitted to a given stream. 
+ * * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted * @param fields the names of the fields extracted * @param stream the stream to emit the tuples on @@ -259,12 +272,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable { public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) { return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream)); } - + /** - * Sets partition refresh period in milliseconds. This is how often kafka will be polled - * to check for new topics and/or new partitions. - * This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and + * Sets partition refresh period in milliseconds. This is how often kafka will be polled to check for new topics and/or new + * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and * PatternSubscription rely on kafka to handle this instead. + * * @param partitionRefreshPeriodMs time in milliseconds * @return the builder (this) */ @@ -274,8 +287,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable { } /** - * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly - * ack them. By default this parameter is set to false, which means that null tuples are not emitted. + * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly ack them. By default + * this parameter is set to false, which means that null tuples are not emitted. 
+ * * @param emitNullTuples sets if null tuples should or not be emitted downstream */ public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) { @@ -283,34 +297,36 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return this; } - public KafkaSpoutConfig<K,V> build() { + public KafkaSpoutConfig<K, V> build() { return new KafkaSpoutConfig<>(this); } } - - + /** * Factory method that creates a Builder with String key/value deserializers. + * * @param bootstrapServers The bootstrap servers for the consumer * @param topics The topics to subscribe to * @return The new builder */ - public static Builder<String, String> builder(String bootstrapServers, String ... topics) { + public static Builder<String, String> builder(String bootstrapServers, String... topics) { return setStringDeserializers(new Builder<>(bootstrapServers, topics)); } - + /** * Factory method that creates a Builder with String key/value deserializers. + * * @param bootstrapServers The bootstrap servers for the consumer * @param topics The topics to subscribe to * @return The new builder */ - public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) { + public static Builder<String, String> builder(String bootstrapServers, Set<String> topics) { return setStringDeserializers(new Builder<>(bootstrapServers, topics)); } - + /** * Factory method that creates a Builder with String key/value deserializers. 
+ * * @param bootstrapServers The bootstrap servers for the consumer * @param topics The topic pattern to subscribe to * @return The new builder @@ -318,13 +334,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable { public static Builder<String, String> builder(String bootstrapServers, Pattern topics) { return setStringDeserializers(new Builder<>(bootstrapServers, topics)); } - + private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) { builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return builder; } - + private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) { // set defaults for properties not specified if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { @@ -335,17 +351,18 @@ public class KafkaSpoutConfig<K, V> implements Serializable { /** * Gets the properties that will be passed to the KafkaConsumer. 
+ * * @return The Kafka properties map */ public Map<String, Object> getKafkaProps() { return kafkaProps; } - + public Subscription getSubscription() { return subscription; } - - public RecordTranslator<K,V> getTranslator() { + + public RecordTranslator<K, V> getTranslator() { return translator; } @@ -358,8 +375,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { } public boolean isConsumerAutoCommitMode() { - return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false - || Boolean.valueOf((String)kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false + || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); } public String getConsumerGroupId() { @@ -377,7 +394,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { public KafkaSpoutRetryService getRetryService() { return retryService; } - + public long getPartitionRefreshPeriodMs() { return partitionRefreshPeriodMs; } @@ -389,14 +406,14 @@ public class KafkaSpoutConfig<K, V> implements Serializable { @Override public String toString() { return "KafkaSpoutConfig{" - + "kafkaProps=" + kafkaProps - + ", pollTimeoutMs=" + pollTimeoutMs - + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs - + ", maxUncommittedOffsets=" + maxUncommittedOffsets - + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy - + ", subscription=" + subscription - + ", translator=" + translator - + ", retryService=" + retryService - + '}'; + + "kafkaProps=" + kafkaProps + + ", pollTimeoutMs=" + pollTimeoutMs + + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + + ", maxUncommittedOffsets=" + maxUncommittedOffsets + + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy + + ", subscription=" + subscription + + ", translator=" + translator + + ", retryService=" + retryService + + '}'; } } 
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java deleted file mode 100644 index 2c65d6d..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout; - -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.task.TopologyContext; - -public class ManualPartitionSubscription extends Subscription { - private static final long serialVersionUID = 5633018073527583826L; - private final ManualPartitioner partitioner; - private final TopicFilter partitionFilter; - private Set<TopicPartition> currentAssignment = null; - private KafkaConsumer<?, ?> consumer = null; - private ConsumerRebalanceListener listener = null; - private TopologyContext context = null; - - public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) { - this.partitionFilter = partitionFilter; - this.partitioner = parter; - } - - @Override - public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) { - this.consumer = consumer; - this.listener = listener; - this.context = context; - refreshAssignment(); - } - - @Override - public void refreshAssignment() { - List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer); - Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); - Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); - if (!newAssignment.equals(currentAssignment)) { - consumer.assign(newAssignment); - if (currentAssignment != null) { - listener.onPartitionsRevoked(currentAssignment); - } - currentAssignment = newAssignment; - listener.onPartitionsAssigned(newAssignment); - } - } - - @Override - public String getTopicsString() { - return partitionFilter.getTopicsString(); - } -} 
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java deleted file mode 100644 index 4856687..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.List; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.task.TopologyContext; - -/** - * A function used to assign partitions to this spout. - * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions. - * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total - * number of spouts to avoid missing partitions or double assigning partitions. 
- */ -@FunctionalInterface -public interface ManualPartitioner { - /** - * Get the partitions for this assignment - * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering - * @param context the context of the topology - * @return the subset of the partitions that this spout should use. - */ - public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context); -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java deleted file mode 100644 index 0eb48cb..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.storm.task.TopologyContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Subscribe to all topics that follow a given list of values. - */ -public class NamedSubscription extends Subscription { - private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class); - private static final long serialVersionUID = 3438543305215813839L; - protected final Collection<String> topics; - - public NamedSubscription(Collection<String> topics) { - this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics)); - } - - public NamedSubscription(String ... topics) { - this(Arrays.asList(topics)); - } - - @Override - public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) { - consumer.subscribe(topics, listener); - LOG.info("Kafka consumer subscribed topics {}", topics); - - // Initial poll to get the consumer registration process going. 
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration - consumer.poll(0); - } - - @Override - public String getTopicsString() { - return String.join(",", topics); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java deleted file mode 100644 index 982828d..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; - -/** - * Filter that returns all partitions for the specified topics. 
- */ -public class NamedTopicFilter implements TopicFilter { - - private final Set<String> topics; - - /** - * Create filter based on a set of topic names. - * @param topics The topic names the filter will pass. - */ - public NamedTopicFilter(Set<String> topics) { - this.topics = Collections.unmodifiableSet(topics); - } - - /** - * Convenience constructor. - * @param topics The topic names the filter will pass. - */ - public NamedTopicFilter(String... topics) { - this(new HashSet<>(Arrays.asList(topics))); - } - - @Override - public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) { - List<TopicPartition> allPartitions = new ArrayList<>(); - for (String topic : topics) { - for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - } - } - return allPartitions; - } - - @Override - public String getTopicsString() { - return String.join(",", topics); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java deleted file mode 100644 index ec53f01..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.storm.task.TopologyContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Subscribe to all topics that match a given pattern. - */ -public class PatternSubscription extends Subscription { - private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class); - private static final long serialVersionUID = 3438543305215813839L; - protected final Pattern pattern; - - public PatternSubscription(Pattern pattern) { - this.pattern = pattern; - } - - @Override - public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) { - consumer.subscribe(pattern, listener); - LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); - - // Initial poll to get the consumer registration process going. 
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration - consumer.poll(0); - } - - @Override - public String getTopicsString() { - return pattern.pattern(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java deleted file mode 100644 index 2964874..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; - -/** - * Filter that returns all partitions for topics matching the given {@link Pattern}. 
- */ -public class PatternTopicFilter implements TopicFilter { - - private final Pattern pattern; - private final Set<String> topics = new HashSet<>(); - - /** - * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter. - * - * @param pattern The Pattern to use. - */ - public PatternTopicFilter(Pattern pattern) { - this.pattern = pattern; - } - - @Override - public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) { - topics.clear(); - List<TopicPartition> allPartitions = new ArrayList<>(); - for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) { - if (pattern.matcher(entry.getKey()).matches()) { - for (PartitionInfo partitionInfo : entry.getValue()) { - allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); - topics.add(partitionInfo.topic()); - } - } - } - return allPartitions; - } - - @Override - public String getTopicsString() { - return String.join(",", topics); - } - - public String getTopicsPattern() { - return pattern.pattern(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java deleted file mode 100644 index 4afcc49..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.task.TopologyContext; - -/** - * Assign partitions in a round robin fashion for all spouts, - * not just the ones that are alive. Because the parallelism of - * the spouts does not typically change while running this makes - * the assignments more stable in the face of crashing spouts. - * <p/> - * Round Robin means that first spout of N spouts will get the first - * partition, and the N+1th partition... The second spout will get the second partition and - * N+2th partition etc. 
- */ -public class RoundRobinManualPartitioner implements ManualPartitioner { - - @Override - public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) { - int thisTaskIndex = context.getThisTaskIndex(); - int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size(); - Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1); - for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) { - myPartitions.add(allPartitions.get(i)); - } - return new ArrayList<>(myPartitions); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java deleted file mode 100644 index 9c5a8c4..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout; - -import java.io.Serializable; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.storm.task.TopologyContext; - -/** - * A subscription to kafka. - */ -public abstract class Subscription implements Serializable { - private static final long serialVersionUID = -216136367240198716L; - - /** - * Subscribe the KafkaConsumer to the proper topics - * @param consumer the Consumer to get. - * @param listener the rebalance listener to include in the subscription - */ - public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context); - - /** - * @return A human-readable string representing the subscribed topics. - */ - public abstract String getTopicsString(); - - /** - * NOOP is the default behavior, which means that Kafka will internally handle partition assignment. - * If you wish to do manual partition management, you must provide an implementation of this method - * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe - * to inform the rest of the system of those changes. - */ - public void refreshAssignment() { - //NOOP - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java deleted file mode 100644 index 7631c8a..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.io.Serializable; -import java.util.List; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; - -public interface TopicFilter extends Serializable { - - /** - * Get the Kafka TopicPartitions passed by this filter. - * @param consumer The Kafka consumer to use to read the list of existing partitions - * @return The Kafka partitions passed by this filter. - */ - List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer); - - /** - * @return A human-readable string representing the topics that pass the filter. 
- */ - String getTopicsString(); - -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java new file mode 100644 index 0000000..17512ea --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.subscription; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.TopicPartitionComparator; +import org.apache.storm.task.TopologyContext; + +public class ManualPartitionSubscription extends Subscription { + private static final long serialVersionUID = 5633018073527583826L; + private final ManualPartitioner partitioner; + private final TopicFilter partitionFilter; + private Set<TopicPartition> currentAssignment = null; + private KafkaConsumer<?, ?> consumer = null; + private ConsumerRebalanceListener listener = null; + private TopologyContext context = null; + + public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) { + this.partitionFilter = partitionFilter; + this.partitioner = parter; + } + + @Override + public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) { + this.consumer = consumer; + this.listener = listener; + this.context = context; + refreshAssignment(); + } + + @Override + public void refreshAssignment() { + List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer); + Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); + Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); + if (!newAssignment.equals(currentAssignment)) { + consumer.assign(newAssignment); + if (currentAssignment != null) { + listener.onPartitionsRevoked(currentAssignment); + } + currentAssignment = newAssignment; + listener.onPartitionsAssigned(newAssignment); + } + } + + @Override + public String getTopicsString() { + return partitionFilter.getTopicsString(); + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java new file mode 100644 index 0000000..dce7fc6 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import java.util.List; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +/** + * A function used to assign partitions to this spout. + * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions. 
+ * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total + * number of spouts to avoid missing partitions or double assigning partitions. + */ +@FunctionalInterface +public interface ManualPartitioner { + /** + * Get the partitions for this assignment + * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering + * @param context the context of the topology + * @return the subset of the partitions that this spout should use. + */ + public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context); +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java new file mode 100644 index 0000000..d6e5fc2 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.subscription; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for the specified topics. + */ +public class NamedTopicFilter implements TopicFilter { + + private final Set<String> topics; + + /** + * Create filter based on a set of topic names. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(Set<String> topics) { + this.topics = Collections.unmodifiableSet(topics); + } + + /** + * Convenience constructor. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(String... topics) { + this(new HashSet<>(Arrays.asList(topics))); + } + + @Override + public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) { + List<TopicPartition> allPartitions = new ArrayList<>(); + for (String topic : topics) { + for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return String.join(",", topics); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java new file mode 100644 index 0000000..98f8b23 --- /dev/null +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for topics matching the given {@link Pattern}. + */ +public class PatternTopicFilter implements TopicFilter { + + private final Pattern pattern; + private final Set<String> topics = new HashSet<>(); + + /** + * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter. + * + * @param pattern The Pattern to use. 
+ */ + public PatternTopicFilter(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) { + topics.clear(); + List<TopicPartition> allPartitions = new ArrayList<>(); + for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) { + if (pattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo partitionInfo : entry.getValue()) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + topics.add(partitionInfo.topic()); + } + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return String.join(",", topics); + } + + public String getTopicsPattern() { + return pattern.pattern(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java new file mode 100644 index 0000000..9660c77 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +/** + * Assign partitions in a round robin fashion for all spouts, + * not just the ones that are alive. Because the parallelism of + * the spouts does not typically change while running this makes + * the assignments more stable in the face of crashing spouts. + * <p/> + * Round Robin means that first spout of N spouts will get the first + * partition, and the N+1th partition... The second spout will get the second partition and + * N+2th partition etc. 
+ */ +public class RoundRobinManualPartitioner implements ManualPartitioner { + + @Override + public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) { + int thisTaskIndex = context.getThisTaskIndex(); + int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size(); + Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1); + for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) { + myPartitions.add(allPartitions.get(i)); + } + return new ArrayList<>(myPartitions); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java new file mode 100644 index 0000000..8091bfa --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import java.io.Serializable; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.task.TopologyContext; + +/** + * A subscription to kafka. + */ +public abstract class Subscription implements Serializable { + private static final long serialVersionUID = -216136367240198716L; + + /** + * Subscribe the KafkaConsumer to the proper topics + * @param consumer the Consumer to get. + * @param listener the rebalance listener to include in the subscription + */ + public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context); + + /** + * @return A human-readable string representing the subscribed topics. + */ + public abstract String getTopicsString(); + + /** + * NOOP is the default behavior, which means that Kafka will internally handle partition assignment. + * If you wish to do manual partition management, you must provide an implementation of this method + * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe + * to inform the rest of the system of those changes. 
+ */ + public void refreshAssignment() { + //NOOP + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java new file mode 100644 index 0000000..497e3ca --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import java.io.Serializable; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +public interface TopicFilter extends Serializable { + + /** + * Get the Kafka TopicPartitions passed by this filter. + * @param consumer The Kafka consumer to use to read the list of existing partitions + * @return The Kafka partitions passed by this filter. + */ + List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer); + + /** + * @return A human-readable string representing the topics that pass the filter. 
+ */ + String getTopicsString(); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index 8dc34d4..7258fe2 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -26,16 +26,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -50,53 +49,38 @@ public class KafkaSpoutCommitTest { private final Map<String, Object> conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer<String, String> consumerMock; - private KafkaSpout<String, String> spout; - private KafkaSpoutConfig spoutConfig; + private KafkaSpoutConfig<String, String> spoutConfig; @Captor private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; - private void setupSpout(Set<TopicPartition> assignedPartitions) { + @Before + public void setUp() { 
MockitoAnnotations.initMocks(this); spoutConfig = getKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .build(); - + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock; - - //Set up a spout listening to 1 topic partition - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } @Test public void testCommitSuccessWithOffsetVoids() { //Verify that the commit logic can handle offset voids try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); // Offsets emitted are 0,1,2,3,4,<void>,8,9 for (int i = 0; i < 5; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } for (int i = 8; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), 
partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 24a2eda..8e6d390 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -16,7 +16,6 @@ package org.apache.storm.kafka.spout; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; -import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.inOrder; @@ -32,18 +31,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import 
org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; @@ -56,45 +53,30 @@ public class KafkaSpoutEmitTest { private final Map<String, Object> conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer<String, String> consumerMock; - private KafkaSpout<String, String> spout; - private KafkaSpoutConfig spoutConfig; + private KafkaSpoutConfig<String, String> spoutConfig; - private void setupSpout(Set<TopicPartition> assignedPartitions) { + @Before + public void setUp() { spoutConfig = getKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); - consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock; - - //Set up a spout listening to 1 topic partition - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } @Test public void testNextTupleEmitsAtMostOneTuple() { //The spout should emit at most one message per call to nextTuple //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending - setupSpout(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); 
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); for (int i = 0; i < 10; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); spout.nextTuple(); @@ -107,17 +89,17 @@ public class KafkaSpoutEmitTest { //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); + .thenReturn(new ConsumerRecords<>(records)); for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); @@ -172,13 +154,13 @@ public class KafkaSpoutEmitTest { //Emit maxUncommittedOffsets messages, and fail only the last. 
Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpout(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>(); List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>(); for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) { //This is cheating a bit since maxPollRecords would normally spread this across multiple polls - firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + firstPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } firstPollRecords.put(partition, firstPollRecordsForPartition); @@ -186,13 +168,13 @@ public class KafkaSpoutEmitTest { Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>(); List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>(); for(int i = 0; i < maxPollRecords; i++) { - secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value")); + secondPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value")); } secondPollRecords.put(partition, secondPollRecordsForPartition); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPollRecords)) - .thenReturn(new ConsumerRecords(secondPollRecords)); + .thenReturn(new ConsumerRecords<>(firstPollRecords)) + .thenReturn(new ConsumerRecords<>(secondPollRecords)); for (int i = 0; i < 
spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) { spout.nextTuple();
