Repository: storm
Updated Branches:
  refs/heads/master a1ef0f616 -> 3ee5543f0


STORM-2546: Fix storm-kafka-client spout getting stuck when retriable offsets were deleted from the Kafka log due to topic compaction


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88583fea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88583fea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88583fea

Branch: refs/heads/master
Commit: 88583feabf6e39c6bc7c4e657c989ca5ba2671d2
Parents: a1ef0f6
Author: Stig Rohde Døssing <[email protected]>
Authored: Sat Sep 2 23:50:33 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Wed Nov 15 08:20:20 2017 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  48 +++-
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  25 +++
 .../storm/kafka/spout/KafkaSpoutCommitTest.java | 119 ----------
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  24 ++
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |   9 +-
 .../KafkaSpoutLogCompactionSupportTest.java     | 221 +++++++++++++++++++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |   8 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   3 +-
 .../SpoutWithMockedConsumerSetupHelper.java     |  68 +++++-
 .../kafka/spout/internal/OffsetManagerTest.java |   1 -
 10 files changed, 382 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
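Editor's note, for readers skimming the diff below: the heart of the fix is in KafkaSpout.pollKafkaBroker(). After seeking each retriable partition back to its earliest retriable offset, the spout compares that seek offset against the first offset the poll actually returns; any gap means the intervening records were compacted away, so the spout acks them instead of seeking back to them forever. A minimal, self-contained sketch of that gap detection (illustrative names only, not the spout's actual fields; assumes Java 9+ for List.of):

    import java.util.Arrays;
    import java.util.List;

    public class CompactionGapSketch {
        //Returns the offsets that must have been compacted away: everything between
        //the offset we sought to and the first offset the broker actually returned.
        static long[] compactedOffsets(long seekOffset, List<Long> polledOffsets) {
            if (polledOffsets.isEmpty() || polledOffsets.get(0) <= seekOffset) {
                return new long[0]; //no gap, nothing was compacted away
            }
            long firstReceived = polledOffsets.get(0);
            long[] gap = new long[(int) (firstReceived - seekOffset)];
            for (int i = 0; i < gap.length; i++) {
                gap[i] = seekOffset + i; //these offsets no longer exist in Kafka
            }
            return gap;
        }

        public static void main(String[] args) {
            //We sought to offset 0, but the first polled record is offset 2: offsets
            //0 and 1 were removed by compaction and must be acked, otherwise the
            //spout keeps seeking back to 0 and the commit never progresses.
            System.out.println(Arrays.toString(compactedOffsets(0, List.of(2L, 3L, 4L)))); //[0, 1]
        }
    }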


http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 3364fb0..425681d 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -91,7 +91,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Always empty if processing guarantee is none or at-most-once
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;                          
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
@@ -152,7 +152,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             previousAssignment = partitions;
 
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, 
topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
+                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
 
             if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
@@ -265,7 +265,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private Set<TopicPartition> poll() {
        final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
-        
+
         if (waitingToEmit()) {
             LOG.debug("Not polling. Tuples waiting to be emitted.");
             return Collections.emptySet();
@@ -311,12 +311,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // ======== poll =========
     private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> pollablePartitions) {
-        doSeekRetriableTopicPartitions(pollablePartitions);
+        final Map<TopicPartition, Long> retriableOffsets = doSeekRetriableTopicPartitions(pollablePartitions);
         Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
         pausedPartitions.removeIf(pollablePartitions::contains);
         try {
             kafkaConsumer.pause(pausedPartitions);
             final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+            ackRetriableOffsetsIfCompactedAway(retriableOffsets, consumerRecords);
             final int numPolledRecords = consumerRecords.count();
             LOG.debug("Polled [{}] records from Kafka.",
                 numPolledRecords);
@@ -330,13 +331,42 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-    private void doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) {
+    private Map<TopicPartition, Long> doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) {
         final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets();
-
+        for (TopicPartition tp : retriableTopicPartitions.keySet()) {
+            if (!pollablePartitions.contains(tp)) {
+                retriableTopicPartitions.remove(tp);
+            }
+        }
         for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) {
-            if (pollablePartitions.contains(retriableTopicPartitionAndOffset.getKey())) {
-                //Seek directly to the earliest retriable message for each retriable topic partition
-                kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
+            //Seek directly to the earliest retriable message for each retriable topic partition
+            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
+        }
+        return retriableTopicPartitions;
+    }
+
+    private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
+        ConsumerRecords<K, V> consumerRecords) {
+        for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
+            if (!records.isEmpty()) {
+                ConsumerRecord<K, V> record = records.get(0);
+                long seekOffset = entry.getValue();
+                long earliestReceivedOffset = record.offset();
+                if (seekOffset < earliestReceivedOffset) {
+                    //Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
+                    //Ack up to the first offset received if the record is not already acked or currently in the topology
+                    for (long i = seekOffset; i < earliestReceivedOffset; i++) {
+                        KafkaSpoutMessageId msgId = retryService.getMessageId(tp, i);
+                        if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
+                            LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
+                            retryService.remove(msgId);
+                            emitted.add(msgId);
+                            ack(msgId);
+                        }
+                    }
+                }
             }
         }
     }
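Editor's note on the guard in ackRetriableOffsetsIfCompactedAway above: an offset in the gap is only force-acked if it is neither already acked (tracked by offsetManagers) nor still pending in the topology (tracked by emitted), which is what keeps in-flight tuples alive in the new tests further down. A hypothetical walk-through with concrete numbers, where a plain Set stands in for the spout's bookkeeping:

    import java.util.HashSet;
    import java.util.Set;

    public class CompactedAckGuardSketch {
        public static void main(String[] args) {
            long seekOffset = 0;             //earliest retriable offset we sought to
            long earliestReceivedOffset = 2; //first offset the poll returned
            Set<Long> ackedOrPending = new HashSet<>();
            ackedOrPending.add(1L);          //offset 1 is still in flight in the topology

            for (long i = seekOffset; i < earliestReceivedOffset; i++) {
                if (!ackedOrPending.contains(i)) {
                    //Only the genuinely missing offset 0 is acked; offset 1 is left
                    //to complete normally, so the commit stops just before it.
                    System.out.println("Acking compacted offset " + i);
                }
            }
        }
    }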

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index d5fceb4..ca8dcee 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -38,6 +39,8 @@ import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
 import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
 import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
 * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
@@ -65,6 +68,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
 
     public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);
 
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
@@ -439,6 +443,27 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
             builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         } else {
+            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+                if (autoOffsetResetPolicy == null) {
+                    /*
+                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer requests an offset that was deleted.
+                     */
+                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+                        + " Some messages may be skipped.");
+                }
+            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+                if (autoOffsetResetPolicy != null
+                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+                        + " Some messages may be processed more than once.");
+                }
+            }
             builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         }
     }
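Editor's note for users: with at-least-once processing the spout now defaults auto.offset.reset to "earliest" unless a policy is set explicitly. A hedged builder example of overriding that default (the broker address and topic are placeholders; this mirrors testWillRespectExplicitAutoOffsetResetPolicy further down):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    public class ExplicitResetPolicyExample {
        public static void main(String[] args) {
            KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
                //An explicit policy is respected instead of the new "earliest" default;
                //values other than "earliest"/"none" draw a warning under at-least-once.
                .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
                .build();
            System.out.println(conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); //none
        }
    }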

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
deleted file mode 100644
index 441e649..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Time.SimulatedTime;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InOrder;
-import org.mockito.MockitoAnnotations;
-
-import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
-import static org.mockito.ArgumentMatchers.anyList;
-
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class KafkaSpoutCommitTest {
-
-    private final long offsetCommitPeriodMs = 2_000;
-    private final TopologyContext contextMock = mock(TopologyContext.class);
-    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
-    private final Map<String, Object> conf = new HashMap<>();
-    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
-    private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpoutConfig<String, String> spoutConfig;
-
-    @Captor
-    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-        spoutConfig = createKafkaSpoutConfigBuilder(-1)
-            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-            .build();
-        consumerMock = mock(KafkaConsumer.class);
-    }
-
-    @Test
-    public void testCommitSuccessWithOffsetVoids() {
-        //Verify that the commit logic can handle offset voids
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            Set<TopicPartition> assignedPartitions = Collections.singleton(partition);
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, assignedPartitions);
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
-            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
-            // Offsets emitted are 0,1,2,3,4,<void>,8,9
-            recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5));
-            recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2));
-            records.put(partition, recordsForPartition);
-
-            when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords<>(records));
-
-            for (int i = 0; i < recordsForPartition.size(); i++) {
-                spout.nextTuple();
-            }
-
-            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
-
-            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
-                spout.ack(messageId);
-            }
-
-            // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9
-            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
-            when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
-            spout.nextTuple();
-
-            InOrder inOrder = inOrder(consumerMock);
-            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
-            inOrder.verify(consumerMock).poll(anyLong());
-
-            //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at
-            Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
-            assertTrue(commits.containsKey(partition));
-            assertEquals(10, commits.get(partition).offset());
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 051d212..e42719b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -17,8 +17,11 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
@@ -38,6 +41,7 @@ public class KafkaSpoutConfigTest {
         HashMap<String, Object> expected = new HashMap<>();
         expected.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234");
         expected.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         assertEquals(expected, conf.getKafkaProps());
@@ -51,4 +55,24 @@
 
         assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples());
     }
+    
+    @Test
+    public void testShouldNotChangeAutoOffsetResetPolicyWhenNotUsingAtLeastOnce() {
+        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        
+        assertThat("When at-least-once is not specified, the spout should use the Kafka default auto offset reset policy",
+            conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), nullValue());
+    }
+    
+    @Test
+    public void testWillRespectExplicitAutoOffsetResetPolicy() {
+        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+            .build();
+        
+        assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee",
+            conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 7cfd6b7..91831e4 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -74,7 +74,7 @@ public class KafkaSpoutEmitTest {
     public void testNextTupleEmitsAtMostOneTuple() {
         //The spout should emit at most one message per call to nextTuple
         //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
-        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
         Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
         records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10));
 
@@ -92,7 +92,7 @@ public class KafkaSpoutEmitTest {
 
         //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
             int numRecords = spoutConfig.getMaxUncommittedOffsets();
             //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
@@ -139,10 +139,7 @@ public class KafkaSpoutEmitTest {
         //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
-            Set<TopicPartition> partitions = new HashSet<>();
-            partitions.add(partition);
-            partitions.add(partitionTwo);
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partitions);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
             //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
             records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
new file mode 100644
index 0000000..b824d07
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.is;
+
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class KafkaSpoutLogCompactionSupportTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testCommitSuccessWithOffsetVoids() {
+        //Verify that the commit logic can handle offset voids due to log compaction
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            // Offsets emitted are 0,1,2,3,4,<void>,8,9
+            recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5));
+            recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2));
+            records.put(partition, recordsForPartition);
+
+            when(consumerMock.poll(anyLong()))
+                    .thenReturn(new ConsumerRecords<>(records));
+
+            for (int i = 0; i < recordsForPartition.size(); i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.ack(messageId);
+            }
+
+            // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            when(consumerMock.poll(anyLong()))
+                    .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+            spout.nextTuple();
+
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+            inOrder.verify(consumerMock).poll(anyLong());
+
+            //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at
+            Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+            assertTrue(commits.containsKey(partition));
+            assertEquals(10, commits.get(partition).offset());
+        }
+    }
+    
+    @Test
+    public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() {
+        /*
+          Verify that failed offsets will only retry if the corresponding message exists. 
+          When log compaction is enabled in Kafka it is possible that a tuple can fail, 
+          and then be impossible to retry because the message in Kafka has been deleted.
+          The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset.
+         */
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+            
+            List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+            reset(collectorMock);
+            List<KafkaSpoutMessageId> secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2);
+            reset(collectorMock);
+            
+            for(int i = 0; i < 3; i++) {
+                spout.fail(firstPartitionMsgIds.get(i));
+                spout.fail(secondPartitionMsgIds.get(i));
+            }
+            
+            Time.advanceTime(50);
+            
+            //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away.
+            //In this case the second partition acts as control to verify that we only skip past offsets that are no longer present.
+            Map<TopicPartition, int[]> retryOffsets = new HashMap<>();
+            retryOffsets.put(partition, new int[] {2});
+            retryOffsets.put(partitionTwo, new int[] {0, 1, 2});
+            int expectedEmits = 4; //2 on first partition, 0-2 on second partition
+            List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets);
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock).commitSync(commitCapture.capture());
+            Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L));
+            
+            for(KafkaSpoutMessageId msgId : retryMessageIds) {
+                spout.ack(msgId);
+            }
+            
+            //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+
+            verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+            committed = commitCapture.getValue();
+            assertThat(committed, hasKey(partition));
+            assertThat(committed, hasKey(partitionTwo));
+            assertThat(committed.get(partition).offset(), is(3L));
+            assertThat(committed.get(partitionTwo).offset(), is(3L));
+        }
+    }
+    
+    @Test
+    public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() {
+        //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally.
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            
+            List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+            reset(collectorMock);
+            
+            spout.fail(firstPartitionMsgIds.get(0));            
+            spout.fail(firstPartitionMsgIds.get(2));
+            
+            Time.advanceTime(50);
+            
+            //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away.
+            List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2);
+            for(KafkaSpoutMessageId msgId : retryMessageIds) {
+                spout.ack(msgId);
+            }
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock).commitSync(commitCapture.capture());
+            Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending",
+                committed.get(partition).offset(), is(1L));
+            
+            spout.ack(firstPartitionMsgIds.get(1));
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+            committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L));
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 07ee2dc..0948957 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -67,7 +67,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
             .build();
-        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
 
         when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
@@ -82,7 +82,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
 
     private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
-        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
 
         when(consumerMock.poll(anyLong()))
             .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
@@ -117,7 +117,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
 
     private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
-        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
 
         when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
@@ -163,7 +163,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
     private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) {
         try (SimulatedTime time = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
 
             when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
                 SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 1cad6c2..2f06db3 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -79,7 +78,7 @@ public class KafkaSpoutRetryLimitTest {
     public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
         //Spout should ack failed messages after they hit the retry limit
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
             int lastOffset = 3;
             int numRecords = lastOffset + 1;

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index ad9bd75..407ab84 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -17,22 +17,32 @@
 package org.apache.storm.kafka.spout;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.mockito.ArgumentCaptor;
 
 public class SpoutWithMockedConsumerSetupHelper {
 
@@ -50,16 +60,17 @@ public class SpoutWithMockedConsumerSetupHelper {
      * @return The spout
      */
     public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
-        TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
+        TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, TopicPartition... assignedPartitions) {
+        Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions));
         
-        stubAssignment(contextMock, consumerMock, assignedPartitions);
+        stubAssignment(contextMock, consumerMock, assignedPartitionsSet);
         KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
         KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
         
         spout.open(topoConf, contextMock, collectorMock);
         spout.activate();
 
-        verify(consumerMock).assign(assignedPartitions);
+        verify(consumerMock).assign(assignedPartitionsSet);
 
         return spout;
     }
@@ -104,4 +115,55 @@ public class SpoutWithMockedConsumerSetupHelper {
         return recordsForPartition;
     }
 
+    /**
+     * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spout The spout
+     * @param consumerMock The consumer used by the spout
+     * @param expectedEmits The number of expected emits
+     * @param collectorMock The collector used by the spout
+     * @param partition The partition to emit messages on
+     * @param offsetsToEmit The offsets to emit
+     * @return The message ids emitted by the spout during the nextTuple calls
+     */
+    public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, TopicPartition partition, int... offsetsToEmit) {
+        return pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, Collections.singletonMap(partition, offsetsToEmit));
+    }
+
+    /**
+     * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spout The spout
+     * @param consumerMock The consumer used by the spout
+     * @param collectorMock The collector used by the spout
+     * @param offsetsToEmit The offsets to emit per partition
+     * @return The message ids emitted by the spout during the nextTuple calls
+     */
+    public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, Map<TopicPartition, int[]> offsetsToEmit) {
+        int totalOffsets = 0;
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+        for (Entry<TopicPartition, int[]> entry : offsetsToEmit.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            List<ConsumerRecord<K, V>> tpRecords = new ArrayList<>();
+            for (Integer offset : entry.getValue()) {
+                tpRecords.add(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, null, null));
+                totalOffsets++;
+            }
+            records.put(tp, tpRecords);
+        }
+
+        when(consumerMock.poll(anyLong()))
+            .thenReturn(new ConsumerRecords<>(records));
+
+        for (int i = 0; i < totalOffsets; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(expectedEmits)).emit(anyString(), anyList(), messageIds.capture());
+        return messageIds.getAllValues();
+    }
+
 }
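Editor's note: a short usage sketch for the new pollAndEmit helper, condensed from KafkaSpoutLogCompactionSupportTest above. This is a test-scoped fragment, so spout, consumerMock and collectorMock are assumed to come from setupSpout(...) exactly as in those tests:

    //Assumes a test that already called SpoutWithMockedConsumerSetupHelper.setupSpout(...).
    TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);

    //Emit offsets 0-2, then fail them so they become retriable.
    List<KafkaSpoutMessageId> msgIds = SpoutWithMockedConsumerSetupHelper
        .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
    for (KafkaSpoutMessageId msgId : msgIds) {
        spout.fail(msgId);
    }

    //On retry only offset 2 comes back, as if 0 and 1 were compacted away; the
    //spout should ack past the gap so the next commit can progress to offset 2.
    SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2);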

http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
index 3acc252..46b3c9b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -21,7 +21,6 @@ import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import java.util.NoSuchElementException;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
