Repository: storm
Updated Branches:
  refs/heads/master a894fe63a -> 41d6cdcc9


STORM-2869: Only discard outdated records when adjusting KafkaConsumer position during commit


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/df5187e8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/df5187e8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/df5187e8

Branch: refs/heads/master
Commit: df5187e803d38f2cbb35ac9f1853ed7f2410ad0e
Parents: f998e26
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Tue Dec 26 20:55:30 2017 +0100
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Tue Dec 26 23:17:13 2017 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  63 +--
 .../kafka/spout/KafkaSpoutAbstractTest.java     |   8 +-
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  |  81 ++--
 ...outTopologyDeployActivateDeactivateTest.java |   4 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 379 -------------------
 .../spout/SingleTopicKafkaUnitSetupHelper.java  |   4 +-
 6 files changed, 86 insertions(+), 453 deletions(-)
----------------------------------------------------------------------
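
In short, this patch changes the KafkaSpout's pending-record bookkeeping from a single iterator over all polled records to a map keyed by TopicPartition. When a commit finds that the consumer position has fallen behind the committed offset, the spout can then seek forward and drop only the pending records that are already committed, instead of throwing away everything that was waiting to be emitted. A rough, simplified sketch of the commit-time adjustment (paraphrased from the KafkaSpout.java diff below; the real method also updates the OffsetManager and logging, and the class and constructor here are invented for illustration):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Simplified illustration of the STORM-2869 fix, not the actual KafkaSpout code.
class CommitTimePositionAdjuster<K, V> {
    private final KafkaConsumer<K, V> consumer;
    // Polled-but-not-yet-emitted records, now kept per partition.
    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit = new HashMap<>();

    CommitTimePositionAdjuster(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    void adjustAfterCommit(TopicPartition tp, long committedOffset) {
        if (consumer.position(tp) < committedOffset) {
            // The consumer is still catching up (e.g. after replaying a failed tuple),
            // so jump forward to the committed offset...
            consumer.seek(tp, committedOffset);
            // ...and discard only the pending records that are already committed.
            List<ConsumerRecord<K, V>> pending = waitingToEmit.get(tp);
            if (pending != null) {
                pending.removeIf(record -> record.offset() < committedOffset);
            }
        }
    }
}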


http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index b7e3136..9fe654f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -28,12 +28,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -98,7 +98,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Always empty if processing guarantee is none or at-most-once
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the 
nextTuple() call. One record is emitted per nextTuple()
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
+    private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> 
waitingToEmit;
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
@@ -138,7 +138,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
-        waitingToEmit = Collections.emptyListIterator();
+        waitingToEmit = new HashMap<>();
         setCommitMetadata(context);
 
         tupleListener.open(conf, context);
@@ -151,7 +151,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
                 context.getStormId(), context.getThisTaskId(), 
Thread.currentThread().getName()));
         } catch (JsonProcessingException e) {
-            LOG.error("Failed to create Kafka commit metadata due to JSON 
serialization error",e);
+            LOG.error("Failed to create Kafka commit metadata due to JSON 
serialization error", e);
             throw new RuntimeException(e);
         }
     }
@@ -164,7 +164,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private class KafkaSpoutConsumerRebalanceListener implements 
ConsumerRebalanceListener {
 
         private Collection<TopicPartition> previousAssignment = new 
HashSet<>();
-        
+
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
             previousAssignment = partitions;
@@ -198,6 +198,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                  */
                 emitted.removeIf(msgId -> 
!partitions.contains(msgId.getTopicPartition()));
             }
+            waitingToEmit.keySet().retainAll(partitions);
 
             Set<TopicPartition> newPartitions = new HashSet<>(partitions);
             newPartitions.removeAll(previousAssignment);
@@ -252,8 +253,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     /**
-     * Checks If {@link OffsetAndMetadata} was committed by a {@link 
KafkaSpout} instance in this topology.
-     * This info is used to decide if {@link FirstPollOffsetStrategy} should 
be applied
+     * Checks if {@link OffsetAndMetadata} was committed by a {@link 
KafkaSpout} instance in this topology. This info is used to decide if
+     * {@link FirstPollOffsetStrategy} should be applied
      *
      * @param tp topic-partition
      * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
@@ -271,7 +272,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.warn("Failed to deserialize [{}]. Error likely occurred 
because the last commit "
                 + "for this topic-partition was done using an earlier version 
of Storm. "
                 + "Defaulting to behavior compatible with earlier version", 
committedOffset);
-            LOG.trace("",e);
+            LOG.trace("", e);
             return false;
         }
     }
@@ -318,12 +319,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.debug("Not polling. Tuples waiting to be emitted.");
             return new PollablePartitionsInfo(Collections.emptySet(), 
Collections.emptyMap());
         }
-        
+
         Set<TopicPartition> assignment = kafkaConsumer.assignment();
         if (!isAtLeastOnceProcessing()) {
             return new PollablePartitionsInfo(assignment, 
Collections.emptyMap());
         }
-        
+
         Map<TopicPartition, Long> earliestRetriableOffsets = 
retryService.earliestRetriableOffsets();
         Set<TopicPartition> pollablePartitions = new HashSet<>();
         final int maxUncommittedOffsets = 
kafkaSpoutConfig.getMaxUncommittedOffsets();
@@ -349,15 +350,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private boolean isWaitingToEmit() {
-        return waitingToEmit != null && waitingToEmit.hasNext();
+        return waitingToEmit.values().stream()
+            .anyMatch(list -> !list.isEmpty());
     }
 
     private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
-        List<ConsumerRecord<K, V>> waitingToEmitList = new LinkedList<>();
         for (TopicPartition tp : consumerRecords.partitions()) {
-            waitingToEmitList.addAll(consumerRecords.records(tp));
+            waitingToEmit.put(tp, new 
ArrayList<>(consumerRecords.records(tp)));
         }
-        waitingToEmit = waitingToEmitList.iterator();
     }
 
     // ======== poll =========
@@ -417,12 +417,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // ======== emit  =========
     private void emitIfWaitingNotEmitted() {
-        while (isWaitingToEmit()) {
-            final boolean emittedTuple = 
emitOrRetryTuple(waitingToEmit.next());
-            waitingToEmit.remove();
-            if (emittedTuple) {
-                break;
+        Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = 
waitingToEmit.values().iterator();
+        outerLoop:
+        while (waitingToEmitIter.hasNext()) {
+            List<ConsumerRecord<K, V>> waitingToEmitForTp = 
waitingToEmitIter.next();
+            while (!waitingToEmitForTp.isEmpty()) {
+                final boolean emittedTuple = 
emitOrRetryTuple(waitingToEmitForTp.remove(0));
+                if (emittedTuple) {
+                    break outerLoop;
+                }
             }
+            waitingToEmitIter.remove();
         }
     }
 
@@ -443,7 +448,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         } else {
             final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
             if (committedOffset != null && isOffsetCommittedByThisTopology(tp, 
committedOffset)
-                && committedOffset.offset() > kafkaConsumer.position(tp)) {
+                && committedOffset.offset() > record.offset()) {
                 // Ensures that after a topology with this id is started, the 
consumer fetch
                 // position never falls behind the committed offset 
(STORM-2844)
                 throw new IllegalStateException("Attempting to emit a message 
that has already been committed.");
@@ -522,15 +527,21 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                      * The position is behind the committed offset. This can 
happen in some cases, e.g. if a message failed,
                      * lots of (more than max.poll.records) later messages 
were acked, and the failed message then gets acked. 
                      * The consumer may only be part way through "catching up" 
to where it was when it went back to retry the failed tuple. 
-                     * Skip the consumer forward to the committed offset drop 
the current waiting to emit list,
+                     * Skip the consumer forward to the committed offset and 
drop the current waiting to emit list,
                      * since it'll likely contain committed offsets.
                      */
                     LOG.debug("Consumer fell behind committed offset. Catching 
up. Position was [{}], skipping to [{}]",
                         position, committedOffset);
                     kafkaConsumer.seek(tp, committedOffset);
-                    waitingToEmit = null;
+                    List<ConsumerRecord<K, V>> waitingToEmitForTp = 
waitingToEmit.get(tp);
+                    if (waitingToEmitForTp != null) {
+                        //Discard the pending records that are already 
committed
+                        waitingToEmit.put(tp, waitingToEmitForTp.stream()
+                            .filter(record -> record.offset() >= 
committedOffset)
+                            .collect(Collectors.toList()));
+                    }
                 }
-                
+
                 final OffsetManager offsetManager = 
assignedOffsetManagers.get(tp);
                 offsetManager.commit(tpOffset.getValue());
                 LOG.debug("[{}] uncommitted offsets for partition [{}] after 
commit", offsetManager.getNumUncommittedOffsets(), tp);
@@ -678,20 +689,20 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private String getTopicsString() {
         return kafkaSpoutConfig.getSubscription().getTopicsString();
     }
-    
+
     private static class PollablePartitionsInfo {
 
         private final Set<TopicPartition> pollablePartitions;
         //The subset of earliest retriable offsets that are on pollable 
partitions
         private final Map<TopicPartition, Long> 
pollableEarliestRetriableOffsets;
-        
+
         public PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, 
Map<TopicPartition, Long> earliestRetriableOffsets) {
             this.pollablePartitions = pollablePartitions;
             this.pollableEarliestRetriableOffsets = 
earliestRetriableOffsets.entrySet().stream()
                 .filter(entry -> pollablePartitions.contains(entry.getKey()))
                 .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue()));
         }
-        
+
         public boolean shouldPoll() {
             return !this.pollablePartitions.isEmpty();
         }
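
As a reading aid, the emit path now walks these per-partition lists rather than a flat iterator: each partition's list is drained until a tuple is actually emitted, and a partition's entry is removed once its list is exhausted. A minimal sketch of that loop, assuming emitOrRetryTuple returns true only when a tuple was handed to the collector (other spout state is omitted):

// Sketch of the reworked emit loop; mirrors emitIfWaitingNotEmitted in the diff above.
private void emitIfWaitingNotEmitted() {
    Iterator<List<ConsumerRecord<K, V>>> partitionLists = waitingToEmit.values().iterator();
    outerLoop:
    while (partitionLists.hasNext()) {
        List<ConsumerRecord<K, V>> recordsForPartition = partitionLists.next();
        while (!recordsForPartition.isEmpty()) {
            // Emit at most one tuple per nextTuple() call.
            if (emitOrRetryTuple(recordsForPartition.remove(0))) {
                break outerLoop;
            }
        }
        // Every record for this partition was skipped or filtered; drop the entry.
        partitionLists.remove();
    }
}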

http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index 5254320..dd29696 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -53,7 +53,7 @@ public abstract class KafkaSpoutAbstractTest {
 
     final TopologyContext topologyContext = mock(TopologyContext.class);
     final Map<String, Object> conf = new HashMap<>();
-    final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    final SpoutOutputCollector collectorMock = 
mock(SpoutOutputCollector.class);
     final long commitOffsetPeriodMs;
 
     KafkaConsumer<String, String> consumerSpy;
@@ -109,7 +109,7 @@ public abstract class KafkaSpoutAbstractTest {
 
     void prepareSpout(int messageCount) throws Exception {
         
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), 
SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
-        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collectorMock);
     }
 
     /**
@@ -131,7 +131,7 @@ public abstract class KafkaSpoutAbstractTest {
 
         spout.ack(messageId.getValue());
 
-        reset(collector);
+        reset(collectorMock);
 
         return messageId;
     }
@@ -140,7 +140,7 @@ public abstract class KafkaSpoutAbstractTest {
     ArgumentCaptor<Object> verifyMessageEmitted(int offset) {
         final ArgumentCaptor<Object> messageId = 
ArgumentCaptor.forClass(Object.class);
 
-        verify(collector).emit(
+        verify(collectorMock).emit(
             eq(SingleTopicKafkaSpoutConfiguration.STREAM),
             eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
                 Integer.toString(offset),

http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 33c71bc..1470332 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -28,13 +28,14 @@ import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.never;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyListOf;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 import java.util.HashSet;
 import java.util.List;
@@ -43,9 +44,9 @@ import java.util.Set;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.utils.Time;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -81,7 +82,7 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
             spout.nextTuple();
         }
         ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collector, times(messageCount)).emit(anyString(), anyList(), 
messageIdCaptor.capture());
+        verify(collectorMock, times(messageCount)).emit(anyString(), 
anyList(), messageIdCaptor.capture());
         List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
         for (int i = 1; i < messageIds.size(); i++) {
             spout.ack(messageIds.get(i));
@@ -90,16 +91,17 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
         spout.fail(failedTuple);
 
         //Advance the time and replay the failed tuple. 
-        reset(collector);
+        reset(collectorMock);
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collector).emit(anyString(), anyList(), 
failedIdReplayCaptor.capture());
+        verify(collectorMock).emit(anyString(), anyList(), 
failedIdReplayCaptor.capture());
 
         assertThat("Expected replay of failed tuple", 
failedIdReplayCaptor.getValue(), is(failedTuple));
 
         /* Ack the tuple, and commit.
          * Since the tuple is more than max poll records behind the most 
recent emitted tuple, the consumer won't catch up in this poll.
          */
+        clearInvocations(collectorMock);
         Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
         spout.ack(failedIdReplayCaptor.getValue());
         spout.nextTuple();
@@ -110,16 +112,15 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
         assertThat("Should have committed to the right topic", capturedCommit, 
Matchers.hasKey(expectedTp));
         assertThat("Should have committed all the acked messages", 
capturedCommit.get(expectedTp).offset(), is((long)messageCount));
 
-            /* Verify that the following acked (now committed) tuples are not 
emitted again
-             * Since the consumer position was somewhere in the middle of the 
acked tuples when the commit happened,
-             * this verifies that the spout keeps the consumer position ahead 
of the committed offset when committing
-             */
-        reset(collector);
+        /* Verify that the following acked (now committed) tuples are not 
emitted again
+         * Since the consumer position was somewhere in the middle of the 
acked tuples when the commit happened,
+         * this verifies that the spout keeps the consumer position ahead of 
the committed offset when committing
+         */
         //Just do a few polls to check that nothing more is emitted
         for(int i = 0; i < 3; i++) {
             spout.nextTuple();
         }
-        verify(collector, never()).emit(anyString(), anyList(), anyObject());
+        verify(collectorMock, never()).emit(anyString(), anyList(), 
anyObject());
     }
 
     @Test
@@ -130,7 +131,7 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
         //play 1st tuple
         ArgumentCaptor<Object> messageIdToDoubleAck = 
ArgumentCaptor.forClass(Object.class);
         spout.nextTuple();
-        verify(collector).emit(anyString(), anyList(), 
messageIdToDoubleAck.capture());
+        verify(collectorMock).emit(anyString(), anyList(), 
messageIdToDoubleAck.capture());
         spout.ack(messageIdToDoubleAck.getValue());
 
         //Emit some more messages
@@ -147,7 +148,7 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
 
         //Verify that all messages are emitted, ack all the messages
         ArgumentCaptor<Object> messageIds = 
ArgumentCaptor.forClass(Object.class);
-        verify(collector, 
times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+        verify(collectorMock, 
times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
             anyList(),
             messageIds.capture());
         for(Object id : messageIds.getAllValues()) {
@@ -170,14 +171,14 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
         for(int i = 0; i < messageCount; i++) {
             spout.nextTuple();
             ArgumentCaptor<Object> messageId = 
ArgumentCaptor.forClass(Object.class);
-            verify(collector).emit(
+            verify(collectorMock).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                 eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
                     Integer.toString(i),
                     Integer.toString(i))),
                 messageId.capture());
             spout.ack(messageId.getValue());
-            reset(collector);
+            reset(collectorMock);
         }
 
         Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
@@ -195,16 +196,16 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
         //play and ack 1 tuple
         ArgumentCaptor<Object> messageIdAcked = 
ArgumentCaptor.forClass(Object.class);
         spout.nextTuple();
-        verify(collector).emit(anyString(), anyList(), 
messageIdAcked.capture());
+        verify(collectorMock).emit(anyString(), anyList(), 
messageIdAcked.capture());
         spout.ack(messageIdAcked.getValue());
-        reset(collector);
+        reset(collectorMock);
 
         //play and fail 1 tuple
         ArgumentCaptor<Object> messageIdFailed = 
ArgumentCaptor.forClass(Object.class);
         spout.nextTuple();
-        verify(collector).emit(anyString(), anyList(), 
messageIdFailed.capture());
+        verify(collectorMock).emit(anyString(), anyList(), 
messageIdFailed.capture());
         spout.fail(messageIdFailed.getValue());
-        reset(collector);
+        reset(collectorMock);
 
         //Emit all remaining messages. Failed tuples retry immediately with 
current configuration, so no need to wait.
         for(int i = 0; i < messageCount; i++) {
@@ -213,7 +214,7 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
 
         ArgumentCaptor<Object> remainingMessageIds = 
ArgumentCaptor.forClass(Object.class);
         //All messages except the first acked message should have been emitted
-        verify(collector, times(messageCount - 1)).emit(
+        verify(collectorMock, times(messageCount - 1)).emit(
             eq(SingleTopicKafkaSpoutConfiguration.STREAM),
             anyList(),
             remainingMessageIds.capture());
@@ -236,14 +237,14 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
         //play 1st tuple
         ArgumentCaptor<Object> messageIdToFail = 
ArgumentCaptor.forClass(Object.class);
         spout.nextTuple();
-        verify(collector).emit(anyString(), anyList(), 
messageIdToFail.capture());
-        reset(collector);
+        verify(collectorMock).emit(anyString(), anyList(), 
messageIdToFail.capture());
+        reset(collectorMock);
 
         //play 2nd tuple
         ArgumentCaptor<Object> messageIdToAck = 
ArgumentCaptor.forClass(Object.class);
         spout.nextTuple();
-        verify(collector).emit(anyString(), anyList(), 
messageIdToAck.capture());
-        reset(collector);
+        verify(collectorMock).emit(anyString(), anyList(), 
messageIdToAck.capture());
+        reset(collectorMock);
 
         //ack 2nd tuple
         spout.ack(messageIdToAck.getValue());
@@ -257,7 +258,7 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
 
         ArgumentCaptor<Object> remainingIds = 
ArgumentCaptor.forClass(Object.class);
         //All messages except the first acked message should have been emitted
-        verify(collector, times(messageCount - 1)).emit(
+        verify(collectorMock, times(messageCount - 1)).emit(
             eq(SingleTopicKafkaSpoutConfiguration.STREAM),
             anyList(),
             remainingIds.capture());
@@ -284,8 +285,8 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
             spout.nextTuple();
         }
         ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collector, times(messageCount)).emit(anyString(), anyList(), 
messageIds.capture());
-        reset(collector);
+        verify(collectorMock, times(messageCount)).emit(anyString(), 
anyList(), messageIds.capture());
+        reset(collectorMock);
         //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
         List<KafkaSpoutMessageId> capturedMessageIds = 
messageIds.getAllValues();
         spout.fail(capturedMessageIds.get(5));
@@ -298,7 +299,7 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
         for (int i = 0; i < messageCount; i++) {
             spout.nextTuple();
         }
-        verify(collector, times(3)).emit(anyString(), anyList(), 
reemittedMessageIds.capture());
+        verify(collectorMock, times(3)).emit(anyString(), anyList(), 
reemittedMessageIds.capture());
         Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
         expectedReemitIds.add(capturedMessageIds.get(5));
         expectedReemitIds.add(capturedMessageIds.get(3));
@@ -316,31 +317,31 @@ public class KafkaSpoutSingleTopicTest extends 
KafkaSpoutAbstractTest {
         for (int i = 0; i <= maxRetries; i++) {
             ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             spout.nextTuple();
-            verify(collector).emit(anyString(), anyListOf(Object.class), 
messageIdFailed.capture());
+            verify(collectorMock).emit(anyString(), anyListOf(Object.class), 
messageIdFailed.capture());
             KafkaSpoutMessageId msgId = messageIdFailed.getValue();
             spout.fail(msgId);
             assertThat("Expected message id number of failures to match the 
number of times the message has failed", msgId.numFails(), is(i + 1));
-            reset(collector);
+            reset(collectorMock);
         }
 
         //Verify that the tuple is not emitted again
         spout.nextTuple();
-        verify(collector, never()).emit(anyString(), anyListOf(Object.class), 
anyObject());
+        verify(collectorMock, never()).emit(anyString(), 
anyListOf(Object.class), anyObject());
     }
 
     @Test
     public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws 
Exception {
-        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collectorMock);
 
         //Nothing is assigned yet, should emit nothing
         spout.nextTuple();
-        verify(collector, never()).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
+        verify(collectorMock, never()).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
 
         
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), 
SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
         Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS 
+ KafkaSpout.TIMER_DELAY_MS);
 
         //The new partition should be discovered and the message should be 
emitted
         spout.nextTuple();
-        verify(collector).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
+        verify(collectorMock).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
     }
 }
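
One detail in the test changes: the updated test uses Mockito's clearInvocations(collectorMock) in one spot instead of reset(collectorMock), presumably so the earlier emit of the replayed tuple does not count against the later never().emit(...) verification while any stubbing on the mock is kept. clearInvocations only forgets recorded calls; reset also wipes stubbing. A small stand-alone illustration with plain Mockito (not Storm code; the class name is made up):

import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;

public class ClearInvocationsExample {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> mockList = (List<String>) mock(List.class);
        when(mockList.get(0)).thenReturn("stubbed");

        mockList.get(0);
        clearInvocations(mockList);       // forget the call above...
        verify(mockList, never()).get(0); // ...so this verification passes
        System.out.println(mockList.get(0)); // still prints "stubbed": stubbing survived
    }
}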

http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
index a96e284..a9d7c75 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -79,7 +79,7 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest 
extends KafkaSpoutAb
         // Restart topology with the same topology id, which mimics the 
behavior of partition reassignment
         setUp();
         // Initialize spout using the same populated data (i.e same 
kafkaUnitRule)
-        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collectorMock);
 
         nextTuple_verifyEmitted_ack_resetCollector(1);
 
@@ -104,7 +104,7 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest 
extends KafkaSpoutAb
         setUp();
         when(topologyContext.getStormId()).thenReturn("topology-2");
         // Initialize spout using the same populated data (i.e same 
kafkaUnitRule)
-        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collectorMock);
 
         //Emit all messages and check that they are emitted. Ack the messages 
too
         for (int i = 0; i < messageCount; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
deleted file mode 100644
index cdbe05d..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.IntStream;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.KafkaUnitRule;
-import 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Time.SimulatedTime;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.MockitoAnnotations;
-
-import java.util.regex.Pattern;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.hamcrest.Matchers;
-
-public class SingleTopicKafkaSpoutTest {
-
-    @Rule
-    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
-
-    @Captor
-    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> 
commitCapture;
-
-    private final TopologyContext topologyContext = 
mock(TopologyContext.class);
-    private final Map<String, Object> conf = new HashMap<>();
-    private final SpoutOutputCollector collector = 
mock(SpoutOutputCollector.class);
-    private final long commitOffsetPeriodMs = 2_000;
-    private final int maxRetries = 3;
-    private KafkaConsumer<String, String> consumerSpy;
-    private KafkaSpout<String, String> spout;
-    private final int maxPollRecords = 10;
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig<String, String> spoutConfig =
-            SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
-                KafkaSpoutConfig.builder("127.0.0.1:" + 
kafkaUnitRule.getKafkaUnit().getKafkaPort(),
-                    Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
-                .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-                .setRetry(new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
-                    maxRetries, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
-                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords)
-                .build();
-        this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, 
String>().createConsumer(spoutConfig));
-        this.spout = new KafkaSpout<>(spoutConfig, (ignored) -> consumerSpy);
-    }
-
-    private void prepareSpout(int messageCount) throws Exception {
-        
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), 
SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
-        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
-    }
-
-    @Test
-    public void 
testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws 
Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = maxPollRecords * 2;
-            prepareSpout(messageCount);
-
-            //Emit all messages and fail the first one while acking the rest
-            for (int i = 0; i < messageCount; i++) {
-                spout.nextTuple();
-            }
-            ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collector, times(messageCount)).emit(anyString(), 
anyList(), messageIdCaptor.capture());
-            List<KafkaSpoutMessageId> messageIds = 
messageIdCaptor.getAllValues();
-            for (int i = 1; i < messageIds.size(); i++) {
-                spout.ack(messageIds.get(i));
-            }
-            KafkaSpoutMessageId failedTuple = messageIds.get(0);
-            spout.fail(failedTuple);
-
-            //Advance the time and replay the failed tuple. 
-            reset(collector);
-            spout.nextTuple();
-            ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collector).emit(anyString(), anyList(), 
failedIdReplayCaptor.capture());
-
-            assertThat("Expected replay of failed tuple", 
failedIdReplayCaptor.getValue(), is(failedTuple));
-
-            /* Ack the tuple, and commit.
-             * Since the tuple is more than max poll records behind the most 
recent emitted tuple, the consumer won't catch up in this poll.
-             */
-            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
-            spout.ack(failedIdReplayCaptor.getValue());
-            spout.nextTuple();
-            verify(consumerSpy).commitSync(commitCapture.capture());
-            
-            Map<TopicPartition, OffsetAndMetadata> capturedCommit = 
commitCapture.getValue();
-            TopicPartition expectedTp = new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
-            assertThat("Should have committed to the right topic", 
capturedCommit, Matchers.hasKey(expectedTp));
-            assertThat("Should have committed all the acked messages", 
capturedCommit.get(expectedTp).offset(), is((long)messageCount));
-
-            /* Verify that the following acked (now committed) tuples are not 
emitted again
-             * Since the consumer position was somewhere in the middle of the 
acked tuples when the commit happened,
-             * this verifies that the spout keeps the consumer position ahead 
of the committed offset when committing
-             */
-            reset(collector);
-            //Just do a few polls to check that nothing more is emitted
-            for(int i = 0; i < 3; i++) {
-                spout.nextTuple();
-            }
-            verify(collector, never()).emit(any(), any(), any());
-        }
-    }
-
-    @Test
-    public void testShouldContinueWithSlowDoubleAcks() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = 20;
-            prepareSpout(messageCount);
-
-            //play 1st tuple
-            ArgumentCaptor<Object> messageIdToDoubleAck = 
ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), 
messageIdToDoubleAck.capture());
-            spout.ack(messageIdToDoubleAck.getValue());
-
-            //Emit some more messages
-            IntStream.range(0, messageCount / 2).forEach(value -> {
-                spout.nextTuple();
-            });
-
-            spout.ack(messageIdToDoubleAck.getValue());
-
-            //Emit any remaining messages
-            IntStream.range(0, messageCount).forEach(value -> {
-                spout.nextTuple();
-            });
-
-            //Verify that all messages are emitted, ack all the messages
-            ArgumentCaptor<Object> messageIds = 
ArgumentCaptor.forClass(Object.class);
-            verify(collector, 
times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyList(),
-                messageIds.capture());
-            messageIds.getAllValues().iterator().forEachRemaining(spout::ack);
-
-            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
-            //Commit offsets
-            spout.nextTuple();
-
-            
SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, 
commitCapture, messageCount);
-        }
-    }
-
-    @Test
-    public void testShouldEmitAllMessages() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = 10;
-            prepareSpout(messageCount);
-
-            //Emit all messages and check that they are emitted. Ack the 
messages too
-            IntStream.range(0, messageCount).forEach(value -> {
-                spout.nextTuple();
-                ArgumentCaptor<Object> messageId = 
ArgumentCaptor.forClass(Object.class);
-                verify(collector).emit(
-                    eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                    eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
-                        Integer.toString(value),
-                        Integer.toString(value))),
-                    messageId.capture());
-                spout.ack(messageId.getValue());
-                reset(collector);
-            });
-
-            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
-            //Commit offsets
-            spout.nextTuple();
-
-            
SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, 
commitCapture, messageCount);
-        }
-    }
-
-    @Test
-    public void testShouldReplayInOrderFailedMessages() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = 10;
-            prepareSpout(messageCount);
-
-            //play and ack 1 tuple
-            ArgumentCaptor<Object> messageIdAcked = 
ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), 
messageIdAcked.capture());
-            spout.ack(messageIdAcked.getValue());
-            reset(collector);
-
-            //play and fail 1 tuple
-            ArgumentCaptor<Object> messageIdFailed = 
ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), 
messageIdFailed.capture());
-            spout.fail(messageIdFailed.getValue());
-            reset(collector);
-
-            //Emit all remaining messages. Failed tuples retry immediately 
with current configuration, so no need to wait.
-            IntStream.range(0, messageCount).forEach(value -> {
-                spout.nextTuple();
-            });
-
-            ArgumentCaptor<Object> remainingMessageIds = 
ArgumentCaptor.forClass(Object.class);
-            //All messages except the first acked message should have been 
emitted
-            verify(collector, times(messageCount - 1)).emit(
-                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyList(),
-                remainingMessageIds.capture());
-            
remainingMessageIds.getAllValues().iterator().forEachRemaining(spout::ack);
-
-            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
-            //Commit offsets
-            spout.nextTuple();
-
-            
SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, 
commitCapture, messageCount);
-        }
-    }
-
-    @Test
-    public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = 10;
-            prepareSpout(messageCount);
-
-            //play 1st tuple
-            ArgumentCaptor<Object> messageIdToFail = 
ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), 
messageIdToFail.capture());
-            reset(collector);
-
-            //play 2nd tuple
-            ArgumentCaptor<Object> messageIdToAck = 
ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), 
messageIdToAck.capture());
-            reset(collector);
-
-            //ack 2nd tuple
-            spout.ack(messageIdToAck.getValue());
-            //fail 1st tuple
-            spout.fail(messageIdToFail.getValue());
-
-            //Emit all remaining messages. Failed tuples retry immediately 
with current configuration, so no need to wait.
-            IntStream.range(0, messageCount).forEach(value -> {
-                spout.nextTuple();
-            });
-
-            ArgumentCaptor<Object> remainingIds = 
ArgumentCaptor.forClass(Object.class);
-            //All messages except the first acked message should have been 
emitted
-            verify(collector, times(messageCount - 1)).emit(
-                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyList(),
-                remainingIds.capture());
-            
remainingIds.getAllValues().iterator().forEachRemaining(spout::ack);
-
-            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
-            //Commit offsets
-            spout.nextTuple();
-
-            
SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, 
commitCapture, messageCount);
-        }
-    }
-
-    @Test
-    public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws 
Exception {
-        //The spout must reemit retriable tuples, even if they fail out of 
order.
-        //The spout should be able to skip tuples it has already emitted when 
retrying messages, even if those tuples are also retries.
-        int messageCount = 10;
-        prepareSpout(messageCount);
-
-        //play all tuples
-        for (int i = 0; i < messageCount; i++) {
-            spout.nextTuple();
-        }
-        ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collector, times(messageCount)).emit(anyString(), anyList(), 
messageIds.capture());
-        reset(collector);
-        //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
-        List<KafkaSpoutMessageId> capturedMessageIds = 
messageIds.getAllValues();
-        spout.fail(capturedMessageIds.get(5));
-        spout.fail(capturedMessageIds.get(3));
-        spout.nextTuple();
-        spout.fail(capturedMessageIds.get(2));
-
-        //Check that the spout will reemit all 3 failed tuples and no other 
tuples
-        ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        for (int i = 0; i < messageCount; i++) {
-            spout.nextTuple();
-        }
-        verify(collector, times(3)).emit(anyString(), anyList(), 
reemittedMessageIds.capture());
-        Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
-        expectedReemitIds.add(capturedMessageIds.get(5));
-        expectedReemitIds.add(capturedMessageIds.get(3));
-        expectedReemitIds.add(capturedMessageIds.get(2));
-        assertThat("Expected reemits to be the 3 failed tuples", new 
HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
-    }
-
-    @Test
-    public void testShouldDropMessagesAfterMaxRetriesAreReached() throws 
Exception {
-        //Check that if one message fails repeatedly, the retry cap limits how 
many times the message can be reemitted
-        int messageCount = 1;
-        prepareSpout(messageCount);
-
-        //Emit and fail the same tuple until we've reached retry limit
-        for (int i = 0; i <= maxRetries; i++) {
-            ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), 
messageIdFailed.capture());
-            KafkaSpoutMessageId msgId = messageIdFailed.getValue();
-            spout.fail(msgId);
-            assertThat("Expected message id number of failures to match the 
number of times the message has failed", msgId.numFails(), is(i + 1));
-            reset(collector);
-        }
-
-        //Verify that the tuple is not emitted again
-        spout.nextTuple();
-        verify(collector, never()).emit(any(), any(), any());
-    }
-
-    @Test
-    public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws 
Exception {
-        try (SimulatedTime time = new SimulatedTime()) {
-            SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
-
-            //Nothing is assigned yet, should emit nothing
-            spout.nextTuple();
-            verify(collector, never()).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
-
-            
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), 
SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
-            
Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + 
KafkaSpout.TIMER_DELAY_MS);
-
-            //The new partition should be discovered and the message should be 
emitted
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
index 3690aef..eba9625 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
@@ -45,7 +45,7 @@ public class SingleTopicKafkaUnitSetupHelper {
      */
     public static void populateTopicData(KafkaUnit kafkaUnit, String 
topicName, int msgCount) throws Exception {
         kafkaUnit.createTopic(topicName);
-
+        
         for (int i = 0; i < msgCount; i++) {
             ProducerRecord<String, String> producerRecord = new 
ProducerRecord<>(
                 topicName, Integer.toString(i),
@@ -53,7 +53,7 @@ public class SingleTopicKafkaUnitSetupHelper {
             kafkaUnit.sendMessage(producerRecord);
         }
     }
-    
+
     /*
      * Asserts that commitSync has been called once, 
      * that there are only commits on one topic,
