Repository: storm
Updated Branches:
  refs/heads/master 445f43939 -> 0ae5068a2


STORM-2549: Fix broken enforcement mechanism for maxUncommittedOffsets in 
storm-kafka-client spout


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cca93d2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cca93d2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cca93d2a

Branch: refs/heads/master
Commit: cca93d2a1ad503e5d2630fb4d1a88778df359c35
Parents: 24ab227
Author: Stig Rohde Døssing <[email protected]>
Authored: Sat Jun 10 19:38:22 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Sun Oct 29 20:30:11 2017 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 108 ++++++++-------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  11 +-
 .../kafka/spout/internal/OffsetManager.java     |  22 +++
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |  12 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   | 137 +++++++------------
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  14 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |  11 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |  32 +++--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  51 +++++--
 .../SpoutWithMockedConsumerSetupHelper.java     |  33 +++--
 .../SingleTopicKafkaSpoutConfiguration.java     |   2 +-
 .../kafka/spout/internal/OffsetManagerTest.java |  15 ++
 12 files changed, 252 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index b181c43..3364fb0 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -91,9 +91,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Always empty if processing guarantee is none or at-most-once
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the 
nextTuple() call. One record is emitted per nextTuple()
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
-    // Number of offsets that have been polled and emitted but not yet been 
committed. Not used if auto commit mode is enabled.
-    private transient long numUncommittedOffsets;
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;            
             
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
@@ -115,7 +113,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         // Spout internals
         this.collector = collector;
-        numUncommittedOffsets = 0;
 
         // Offset management
         firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
@@ -227,13 +224,19 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     public void nextTuple() {
         try {
             if (initialized) {             
+             
+                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                    kafkaSpoutConfig.getSubscription().refreshAssignment();
+                }
+
                 if (commit()) {
                     commitOffsetsForAckedTuples();
                 }
 
-                if (poll()) {
+                Set<TopicPartition> pollablePartitions = poll();
+                if (!pollablePartitions.isEmpty()) {
                     try {
-                        setWaitingToEmit(pollKafkaBroker());
+                        setWaitingToEmit(pollKafkaBroker(pollablePartitions));
                     } catch (RetriableException e) {
                         LOG.error("Failed to poll from kafka.", e);
                     }
@@ -260,27 +263,38 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return isAtLeastOnceProcessing() && 
commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
     }
 
-    private boolean poll() {
+    private Set<TopicPartition> poll() {
         final int maxUncommittedOffsets = 
kafkaSpoutConfig.getMaxUncommittedOffsets();
-        final int readyMessageCount = retryService.readyMessageCount();
-        final boolean poll = !waitingToEmit()
-            // Check that the number of uncommitted, non-retriable tuples is 
less than the maxUncommittedOffsets limit.
-            // Accounting for retriable tuples in this way still guarantees 
that the limit is followed on a per partition basis,
-            // and prevents locking up the spout when there are too many 
retriable tuples
-            && (numUncommittedOffsets - readyMessageCount < 
maxUncommittedOffsets || !isAtLeastOnceProcessing());
-
-        if (!poll) {
-            if (waitingToEmit()) {
-                LOG.debug("Not polling. Tuples waiting to be emitted."
-                    + " [{}] uncommitted offsets across all topic partitions", 
numUncommittedOffsets);
-            }
-
-            if (numUncommittedOffsets >= maxUncommittedOffsets && 
isAtLeastOnceProcessing()) {
-                LOG.debug("Not polling. [{}] uncommitted offsets across all 
topic partitions has reached the threshold of [{}]",
-                    numUncommittedOffsets, maxUncommittedOffsets);
+        
+        if (waitingToEmit()) {
+            LOG.debug("Not polling. Tuples waiting to be emitted.");
+            return Collections.emptySet();
+        }
+        Set<TopicPartition> assignment = kafkaConsumer.assignment();
+        if (!isAtLeastOnceProcessing()) {
+            return assignment;
+        }
+        Map<TopicPartition, Long> earliestRetriableOffsets = 
retryService.earliestRetriableOffsets();
+        Set<TopicPartition> pollablePartitions = new HashSet<>();
+        for (TopicPartition tp : assignment) {
+            OffsetManager offsetManager = offsetManagers.get(tp);
+            int numUncommittedOffsets = 
offsetManager.getNumUncommittedOffsets();
+            if (numUncommittedOffsets < maxUncommittedOffsets) {
+                //Allow poll if the partition is not at the 
maxUncommittedOffsets limit
+                pollablePartitions.add(tp);
+            } else {
+                long offsetAtLimit = 
offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
+                Long earliestRetriableOffset = 
earliestRetriableOffsets.get(tp);
+                if (earliestRetriableOffset != null && earliestRetriableOffset 
<= offsetAtLimit) {
+                    //Allow poll if there are retriable tuples within the 
maxUncommittedOffsets limit
+                    pollablePartitions.add(tp);
+                } else {
+                    LOG.debug("Not polling on partition [{}]. It has [{}] 
uncommitted offsets, which exceeds the limit of [{}]. ", tp,
+                        numUncommittedOffsets, maxUncommittedOffsets);
+                }
             }
         }
-        return poll;
+        return pollablePartitions;
     }
 
     private boolean waitingToEmit() {
@@ -296,28 +310,34 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     // ======== poll =========
-    private ConsumerRecords<K, V> pollKafkaBroker() {
-        doSeekRetriableTopicPartitions();
-        if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
-            kafkaSpoutConfig.getSubscription().refreshAssignment();
-        }
-        final ConsumerRecords<K, V> consumerRecords = 
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
-        final int numPolledRecords = consumerRecords.count();
-        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets 
across all topic partitions",
-            numPolledRecords, numUncommittedOffsets);
-        if (kafkaSpoutConfig.getProcessingGuarantee() == 
KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
-            //Commit polled records immediately to ensure delivery is 
at-most-once.
-            kafkaConsumer.commitSync();
+    private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> 
pollablePartitions) {
+        doSeekRetriableTopicPartitions(pollablePartitions);
+        Set<TopicPartition> pausedPartitions = new 
HashSet<>(kafkaConsumer.assignment());
+        pausedPartitions.removeIf(pollablePartitions::contains);
+        try {
+            kafkaConsumer.pause(pausedPartitions);
+            final ConsumerRecords<K, V> consumerRecords = 
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+            final int numPolledRecords = consumerRecords.count();
+            LOG.debug("Polled [{}] records from Kafka.",
+                numPolledRecords);
+            if (kafkaSpoutConfig.getProcessingGuarantee() == 
KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+                //Commit polled records immediately to ensure delivery is 
at-most-once.
+                kafkaConsumer.commitSync();
+            }
+            return consumerRecords;
+        } finally {
+            kafkaConsumer.resume(pausedPartitions);
         }
-        return consumerRecords;
     }
 
-    private void doSeekRetriableTopicPartitions() {
+    private void doSeekRetriableTopicPartitions(Set<TopicPartition> 
pollablePartitions) {
         final Map<TopicPartition, Long> retriableTopicPartitions = 
retryService.earliestRetriableOffsets();
 
         for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : 
retriableTopicPartitions.entrySet()) {
-            //Seek directly to the earliest retriable message for each 
retriable topic partition
-            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), 
retriableTopicPartitionAndOffset.getValue());
+            if 
(pollablePartitions.contains(retriableTopicPartitionAndOffset.getKey())) {
+                //Seek directly to the earliest retriable message for each 
retriable topic partition
+                kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), 
retriableTopicPartitionAndOffset.getValue());
+            }
         }
     }
 
@@ -367,8 +387,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                         offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                         if (isScheduled) {  // Was scheduled for retry and 
re-emitted, so remove from schedule.
                             retryService.remove(msgId);
-                        } else {            //New tuple, hence increment the 
uncommitted offset counter
-                            numUncommittedOffsets++;
                         }
                         collector.emit(stream, tuple, msgId);
                         tupleListener.onEmit(tuple, msgId);
@@ -429,10 +447,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 
                 
                 final OffsetManager offsetManager = offsetManagers.get(tp);
-                long numCommittedOffsets = 
offsetManager.commit(tpOffset.getValue());
-                numUncommittedOffsets -= numCommittedOffsets;
-                LOG.debug("[{}] uncommitted offsets across all topic 
partitions",
-                    numUncommittedOffsets);
+                offsetManager.commit(tpOffset.getValue());
+                LOG.debug("[{}] uncommitted offsets for partition [{}] after 
commit", offsetManager.getNumUncommittedOffsets(), tp);
             }
         } else {
             LOG.trace("No offsets to commit. {}", this);

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 6a693fe..d5fceb4 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -253,11 +253,12 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
         }
 
         /**
-         * Defines the max number of polled offsets (records) that can be 
pending commit, before another poll can take place. Once this
-         * limit is reached, no more offsets (records) can be polled until the 
next successful commit(s) sets the number of pending offsets
-         * below the threshold. The default is {@link 
#DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be 
exceeded,
-         * but no partition will exceed this limit by more than maxPollRecords 
- 1.
-         *
+         * Defines the max number of polled offsets (records) that can be 
pending commit, before another poll can take place.
+         * Once this limit is reached, no more offsets (records) can be polled 
until the next successful commit(s) sets the number
+         * of pending offsets below the threshold. The default is {@link 
#DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+         * This limit is per partition and may in some cases be exceeded,
+         * but each partition cannot exceed this limit by more than 
maxPollRecords - 1.
+         * 
          * <p>This setting only has an effect if the configured {@link 
ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
          *
          * @param maxUncommittedOffsets max number of records that can be be 
pending commit

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 075c4dd..e7711b0 100755
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.TreeSet;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -65,6 +66,27 @@ public class OffsetManager {
     public void addToEmitMsgs(long offset) {
         this.emittedOffsets.add(offset);                  // O(Log N)
     }
+    
+    public int getNumUncommittedOffsets() {
+        return this.emittedOffsets.size();
+    }
+    
+    /**
+     * Gets the offset of the nth emitted message after the committed offset. 
+     * Example: If the committed offset is 0 and offsets 1, 2, 8, 10 have been 
emitted,
+     * getNthUncommittedOffsetAfterCommittedOffset(3) returns 8.
+     * 
+     * @param index The index of the message to get the offset for
+     * @return The offset
+     * @throws NoSuchElementException if the index is out of range
+     */
+    public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
+        Iterator<Long> offsetIter = emittedOffsets.iterator();
+        for (int i = 0; i < index - 1; i++) {
+            offsetIter.next();
+        }
+        return offsetIter.next();
+    }
 
     /**
      * An offset can only be committed when all emitted records with lower 
offset have been

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index 0714d37..441e649 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
@@ -75,16 +76,13 @@ public class KafkaSpoutCommitTest {
     public void testCommitSuccessWithOffsetVoids() {
         //Verify that the commit logic can handle offset voids
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
+            Set<TopicPartition> assignedPartitions = 
Collections.singleton(partition);
+            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, assignedPartitions);
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records 
= new HashMap<>();
             List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
             // Offsets emitted are 0,1,2,3,4,<void>,8,9
-            for (int i = 0; i < 5; i++) {
-                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            for (int i = 8; i < 10; i++) {
-                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-            }
+            
recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition,
 0, 5));
+            
recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition,
 8, 2));
             records.put(partition, recordsForPartition);
 
             when(consumerMock.poll(anyLong()))

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 74341da..7cfd6b7 100755
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -26,8 +26,11 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -73,11 +76,7 @@ public class KafkaSpoutEmitTest {
         //This is necessary for Storm to be able to throttle the spout 
according to maxSpoutPending
         KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
         Map<TopicPartition, List<ConsumerRecord<String, String>>> records = 
new HashMap<>();
-        List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            recordsForPartition.add(new ConsumerRecord<>(partition.topic(), 
partition.partition(), i, "key", "value"));
-        }
-        records.put(partition, recordsForPartition);
+        records.put(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10));
 
         when(consumerMock.poll(anyLong()))
             .thenReturn(new ConsumerRecords<>(records));
@@ -90,27 +89,24 @@ public class KafkaSpoutEmitTest {
     @Test
     public void 
testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
         //The spout must reemit failed messages waiting for retry even if it 
is not allowed to poll for new messages due to maxUncommittedOffsets being 
exceeded
-        
+
         //Emit maxUncommittedOffsets messages, and fail all of them. Then 
ensure that the spout will retry them when the retry backoff has passed
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records 
= new HashMap<>();
-            List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
-                //This is cheating a bit since maxPollRecords would normally 
spread this across multiple polls
-                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            records.put(partition, recordsForPartition);
+            int numRecords = spoutConfig.getMaxUncommittedOffsets();
+            //This is cheating a bit since maxPollRecords would normally 
spread this across multiple polls
+            records.put(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords));
 
             when(consumerMock.poll(anyLong()))
                 .thenReturn(new ConsumerRecords<>(records));
 
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, 
times(recordsForPartition.size())).emit(anyString(), anyList(), 
messageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyString(), 
anyList(), messageIds.capture());
 
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);
@@ -120,16 +116,16 @@ public class KafkaSpoutEmitTest {
 
             Time.advanceTime(50);
             //No backoff for test retry service, just check that messages will 
retry immediately
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, 
times(recordsForPartition.size())).emit(anyString(), anyList(), 
retryMessageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyString(), 
anyList(), retryMessageIds.capture());
 
             //Verify that the poll started at the earliest retriable tuple 
offset
             List<Long> failedOffsets = new ArrayList<>();
-            for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
                 failedOffsets.add(msgId.offset());
             }
             InOrder inOrder = inOrder(consumerMock);
@@ -137,93 +133,60 @@ public class KafkaSpoutEmitTest {
             inOrder.verify(consumerMock).poll(anyLong());
         }
     }
-    
+
     @Test
-    public void 
testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples()
 {
-        /*
-        The spout must reemit failed messages waiting for retry even if it is 
not allowed to poll for new messages due to maxUncommittedOffsets being 
exceeded.
-        numUncommittedOffsets is equal to numNonRetriableEmittedTuples + 
numRetriableTuples.
-        The spout will only emit if numUncommittedOffsets - numRetriableTuples 
< maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < 
maxUncommittedOffsets)
-        This means that the latest offset a poll can start at for a retriable 
partition,
-        counting from the last committed offset, is maxUncommittedOffsets,
-        where there are maxUncommittedOffsets - 1 uncommitted tuples "to the 
left".
-        If the retry poll starts at that offset, it at most emits the retried 
tuple plus maxPollRecords - 1 new tuples.
-        The limit on uncommitted offsets for one partition is therefore 
maxUncommittedOffsets + maxPollRecords - 1.
-        
-        It is only necessary to test this for a single partition, because 
partitions can't contribute negatively to numNonRetriableEmittedTuples,
-        so if the limit holds for one partition, it will also hold for each 
individual partition when multiple are involved.
-        
-        This makes the actual limit numPartitions * (maxUncommittedOffsets + 
maxPollRecords - 1)
-         */
-        
-        //Emit maxUncommittedOffsets messages, and fail only the last. Then 
ensure that the spout will allow no more than maxUncommittedOffsets + 
maxPollRecords - 1 uncommitted offsets when retrying
+    public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
+        //This verifies that partitions can't prevent each other from retrying 
tuples due to the maxUncommittedOffsets limit.
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
-            
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> 
firstPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> firstPollRecordsForPartition 
= new ArrayList<>();
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
-                //This is cheating a bit since maxPollRecords would normally 
spread this across multiple polls
-                firstPollRecordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            firstPollRecords.put(partition, firstPollRecordsForPartition);
-            
-            int maxPollRecords = 5;
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> 
secondPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> secondPollRecordsForPartition 
= new ArrayList<>();
-            for(int i = 0; i < maxPollRecords; i++) {
-                secondPollRecordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), 
spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
-            }
-            secondPollRecords.put(partition, secondPollRecordsForPartition);
+            TopicPartition partitionTwo = new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            Set<TopicPartition> partitions = new HashSet<>();
+            partitions.add(partition);
+            partitions.add(partitionTwo);
+            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, partitions);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records 
= new HashMap<>();
+            //This is cheating a bit since maxPollRecords would normally 
spread this across multiple polls
+            records.put(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 
spoutConfig.getMaxUncommittedOffsets()));
+            records.put(partitionTwo, 
SpoutWithMockedConsumerSetupHelper.createRecords(partitionTwo, 0, 
spoutConfig.getMaxUncommittedOffsets()));
 
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords<>(firstPollRecords))
-                .thenReturn(new ConsumerRecords<>(secondPollRecords));
+                .thenReturn(new ConsumerRecords<>(records));
 
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + 
maxPollRecords; i++) {
+            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets()*2; i++) 
{
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, 
times(firstPollRecordsForPartition.size())).emit(anyString(), anyList(), 
messageIds.capture());
-
-            KafkaSpoutMessageId failedMessageId = 
messageIds.getAllValues().get(messageIds.getAllValues().size() - 1);
-            spout.fail(failedMessageId);
-
+            verify(collectorMock, 
times(spoutConfig.getMaxUncommittedOffsets()*2)).emit(anyString(), anyList(), 
messageIds.capture());
+            
+            //Now fail a tuple on partition 0 and verify that it is allowed to 
retry
+            //Partition 1 should be paused, since it is at the uncommitted 
offsets limit
+            Optional<KafkaSpoutMessageId> failedMessageId = 
messageIds.getAllValues().stream()
+                .filter(messageId -> messageId.partition() == 
partition.partition())
+                .findAny();
+            
+            spout.fail(failedMessageId.get());
+            
             reset(collectorMock);
-
-            //Now make the single failed tuple retriable
+            
             Time.advanceTime(50);
-            //The spout should allow another poll since there are now only 
maxUncommittedOffsets - 1 nonretriable tuples
-            for (int i = 0; i < firstPollRecordsForPartition.size() + 
maxPollRecords; i++) {
-                spout.nextTuple();
-            }
-
-            ArgumentCaptor<KafkaSpoutMessageId> retryBatchMessageIdsCaptor = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(maxPollRecords)).emit(anyString(), 
anyList(), retryBatchMessageIdsCaptor.capture());
-            reset(collectorMock);
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 
failedMessageId.get().offset(), 1))));
+            
+            spout.nextTuple();
+            
+            verify(collectorMock, times(1)).emit(anyObject(), anyObject(), 
anyObject());
             
-            //Check that the consumer started polling at the failed tuple 
offset
             InOrder inOrder = inOrder(consumerMock);
-            inOrder.verify(consumerMock).seek(partition, 
failedMessageId.offset());
+            
inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo));
             inOrder.verify(consumerMock).poll(anyLong());
+            
inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo));
             
-            //Now fail all except one of the last batch, and check that the 
spout won't reemit any tuples because there are more than maxUncommittedOffsets 
nonretriable tuples
-            Time.advanceTime(50);
-            List<KafkaSpoutMessageId> retryBatchMessageIds = 
retryBatchMessageIdsCaptor.getAllValues();
-            KafkaSpoutMessageId firstTupleFromRetryBatch = 
retryBatchMessageIds.remove(0);
-            for(KafkaSpoutMessageId msgId : retryBatchMessageIds) {
-                spout.fail(msgId);
-            }
-            for (int i = 0; i < firstPollRecordsForPartition.size() + 
maxPollRecords; i++) {
-                spout.nextTuple();
-            }
-            verify(collectorMock, never()).emit(any(), any(), any());
+            reset(collectorMock);
             
-            //Fail the last tuple, which brings the number of nonretriable 
tuples back under the limit, and check that the spout polls again
-            spout.fail(firstTupleFromRetryBatch);
+            //Now also check that no more tuples are polled for, since both 
partitions are at their limits
             spout.nextTuple();
-            verify(collectorMock, times(1)).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
+
+            verify(collectorMock, never()).emit(anyObject(), anyObject(), 
anyObject());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 830ac28..d4ae773 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -29,8 +29,10 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -89,19 +91,16 @@ public class KafkaSpoutRebalanceTest {
 
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = 
rebalanceListenerCapture.getValue();
-        List<TopicPartition> assignedPartitions = new ArrayList<>();
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
         assignedPartitions.add(partitionThatWillBeRevoked);
         assignedPartitions.add(assignedPartition);
         consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+        when(consumerMock.assignment()).thenReturn(assignedPartitions);
 
         //Make the consumer return a single message for each partition
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> 
firstPartitionRecords = new HashMap<>();
-        firstPartitionRecords.put(partitionThatWillBeRevoked, 
Collections.singletonList(new 
ConsumerRecord(partitionThatWillBeRevoked.topic(), 
partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> 
secondPartitionRecords = new HashMap<>();
-        secondPartitionRecords.put(assignedPartition, 
Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), 
assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords<>(firstPartitionRecords))
-            .thenReturn(new ConsumerRecords<>(secondPartitionRecords))
+            .thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, 
SpoutWithMockedConsumerSetupHelper.createRecords(partitionThatWillBeRevoked, 0, 
1))))
+            .thenReturn(new 
ConsumerRecords<>(Collections.singletonMap(assignedPartition, 
SpoutWithMockedConsumerSetupHelper.createRecords(assignedPartition, 0, 1))))
             .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
 
         //Emit the messages
@@ -116,6 +115,7 @@ public class KafkaSpoutRebalanceTest {
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
         
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+        
when(consumerMock.assignment()).thenReturn(Collections.singleton(assignedPartition));
 
         List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
         emittedMessageIds.add(messageIdForRevokedPartition.getValue());

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 6df6bc4..1cad6c2 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -81,22 +81,19 @@ public class KafkaSpoutRetryLimitTest {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records 
= new HashMap<>();
-            List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
             int lastOffset = 3;
-            for (int i = 0; i <= lastOffset; i++) {
-                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            records.put(partition, recordsForPartition);
+            int numRecords = lastOffset + 1;
+            records.put(partition, 
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, numRecords));
             
             when(consumerMock.poll(anyLong()))
                 .thenReturn(new ConsumerRecords<>(records));
             
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
             
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, 
times(recordsForPartition.size())).emit(anyString(), anyList(), 
messageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyString(), 
anyList(), messageIds.capture());
             
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index 84d8f23..501f733 100755
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -15,6 +15,7 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static org.hamcrest.CoreMatchers.either;
 import static org.hamcrest.CoreMatchers.everyItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -95,7 +96,7 @@ public class MaxUncommittedOffsetTest {
         ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
         for (int i = 0; i < messageCount; i++) {
             spout.nextTuple();
-        };
+        }
         verify(collector, times(maxUncommittedOffsets)).emit(
             anyObject(),
             anyObject(),
@@ -171,8 +172,13 @@ public class MaxUncommittedOffsetTest {
 
     @Test
     public void 
testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages()
 throws Exception {
-        //The upper bound on uncommitted offsets should be 
maxUncommittedOffsets + maxPollRecords - 1
-        //This is reachable by emitting maxUncommittedOffsets messages, acking 
the first message, then polling.
+        /*
+        For each partition the spout is allowed to retry all tuples between 
the committed offset, and maxUncommittedOffsets ahead.
+        It is not allowed to retry tuples past that limit.
+        This makes the actual limit per partition maxUncommittedOffsets + 
maxPollRecords - 1,
+        reached if the tuple at the maxUncommittedOffsets limit is the 
earliest retriable tuple,
+        or if the spout is 1 tuple below the limit, and receives a full 
maxPollRecords tuples in the poll.
+         */
         try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
             //First check that maxUncommittedOffsets is respected when 
emitting from scratch
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
@@ -180,7 +186,7 @@ public class MaxUncommittedOffsetTest {
 
             failAllExceptTheFirstMessageThenCommit(messageIds);
 
-            //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed
+            //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed but 
not retriable
             //The spout should now emit another maxPollRecords messages
             //This is allowed because the acked message brings the 
numUncommittedOffsets below the cap
             for (int i = 0; i < maxUncommittedOffsets; i++) {
@@ -202,18 +208,20 @@ public class MaxUncommittedOffsetTest {
                 .collect(Collectors.toList());
             assertThat("Expected the newly emitted messages to have no overlap 
with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
 
-            //Offset 0 is acked, 1 to maxUncommittedOffsets-1 are failed, 
maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
-            //There are now maxUncommittedOffsets-1 + maxPollRecords records 
emitted past the last committed offset
-            //Advance time so the failed tuples become ready for retry, and 
check that the spout will emit retriable tuples as long as 
numNonRetriableEmittedTuples < maxUncommittedOffsets
-            
-            int numNonRetriableEmittedTuples = maxPollRecords; //The other 
tuples were failed and are becoming retriable
-            int allowedPolls = (int)Math.ceil((maxUncommittedOffsets - 
numNonRetriableEmittedTuples)/(double)maxPollRecords);
+            //Offset 0 is committed, 1 to maxUncommittedOffsets-1 are failed, 
maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
+            //Fail the last tuples so only offset 0 is not failed.
+            //Advance time so the failed tuples become ready for retry, and 
check that the spout will emit retriable tuples
+            //for all the failed tuples that are within maxUncommittedOffsets 
tuples of the committed offset
+            //This means 1 to maxUncommitteddOffsets, but not 
maxUncommittedOffsets+1...maxUncommittedOffsets+maxPollRecords-1
+            for(KafkaSpoutMessageId msgId : 
secondRunMessageIds.getAllValues()) {
+                spout.fail(msgId);
+            }
             Time.advanceTimeSecs(initialRetryDelaySecs);
             for (int i = 0; i < numMessages; i++) {
                 spout.nextTuple();
             }
             ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collector, times(allowedPolls*maxPollRecords)).emit(
+            verify(collector, times(maxUncommittedOffsets)).emit(
                 anyString(),
                 anyList(),
                 thirdRunMessageIds.capture());
@@ -222,7 +230,7 @@ public class MaxUncommittedOffsetTest {
             List<Long> thirdRunOffsets = 
thirdRunMessageIds.getAllValues().stream()
                 .map(msgId -> msgId.offset())
                 .collect(Collectors.toList());
-            assertThat("Expected the emitted messages to be retries of the 
failed tuples from the first batch", thirdRunOffsets, 
everyItem(isIn(firstRunOffsets)));
+            assertThat("Expected the emitted messages to be retries of the 
failed tuples from the first batch, plus the first failed tuple from the second 
batch", thirdRunOffsets, 
everyItem(either(isIn(firstRunOffsets)).or(is(secondRunMessageIds.getAllValues().get(0).offset()))));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 66216dd..eaca824 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -50,7 +50,9 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
-import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+
+import java.util.regex.Pattern;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -79,12 +81,15 @@ public class SingleTopicKafkaSpoutTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig<String, String> spoutConfig = 
createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
-            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-            .setRetry(new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
-                maxRetries, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
-            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
-            .build();
+        KafkaSpoutConfig<String, String> spoutConfig =
+            SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+                KafkaSpoutConfig.builder("127.0.0.1:" + 
kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                    Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+                .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+                .setRetry(new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                    maxRetries, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
+                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords)
+                .build();
         this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, 
String>().createConsumer(spoutConfig));
         this.spout = new KafkaSpout<>(spoutConfig, (ignored) -> consumerSpy);
     }
@@ -93,7 +98,7 @@ public class SingleTopicKafkaSpoutTest {
         
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), 
SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
         SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
     }
-    
+
     /*
      * Asserts that commitSync has been called once, 
      * that there are only commits on one topic,
@@ -161,7 +166,7 @@ public class SingleTopicKafkaSpoutTest {
     }
 
     @Test
-    public void shouldContinueWithSlowDoubleAcks() throws Exception {
+    public void testShouldContinueWithSlowDoubleAcks() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 20;
             prepareSpout(messageCount);
@@ -200,7 +205,7 @@ public class SingleTopicKafkaSpoutTest {
     }
 
     @Test
-    public void shouldEmitAllMessages() throws Exception {
+    public void testShouldEmitAllMessages() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
             prepareSpout(messageCount);
@@ -228,7 +233,7 @@ public class SingleTopicKafkaSpoutTest {
     }
 
     @Test
-    public void shouldReplayInOrderFailedMessages() throws Exception {
+    public void testShouldReplayInOrderFailedMessages() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
             prepareSpout(messageCount);
@@ -269,7 +274,7 @@ public class SingleTopicKafkaSpoutTest {
     }
 
     @Test
-    public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
+    public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
             prepareSpout(messageCount);
@@ -313,7 +318,7 @@ public class SingleTopicKafkaSpoutTest {
     }
 
     @Test
-    public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws 
Exception {
+    public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws 
Exception {
         //The spout must reemit retriable tuples, even if they fail out of 
order.
         //The spout should be able to skip tuples it has already emitted when 
retrying messages, even if those tuples are also retries.
         int messageCount = 10;
@@ -347,7 +352,7 @@ public class SingleTopicKafkaSpoutTest {
     }
 
     @Test
-    public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception 
{
+    public void testShouldDropMessagesAfterMaxRetriesAreReached() throws 
Exception {
         //Check that if one message fails repeatedly, the retry cap limits how 
many times the message can be reemitted
         int messageCount = 1;
         prepareSpout(messageCount);
@@ -367,4 +372,22 @@ public class SingleTopicKafkaSpoutTest {
         spout.nextTuple();
         verify(collector, never()).emit(any(), any(), any());
     }
+
+    @Test
+    public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws 
Exception {
+        try (SimulatedTime time = new SimulatedTime()) {
+            SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, 
topologyContext, collector);
+
+            //Nothing is assigned yet, should emit nothing
+            spout.nextTuple();
+            verify(collector, never()).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
+
+            
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), 
SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+            
Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + 
KafkaSpout.TIMER_DELAY_MS);
+
+            //The new partition should be discovered and the message should be 
emitted
+            spout.nextTuple();
+            verify(collector).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index f43616d..ad9bd75 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -51,26 +51,39 @@ public class SpoutWithMockedConsumerSetupHelper {
      */
     public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> 
spoutConfig, Map<String, Object> topoConf,
         TopologyContext contextMock, SpoutOutputCollector collectorMock, 
KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
+        
+        stubAssignment(contextMock, consumerMock, assignedPartitions);
+        KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> 
consumerMock;
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, 
consumerFactory);
+        
+        spout.open(topoConf, contextMock, collectorMock);
+        spout.activate();
+
+        verify(consumerMock).assign(assignedPartitions);
 
+        return spout;
+    }
+    
+    /**
+     * Sets up the mocked context and consumer to appear to have the given 
partition assignment.
+     * 
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param contextMock The mocked topology context
+     * @param consumerMock The mocked consumer
+     * @param assignedPartitions The partitions to assign to the consumer
+     */
+    public static <K, V> void stubAssignment(TopologyContext contextMock, 
KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
         Map<String, List<PartitionInfo>> partitionInfos = 
assignedPartitions.stream()
             .map(tp -> new PartitionInfo(tp.topic(), tp.partition(), null, 
null, null))
             .collect(Collectors.groupingBy(info -> info.topic()));
         partitionInfos.keySet()
             .forEach(key -> when(consumerMock.partitionsFor(key))
             .thenReturn(partitionInfos.get(key)));
-        KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> 
consumerMock;
-
-        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, 
consumerFactory);
-
         
when(contextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
         when(contextMock.getThisTaskIndex()).thenReturn(0);
 
-        spout.open(topoConf, contextMock, collectorMock);
-        spout.activate();
-
-        verify(consumerMock).assign(assignedPartitions);
-
-        return spout;
+        when(consumerMock.assignment()).thenReturn(assignedPartitions);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
index 93a771d..3670d8a 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -41,7 +41,7 @@ public class SingleTopicKafkaSpoutConfiguration {
         return setCommonSpoutConfig(new 
KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
     }
 
-    private static KafkaSpoutConfig.Builder<String, String> 
setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+    public static KafkaSpoutConfig.Builder<String, String> 
setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
         return config.setRecordTranslator((r) -> new Values(r.topic(), 
r.key(), r.value()),
             new Fields("topic", "key", "value"), STREAM)
             .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")

http://git-wip-us.apache.org/repos/asf/storm/blob/cca93d2a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
index abbacf9..3acc252 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -170,4 +170,19 @@ public class OffsetManagerTest {
         manager.addToAckMsgs(msgId);
     }
 
+    @Test
+    public void testGetNthUncommittedOffsetAfterCommittedOffset() { 
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        manager.addToEmitMsgs(initialFetchOffset + 2);
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 30);
+        
+        assertThat("The third uncommitted offset should be 5", 
manager.getNthUncommittedOffsetAfterCommittedOffset(3), is(initialFetchOffset + 
5L));
+        assertThat("The fourth uncommitted offset should be 30", 
manager.getNthUncommittedOffsetAfterCommittedOffset(4), is(initialFetchOffset + 
30L));
+        
+        expect.expect(NoSuchElementException.class);
+        manager.getNthUncommittedOffsetAfterCommittedOffset(5);
+        
+    }
+
 }

Reply via email to