Repository: storm
Updated Branches:
  refs/heads/1.x-branch d128cd9a8 -> 8efd09e48


STORM-2640: Deprecate KafkaConsumer.subscribe API option, make KafkaConsumer.assign the default


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2ebf2268
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2ebf2268
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2ebf2268

Branch: refs/heads/1.x-branch
Commit: 2ebf22685c0fc1afc3e6fe7707aa43aaa0fe62e5
Parents: 2fe66dc
Author: Stig Rohde Døssing <[email protected]>
Authored: Tue Jul 18 23:46:09 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Tue Jul 25 10:39:17 2017 +0200

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  7 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 29 +++----
 .../spout/ManualPartitionSubscription.java      |  1 +
 .../storm/kafka/spout/ManualPartitioner.java    |  1 +
 .../storm/kafka/spout/NamedSubscription.java    |  4 +-
 .../storm/kafka/spout/PatternSubscription.java  |  4 +-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java | 41 +++------
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   | 53 ++++--------
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 37 +++++----
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   | 79 +++++++-----------
 .../kafka/spout/MaxUncommittedOffsetTest.java   |  7 +-
 .../storm/kafka/spout/NamedTopicFilterTest.java |  3 +-
 .../kafka/spout/PatternTopicFilterTest.java     |  2 +
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 10 ++-
 .../SpoutWithMockedConsumerSetupHelper.java     | 87 ++++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     | 46 +++++++----
 16 files changed, 238 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 9166cb9..93d622e 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -247,12 +247,9 @@ streams.  If you are doing this for Trident a value must 
be in the List returned
 otherwise trident can throw exceptions.
 
 
-### Manual Partition Control (ADVANCED)
+### Manual Partition Assignment (ADVANCED)
 
-By default Kafka will automatically assign partitions to the current set of 
spouts.  It handles lots of things, but in some cases you may want to manually 
assign the partitions.
-This can cause less churn in the assignments when spouts go down and come back 
up, but it can result in a lot of issues if not done right.  This can all be 
handled by subclassing
-Subscription and we have a few implementations that you can look at for 
examples on how to do this.  ManualPartitionNamedSubscription and 
ManualPartitionPatternSubscription.  Again
-please be careful when using these or implementing your own.
+By default the KafkaSpout instances will be assigned partitions using a 
round-robin strategy. If you need to customize partition assignment, you must 
implement the `ManualPartitioner` interface. The implementation can be passed 
to the `ManualPartitionSubscription` constructor, and the `Subscription` can 
then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` 
constructor. Please take care when supplying a custom implementation, since an 
incorrect `ManualPartitioner` implementation could leave some partitions 
unread, or concurrently read by multiple spout instances. See the 
`RoundRobinManualPartitioner` for an example of how to implement this 
functionality.
 
 ## Use the Maven Shade Plugin to Build the Uber Jar
 
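For illustration, the paragraph added above can be wired up roughly as follows. This is a minimal sketch assuming the usual org.apache.storm.kafka.spout and Kafka client imports; the broker address, topic name and group id are placeholders, and RoundRobinManualPartitioner stands in for whatever ManualPartitioner implementation you supply:

    // Build the assign-based subscription explicitly, substituting your own
    // ManualPartitioner implementation where needed.
    Subscription sub = new ManualPartitionSubscription(
        new RoundRobinManualPartitioner(), new NamedTopicFilter("test"));
    KafkaSpoutConfig<String, String> config =
        new KafkaSpoutConfig.Builder<String, String>("127.0.0.1:9092", sub)
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup")
            .build();
    KafkaSpout<String, String> spout = new KafkaSpout<>(config);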

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 2b5a81a..833ce4a 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -28,6 +28,7 @@ import org.apache.storm.tuple.Fields;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -55,12 +56,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
         new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
             DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
-    /**
-     * Retry in a tight loop (keep unit tests fasts) do not use in production.
-     */
-    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
-        new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(0),
-            DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
 
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
@@ -128,7 +123,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
     public static class Builder<K, V> {
 
         private final Map<String, Object> kafkaProps;
-        private Subscription subscription;
+        private final Subscription subscription;
         private final SerializableDeserializer<K> keyDes;
         private final Class<? extends Deserializer<K>> keyDesClazz;
         private final SerializableDeserializer<V> valueDes;
@@ -143,15 +138,16 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
         private boolean emitNullTuples = false;
 
         public Builder(String bootstrapServers, String... topics) {
-            this(bootstrapServers, (SerializableDeserializer) null, 
(SerializableDeserializer) null, new NamedSubscription(topics));
+            this(bootstrapServers, (SerializableDeserializer) null, 
(SerializableDeserializer) null, new ManualPartitionSubscription(new 
RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
         }
 
         public Builder(String bootstrapServers, Collection<String> topics) {
-            this(bootstrapServers, (SerializableDeserializer) null, 
(SerializableDeserializer) null, new NamedSubscription(topics));
+            this(bootstrapServers, (SerializableDeserializer) null, 
(SerializableDeserializer) null, new ManualPartitionSubscription(new 
RoundRobinManualPartitioner(),
+                new NamedTopicFilter(new HashSet<String>(topics))));
         }
 
         public Builder(String bootstrapServers, Pattern topics) {
-            this(bootstrapServers, (SerializableDeserializer) null, 
(SerializableDeserializer) null, new PatternSubscription(topics));
+            this(bootstrapServers, (SerializableDeserializer) null, 
(SerializableDeserializer) null, new ManualPartitionSubscription(new 
RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
         }
 
         /**
@@ -161,7 +157,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public Builder(String bootstrapServers, SerializableDeserializer<K> 
keyDes, SerializableDeserializer<V> valDes, String... topics) {
-            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new 
ManualPartitionSubscription(new RoundRobinManualPartitioner(), new 
NamedTopicFilter(topics)));
         }
 
         /**
@@ -171,7 +167,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public Builder(String bootstrapServers, SerializableDeserializer<K> 
keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
-            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new 
ManualPartitionSubscription(new RoundRobinManualPartitioner(), new 
NamedTopicFilter(new HashSet<String>(topics))));
         }
 
         /**
@@ -181,7 +177,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public Builder(String bootstrapServers, SerializableDeserializer<K> 
keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
-            this(bootstrapServers, keyDes, valDes, new 
PatternSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new 
ManualPartitionSubscription(new RoundRobinManualPartitioner(), new 
PatternTopicFilter(topics)));
         }
 
         /**
@@ -199,7 +195,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String... 
topics) {
-            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new 
ManualPartitionSubscription(new RoundRobinManualPartitioner(), new 
NamedTopicFilter(topics)));
         }
 
         /**
@@ -207,7 +203,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, 
Collection<String> topics) {
-            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new 
ManualPartitionSubscription(new RoundRobinManualPartitioner(), new 
NamedTopicFilter(new HashSet<String>(topics))));
         }
 
         /**
@@ -215,7 +211,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern 
topics) {
-            this(bootstrapServers, keyDes, valDes, new 
PatternSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new 
ManualPartitionSubscription(new RoundRobinManualPartitioner(), new 
PatternTopicFilter(topics)));
         }
 
         /**
@@ -479,7 +475,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          * documentation in {@link FirstPollOffsetStrategy}
          *
          * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
-         *
          */
         public Builder<K, V> 
setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
             this.firstPollOffsetStrategy = firstPollOffsetStrategy;

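To make the constructor changes above concrete, a hedged sketch of what the deprecated and the new default subscriptions amount to; the broker address, topic and pattern are placeholders and imports are elided:

    // Deprecated: subscribe-based subscription (KafkaConsumer.subscribe).
    Subscription deprecated = new NamedSubscription("test");

    // New default: assign-based subscription (KafkaConsumer.assign).
    Subscription assignBased = new ManualPartitionSubscription(
        new RoundRobinManualPartitioner(), new NamedTopicFilter("test"));

    // Both of these Builder constructors now produce the assign-based form.
    KafkaSpoutConfig.Builder<String, String> byTopic =
        new KafkaSpoutConfig.Builder<>("127.0.0.1:9092", "test");
    KafkaSpoutConfig.Builder<String, String> byPattern =
        new KafkaSpoutConfig.Builder<>("127.0.0.1:9092", Pattern.compile("test.*"));
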
http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
index 61b98a8..6bc4bea 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
 import org.apache.storm.task.TopologyContext;
 
 public class ManualPartitionSubscription extends Subscription {

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
index 0abd6c8..f9a6869 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
@@ -15,6 +15,7 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
index 3409184..3bb7152 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
@@ -30,8 +30,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Subscribe to all topics that follow a given list of values
+ * Subscribe to all topics that follow a given list of values.
+ * @deprecated Please use {@link ManualPartitionSubscription} with {@link 
NamedTopicFilter} instead
  */
+@Deprecated
 public class NamedSubscription extends Subscription {
     private static final Logger LOG = 
LoggerFactory.getLogger(NamedSubscription.class);
     private static final long serialVersionUID = 3438543305215813839L;

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
index 9a8de0f..dc9f9e3 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
@@ -26,8 +26,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Subscribe to all topics that match a given pattern
+ * Subscribe to all topics that match a given pattern.
+ * @deprecated Please use {@link ManualPartitionSubscription} with {@link 
PatternTopicFilter} instead
  */
+@Deprecated
 public class PatternSubscription extends Subscription {
     private static final Logger LOG = 
LoggerFactory.getLogger(PatternSubscription.class);
     private static final long serialVersionUID = 3438543305215813839L;

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index b7737c7..17ba378 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -26,16 +26,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
@@ -50,58 +49,38 @@ public class KafkaSpoutCommitTest {
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
+    private KafkaSpoutConfig<String, String> spoutConfig;
 
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> 
commitCapture;
 
-    private void setupSpout(Set<TopicPartition> assignedPartitions) {
+    @Before
+    public void setUp() {
         MockitoAnnotations.initMocks(this);
         spoutConfig = getKafkaSpoutConfigBuilder(-1)
-                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-                .build();
-
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = new 
KafkaConsumerFactory<String, String>() {
-            @Override
-            public KafkaConsumer<String, String> 
createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
-                return consumerMock;
-            }
-        };
-
-        //Set up a spout listening to 1 topic partition
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = 
ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), 
rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = 
rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
 
     @Test
     public void testCommitSuccessWithOffsetVoids() {
         //Verify that the commit logic can handle offset voids
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records 
= new HashMap<>();
             List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
             // Offsets emitted are 0,1,2,3,4,<void>,8,9
             for (int i = 0; i < 5; i++) {
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), 
partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             for (int i = 8; i < 10; i++) {
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), 
partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             records.put(partition, recordsForPartition);
 
             when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords(records));
+                    .thenReturn(new ConsumerRecords<>(records));
 
             for (int i = 0; i < recordsForPartition.size(); i++) {
                 spout.nextTuple();

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 447f8c4..e8e93b0 100755
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -17,7 +17,6 @@ package org.apache.storm.kafka.spout;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
@@ -31,14 +30,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.junit.Test;
@@ -53,6 +49,7 @@ import static org.mockito.Mockito.never;
 import java.util.HashSet;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.mockito.InOrder;
 
 public class KafkaSpoutEmitTest {
@@ -63,50 +60,30 @@ public class KafkaSpoutEmitTest {
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
+    private KafkaSpoutConfig<String, String> spoutConfig;
 
-    private void setupSpout(Set<TopicPartition> assignedPartitions) {
+    @Before
+    public void setUp() {
         spoutConfig = getKafkaSpoutConfigBuilder(-1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
-
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = new 
KafkaConsumerFactory<String, String>() {
-            @Override
-            public KafkaConsumer<String, String> 
createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
-                return consumerMock;
-            }
-        };
-
-        //Set up a spout listening to 1 topic partition
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = 
ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), 
rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = 
rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
 
     @Test
     public void testNextTupleEmitsAtMostOneTuple() {
         //The spout should emit at most one message per call to nextTuple
         //This is necessary for Storm to be able to throttle the spout 
according to maxSpoutPending
-        setupSpout(Collections.singleton(partition));
+        KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
         Map<TopicPartition, List<ConsumerRecord<String, String>>> records = 
new HashMap<>();
         List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            recordsForPartition.add(new ConsumerRecord(partition.topic(), 
partition.partition(), i, "key", "value"));
+            recordsForPartition.add(new ConsumerRecord<>(partition.topic(), 
partition.partition(), i, "key", "value"));
         }
         records.put(partition, recordsForPartition);
 
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords(records));
+            .thenReturn(new ConsumerRecords<>(records));
 
         spout.nextTuple();
 
@@ -119,17 +96,17 @@ public class KafkaSpoutEmitTest {
         
         //Emit maxUncommittedOffsets messages, and fail all of them. Then 
ensure that the spout will retry them when the retry backoff has passed
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records 
= new HashMap<>();
             List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
             for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
                 //This is cheating a bit since maxPollRecords would normally 
spread this across multiple polls
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), 
partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             records.put(partition, recordsForPartition);
 
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(records));
+                .thenReturn(new ConsumerRecords<>(records));
 
             for (int i = 0; i < recordsForPartition.size(); i++) {
                 spout.nextTuple();
@@ -184,13 +161,13 @@ public class KafkaSpoutEmitTest {
         
         //Emit maxUncommittedOffsets messages, and fail only the last. Then 
ensure that the spout will allow no more than maxUncommittedOffsets + 
maxPollRecords - 1 uncommitted offsets when retrying
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
             
             Map<TopicPartition, List<ConsumerRecord<String, String>>> 
firstPollRecords = new HashMap<>();
             List<ConsumerRecord<String, String>> firstPollRecordsForPartition 
= new ArrayList<>();
             for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
                 //This is cheating a bit since maxPollRecords would normally 
spread this across multiple polls
-                firstPollRecordsForPartition.add(new 
ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+                firstPollRecordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             firstPollRecords.put(partition, firstPollRecordsForPartition);
             
@@ -198,13 +175,13 @@ public class KafkaSpoutEmitTest {
             Map<TopicPartition, List<ConsumerRecord<String, String>>> 
secondPollRecords = new HashMap<>();
             List<ConsumerRecord<String, String>> secondPollRecordsForPartition 
= new ArrayList<>();
             for(int i = 0; i < maxPollRecords; i++) {
-                secondPollRecordsForPartition.add(new 
ConsumerRecord(partition.topic(), partition.partition(), 
spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
+                secondPollRecordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), 
spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
             }
             secondPollRecords.put(partition, secondPollRecordsForPartition);
 
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(firstPollRecords))
-                .thenReturn(new ConsumerRecords(secondPollRecords));
+                .thenReturn(new ConsumerRecords<>(firstPollRecords))
+                .thenReturn(new ConsumerRecords<>(secondPollRecords));
 
             for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + 
maxPollRecords; i++) {
                 spout.nextTuple();

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index bd6e582..8996190 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -17,11 +17,11 @@ package org.apache.storm.kafka.spout;
 
 import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.Matchers.any;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -81,14 +81,11 @@ public class KafkaSpoutRebalanceTest {
     }
 
     //Returns messageIds in order of emission
-    private List<KafkaSpoutMessageId> 
emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> 
spout, TopicPartition partitionThatWillBeRevoked, TopicPartition 
assignedPartition) {
-        //Setup spout with mock consumer so we can get at the rebalance 
listener
+    private List<KafkaSpoutMessageId> 
emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> 
spout, TopicPartition partitionThatWillBeRevoked, TopicPartition 
assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> 
rebalanceListenerCapture) {
+        //Setup spout with mock consumer so we can get at the rebalance 
listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
 
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = 
ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), 
rebalanceListenerCapture.capture());
-
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = 
rebalanceListenerCapture.getValue();
         List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -102,9 +99,9 @@ public class KafkaSpoutRebalanceTest {
         Map<TopicPartition, List<ConsumerRecord<String, String>>> 
secondPartitionRecords = new HashMap<>();
         secondPartitionRecords.put(assignedPartition, 
Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), 
assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords(firstPartitionRecords))
-            .thenReturn(new ConsumerRecords(secondPartitionRecords))
-            .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+            .thenReturn(new ConsumerRecords<>(firstPartitionRecords))
+            .thenReturn(new ConsumerRecords<>(secondPartitionRecords))
+            .thenReturn(new ConsumerRecords<>(new HashMap<TopicPartition, 
List<ConsumerRecord<String, String>>>()));
 
         //Emit the messages
         spout.nextTuple();
@@ -129,7 +126,12 @@ public class KafkaSpoutRebalanceTest {
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() 
throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless 
since the spout will not be allowed to commit them
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture 
= ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+            Subscription subscriptionMock = mock(Subscription.class);
+            doNothing()
+                .when(subscriptionMock)
+                .subscribe(any(KafkaConsumer.class), 
rebalanceListenerCapture.capture(), any(TopologyContext.class));
+            KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
                 .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                 .build(), consumerFactoryMock);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -137,7 +139,8 @@ public class KafkaSpoutRebalanceTest {
             TopicPartition assignedPartition = new TopicPartition(topic, 2);
 
             //Emit a message on each partition and revoke the first partition
-            List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(spout, 
partitionThatWillBeRevoked, assignedPartition);
+            List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(
+                spout, partitionThatWillBeRevoked, assignedPartition, 
rebalanceListenerCapture);
 
             //Ack both emitted tuples
             spout.ack(emittedMessageIds.get(0));
@@ -159,8 +162,13 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() 
throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless 
since the spout will not be allowed to commit them if they later pass
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = 
ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+            Subscription subscriptionMock = mock(Subscription.class);
+            doNothing()
+                .when(subscriptionMock)
+                .subscribe(any(KafkaConsumer.class), 
rebalanceListenerCapture.capture(), any(TopologyContext.class));
         KafkaSpoutRetryService retryServiceMock = 
mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+        KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
             .setOffsetCommitPeriodMs(10)
             .setRetry(retryServiceMock)
             .build(), consumerFactoryMock);
@@ -173,7 +181,8 @@ public class KafkaSpoutRebalanceTest {
             .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
         
         //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(spout, 
partitionThatWillBeRevoked, assignedPartition);
+        List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(
+            spout, partitionThatWillBeRevoked, assignedPartition, 
rebalanceListenerCapture);
 
         //Check that only two message ids were generated
         verify(retryServiceMock, 
times(2)).getMessageId(Mockito.any(ConsumerRecord.class));

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 831e383..79f7398 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -30,85 +30,71 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
 
 public class KafkaSpoutRetryLimitTest {
-
+    
     private final long offsetCommitPeriodMs = 2_000;
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = 
mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
-
+    private KafkaSpoutConfig<String, String> spoutConfig;
+    
     public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
-            new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
-                    0, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
-    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) 
{
+        new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+    
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> 
commitCapture;
+    
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
         spoutConfig = getKafkaSpoutConfigBuilder(-1)
-                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-                .setRetry(ZERO_RETRIES_RETRY_SERVICE)
-                .build();
-
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = new 
KafkaConsumerFactory<String, String>() {
-            @Override
-            public KafkaConsumer<String, String> 
createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
-                return consumerMock;
-            }
-        };
-
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = 
ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), 
rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = 
rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
-
+    
     @Test
     public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
         //Spout should ack failed messages after they hit the retry limit
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpoutWithNoRetry(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records 
= new HashMap<>();
             List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
             int lastOffset = 3;
             for (int i = 0; i <= lastOffset; i++) {
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), 
partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             records.put(partition, recordsForPartition);
-
+            
             when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords(records));
-
+                .thenReturn(new ConsumerRecords<>(records));
+            
             for (int i = 0; i < recordsForPartition.size(); i++) {
                 spout.nextTuple();
             }
-
+            
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock, 
times(recordsForPartition.size())).emit(anyString(), anyList(), 
messageIds.capture());
-
+            
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);
             }
@@ -116,16 +102,15 @@ public class KafkaSpoutRetryLimitTest {
             // Advance time and then trigger call to kafka consumer commit
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
             spout.nextTuple();
-
-            ArgumentCaptor<Map> 
committedOffsets=ArgumentCaptor.forClass(Map.class);
+            
             InOrder inOrder = inOrder(consumerMock);
-            
inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
             inOrder.verify(consumerMock).poll(anyLong());
 
             //verify that Offset 3 was committed for the given TopicPartition
-            assertTrue(committedOffsets.getValue().containsKey(partition));
-            assertEquals(lastOffset, ((OffsetAndMetadata) 
(committedOffsets.getValue().get(partition))).offset());
+            assertTrue(commitCapture.getValue().containsKey(partition));
+            assertEquals(lastOffset, ((OffsetAndMetadata) 
(commitCapture.getValue().get(partition))).offset());
         }
     }
-
-}
\ No newline at end of file
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index 00b973c..ccb2a6c 100755
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
@@ -29,8 +30,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,7 +63,7 @@ public class MaxUncommittedOffsetTest {
     private final int maxUncommittedOffsets = 10;
     private final int maxPollRecords = 5;
     private final int initialRetryDelaySecs = 60;
-    private final KafkaSpoutConfig spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+    private final KafkaSpoutConfig<String, String> spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
         .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
         .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
         .setMaxUncommittedOffsets(maxUncommittedOffsets)
@@ -95,6 +98,8 @@ public class MaxUncommittedOffsetTest {
 
     private void initializeSpout(int msgCount) throws Exception {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        when(topologyContext.getThisTaskIndex()).thenReturn(0);
+        
when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
         spout.open(conf, topologyContext, collector);
         spout.activate();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
index e97c7e1..fe3325c 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
@@ -16,10 +16,11 @@
 
 package org.apache.storm.kafka.spout;
 
+import org.apache.storm.kafka.spout.NamedTopicFilter;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
-
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
index 877efdc..335ab31 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
@@ -16,6 +16,8 @@
 
 package org.apache.storm.kafka.spout;
 
+import org.apache.storm.kafka.spout.PatternTopicFilter;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 6042c80..7759b3c 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyObject;
@@ -48,7 +48,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -83,12 +85,12 @@ public class SingleTopicKafkaSpoutTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        KafkaSpoutConfig<String, String> spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
             .setRetry(new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
                 maxRetries, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
             .build();
-        this.consumerSpy = spy(new 
KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
+        this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, 
String>().createConsumer(spoutConfig));
         this.consumerFactory = new KafkaConsumerFactory<String, String>() {
             @Override
             public KafkaConsumer<String, String> 
createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
@@ -112,6 +114,8 @@ public class SingleTopicKafkaSpoutTest {
 
     private void initializeSpout(int msgCount) throws InterruptedException, 
ExecutionException, TimeoutException {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        when(topologyContext.getThisTaskIndex()).thenReturn(0);
+        
when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
         spout.open(conf, topologyContext, collector);
         spout.activate();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
new file mode 100644
index 0000000..67b1f2c
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+public class SpoutWithMockedConsumerSetupHelper {
+    
+    /**
+     * Creates, opens and activates a KafkaSpout using a mocked consumer.
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spoutConfig The spout config to use
+     * @param topoConf The topo conf to pass to the spout
+     * @param contextMock The topo context to pass to the spout
+     * @param collectorMock The mocked collector to pass to the spout
+     * @param consumerMock The mocked consumer
+     * @param assignedPartitions The partitions to assign to this spout. The 
consumer will act like these partitions are assigned to it.
+     * @return The spout
+     */
+    public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> 
spoutConfig, Map<String, Object> topoConf,
+        TopologyContext contextMock, SpoutOutputCollector collectorMock, final 
KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {     
+
+        Map<String, List<PartitionInfo>> partitionInfos = new HashMap<>();
+        for(TopicPartition tp : assignedPartitions) {
+            PartitionInfo info = new PartitionInfo(tp.topic(), tp.partition(), 
null, null, null);
+            List<PartitionInfo> infos = partitionInfos.get(tp.topic());
+            if(infos == null) {
+                infos = new ArrayList<>();
+                partitionInfos.put(tp.topic(), infos);
+            }
+            infos.add(info);
+        }
+        for(String topic : partitionInfos.keySet()) {
+            when(consumerMock.partitionsFor(topic))
+                .thenReturn(partitionInfos.get(topic));
+        }
+        KafkaConsumerFactory<K, V> consumerFactory = new 
KafkaConsumerFactory<K, V>() {
+            @Override
+            public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> 
kafkaSpoutConfig) {
+                return consumerMock;
+            }
+        };
+
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, 
consumerFactory);
+
+        
when(contextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
+        when(contextMock.getThisTaskIndex()).thenReturn(0);
+        
+        spout.open(topoConf, contextMock, collectorMock);
+        spout.activate();
+
+        verify(consumerMock).assign(assignedPartitions);
+        
+        return spout;
+    }
+    
+}

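A condensed sketch of how the tests in this commit use the new helper; spoutConfig, conf, contextMock and collectorMock are assumed to be the same fixtures the tests already set up:

    // Create a spout backed by a mocked consumer that is already assigned the partition.
    TopicPartition partition = new TopicPartition("test", 1);
    KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
    KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
        spoutConfig, conf, contextMock, collectorMock, consumerMock,
        Collections.singleton(partition));

    // Feed one record through the mocked consumer and drive the spout.
    Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
    records.put(partition, Collections.singletonList(
        new ConsumerRecord<>(partition.topic(), partition.partition(), 0, "key", "value")));
    when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(records));
    spout.nextTuple();
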
http://git-wip-us.apache.org/repos/asf/storm/blob/2ebf2268/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index a5c364c..1ab4966 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka.spout.builders;
 
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
 import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 import java.util.List;
@@ -27,7 +28,9 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.Subscription;
 import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
@@ -38,6 +41,13 @@ public class SingleTopicKafkaSpoutConfiguration {
     public static final String STREAM = "test_stream";
     public static final String TOPIC = "test";
 
+    /**
+     * Retry in a tight loop (keep unit tests fast).
+     */
+    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
+        new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
     public static Config getConfig() {
         Config config = new Config();
         config.setDebug(true);
@@ -57,21 +67,29 @@ public class SingleTopicKafkaSpoutConfiguration {
             return new Values(r.topic(), r.key(), r.value());
         }
     };
-    
-    public static KafkaSpoutConfig.Builder<String,String> 
getKafkaSpoutConfigBuilder(int port) {
-        return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
-                .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
-                        new Fields("topic", "key", "value"), STREAM)
-                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
-                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
-                .setRetry(getRetryService())
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .setPollTimeoutMs(1000);
+
+    public static KafkaSpoutConfig.Builder<String, String> 
getKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + 
port, TOPIC));
+    }
+
+    public static KafkaSpoutConfig.Builder<String, String> 
getKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<String, 
String>("127.0.0.1:" + port, subscription));
     }
-        
+
+    private static KafkaSpoutConfig.Builder<String, String> 
setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+        return config
+            .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
+                new Fields("topic", "key", "value"), STREAM)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+            .setRetry(getRetryService())
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .setPollTimeoutMs(1000);
+    }
+
     protected static KafkaSpoutRetryService getRetryService() {
-        return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
+        return UNIT_TEST_RETRY_SERVICE;
     }
 }

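The new getKafkaSpoutConfigBuilder(Subscription, int) overload exists so that KafkaSpoutRebalanceTest can inject a mocked Subscription; condensed from that test (a sketch of the mock setup, not the full test):

    // Capture the rebalance listener via a mocked Subscription instead of consumer.subscribe().
    Subscription subscriptionMock = mock(Subscription.class);
    KafkaSpoutConfig<String, String> spoutConfig =
        getKafkaSpoutConfigBuilder(subscriptionMock, -1)
            .setOffsetCommitPeriodMs(10_000)
            .build();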