Repository: storm
Updated Branches:
  refs/heads/1.x-branch fca692da3 -> cb2d7e8be


STORM-2648/STORM-2357: Add storm-kafka-client support for at-most-once processing and a toggle for whether messages should be emitted with a message id when not using at-least-once

* Minor refactor of emit statements
* Add tests for at-most-once and any-times mode, deduplicate some test code in other tests
* Fix rebase conflicts and fix leaking state through unit test retry service
* Update storm-kafka-client doc


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11a7a157
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11a7a157
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11a7a157

Branch: refs/heads/1.x-branch
Commit: 11a7a15746010e0baaa5a4db70374dff1d4ab800
Parents: fca692d
Author: Stig Rohde Døssing <[email protected]>
Authored: Mon Jul 31 20:26:55 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Mon Oct 2 18:08:12 2017 +0200

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  31 ++-
 .../apache/storm/kafka/spout/KafkaSpout.java    | 175 +++++++++-------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  99 ++++++++-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |   5 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |   8 +-
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 207 +++++++++++++++++++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   8 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |   5 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |  32 +--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  47 ++---
 .../spout/SingleTopicKafkaUnitSetupHelper.java  |  67 ++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  23 +--
 12 files changed, 522 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 93d622e..841e8ef 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -341,33 +341,32 @@ Depending on the structure of your Kafka cluster, distribution of the data, and
 
 ### Default values
 
-Currently the Kafka spout has has the following default values, which have shown to give good performance in the test environment as described in this [blog post] (https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)
+Currently the Kafka spout has has the following default values, which have been shown to give good performance in the test environment as described in this [blog post] (https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)
 
 * poll.timeout.ms = 200
 * offset.commit.period.ms = 30000   (30s)
 * max.uncommitted.offsets = 10000000
 <br/>
 
-# Kafka AutoCommitMode 
+# Messaging reliability modes
 
-If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations --, and want to remove the overhead of tuple tracking, then you can run a KafkaSpout with AutoCommitMode.
-
-To enable it, you need to:
-
-* set Config.TOPOLOGY_ACKERS to 0;
-* enable *AutoCommitMode* in Kafka consumer configuration; 
-
-Here's one example to set AutoCommitMode in KafkaSpout:
+In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
 
+To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
-               .builder(String bootstrapServers, String ... topics)
-               .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
-               .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
-               .build();
+  .builder(String bootstrapServers, String ... topics)
+  .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
 ```
 
-*Note that it's not exactly At-Most-Once in Storm, as offset is committed periodically by Kafka consumer, some tuples could be replayed when KafkaSpout is crashed.*
-
+The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
 
+If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g.
+```java
+KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
+  .builder(String bootstrapServers, String ... topics)
+  .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
+  .setForceEnableTupleTracking(true)
+```
 
+Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled.
\ No newline at end of file
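The three modes described in the documentation above differ in two ways. They differ in when offsets are committed, and in whether emitted tuples carry a message id so that Storm tracks them. The following is a minimal, self-contained Java sketch of those rules as stated in the documentation and in the `ProcessingGuarantee` javadoc. It does not depend on storm-kafka-client. The enum only mirrors `KafkaSpoutConfig.ProcessingGuarantee`, and `ProcessingGuaranteeModel`, `commitPolicy` and `tracksTuples` are hypothetical names used for illustration.

```java
/**
 * Hypothetical model of the spout's processing guarantees; it does not use
 * storm-kafka-client. The enum only mirrors KafkaSpoutConfig.ProcessingGuarantee.
 */
final class ProcessingGuaranteeModel {

    enum ProcessingGuarantee { AT_LEAST_ONCE, ANY_TIMES, AT_MOST_ONCE }

    private ProcessingGuaranteeModel() {
    }

    /** Describes when offsets are committed under each guarantee. */
    static String commitPolicy(ProcessingGuarantee guarantee) {
        switch (guarantee) {
            case AT_LEAST_ONCE:
                // Offsets become committable once their tuples are acked; commits run on a timer.
                return "commit acked offsets periodically";
            case AT_MOST_ONCE:
                // Polled offsets are committed synchronously before anything is emitted.
                return "commit polled offsets before emitting";
            case ANY_TIMES:
                // The underlying Kafka consumer commits in the background (auto commit).
                return "consumer auto commit";
            default:
                throw new IllegalArgumentException("Unknown guarantee: " + guarantee);
        }
    }

    /** True if emitted tuples carry a message id, so that Storm tracks (acks/fails) them. */
    static boolean tracksTuples(ProcessingGuarantee guarantee, boolean forceEnableTupleTracking) {
        // At-least-once always tracks; the force flag only matters in the other two modes.
        return guarantee == ProcessingGuarantee.AT_LEAST_ONCE || forceEnableTupleTracking;
    }

    public static void main(String[] args) {
        for (ProcessingGuarantee g : ProcessingGuarantee.values()) {
            System.out.println(g + ": " + commitPolicy(g) + ", tracked by default: " + tracksTuples(g, false));
        }
    }
}
```

With the real builder, the documentation snippets would be completed with concrete bootstrap servers and topics and a final `.build()` call to obtain a `KafkaSpoutConfig`.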

http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9f806b5..fbd869c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -52,10 +52,12 @@ import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaSpout<K, V> extends BaseRichSpout {
+
     private static final long serialVersionUID = 4151921085047987154L;
     //Initial delay for the commit and subscription refresh timers
     public static final long TIMER_DELAY_MS = 500;
@@ -66,26 +68,36 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private KafkaConsumerFactory kafkaConsumerFactory;
+    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
     private transient KafkaConsumer<K, V> kafkaConsumer;
-    private transient boolean consumerAutoCommitMode;
 
     // Bookkeeping
-    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
-    private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
-    private transient KafkaTupleListener tupleListener;                 // Handles tuple events (emit, ack etc.)
-    private transient Timer commitTimer;                                // timer == null for auto commit mode
-    private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
+    // Strategy to determine the fetch offset of the first realized by the spout upon activation
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+    // Class that has the logic to handle tuple failure.
+    private transient KafkaSpoutRetryService retryService;
+    // Handles tuple events (emit, ack etc.)
+    private transient KafkaTupleListener tupleListener;
+    // timer == null for modes other than at-least-once
+    private transient Timer commitTimer;
+    // Flag indicating that the spout is still undergoing initialization process.
+    private transient boolean initialized;
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    private transient Map<TopicPartition, OffsetManager> offsetManagers;// Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
-    private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
-    private transient Timer refreshSubscriptionTimer;                   // Triggers when a subscription should be refreshed
+    // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
+    //or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
+    private transient Map<TopicPartition, OffsetManager> offsetManagers;
+    // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
+    // Always empty if not using at-least-once mode.
+    private transient Set<KafkaSpoutMessageId> emitted;
+    // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
+    // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
+    private transient long numUncommittedOffsets;
+    // Triggers when a subscription should be refreshed
+    private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
 
-
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
     }
@@ -107,9 +119,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         // Offset management
         firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
-        // with AutoCommitMode, offset will be periodically committed in the background by Kafka consumer
-        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
-
+        
         // Retries management
         retryService = kafkaSpoutConfig.getRetryService();
 
@@ -117,7 +127,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         tupleListener.open(conf, context);
 
 
-        if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
+        if (isAtLeastOnce()) {
+            // Only used if the spout commits offsets for acked tuples
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
@@ -129,14 +140,18 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
 
-    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+    private boolean isAtLeastOnce() {
+        return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
+    }
 
+    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
     private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
-            if (!consumerAutoCommitMode && initialized) {
+                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            if (isAtLeastOnce() && initialized) {
                 initialized = false;
                 commitOffsetsForAckedTuples();
             }
@@ -145,36 +160,39 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
-                    context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+                context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             tupleListener.onPartitionsReassigned(partitions);
             initialize(partitions);
         }
 
         private void initialize(Collection<TopicPartition> partitions) {
-            if (!consumerAutoCommitMode) {
+            if (isAtLeastOnce()) {
                 offsetManagers.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
-            }
-
-            retryService.retainAll(partitions);
-
-            /*
-             * Emitted messages for partitions that are no longer assigned to this spout can't
-             * be acked and should not be retried, hence remove them from emitted collection.
-            */
-            Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
-            Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
-            while (msgIdIterator.hasNext()) {
-                KafkaSpoutMessageId msgId = msgIdIterator.next();
-                if (!partitionsSet.contains(msgId.getTopicPartition())) {
-                    msgIdIterator.remove();
+                retryService.retainAll(partitions);
+
+                /*
+                 * Emitted messages for partitions that are no longer assigned to this spout can't
+                 * be acked and should not be retried, hence remove them from emitted collection.
+                 */
+                Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
+                Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
+                while (msgIdIterator.hasNext()) {
+                    KafkaSpoutMessageId msgId = msgIdIterator.next();
+                    if (!partitionsSet.contains(msgId.getTopicPartition())) {
+                        msgIdIterator.remove();
+                    }
                 }
             }
 
             for (TopicPartition tp : partitions) {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
                 final long fetchOffset = doSeek(tp, committedOffset);
-                setAcked(tp, fetchOffset);
+                // Add offset managers for the new partitions.
+                // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+                if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) {
+                    offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
+                }
             }
             initialized = true;
             LOG.info("Initialization complete");
@@ -209,15 +227,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-    private void setAcked(TopicPartition tp, long fetchOffset) {
-        // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
-        if (!consumerAutoCommitMode && !offsetManagers.containsKey(tp)) {
-            offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
-        }
-    }
-
     // ======== Next Tuple =======
-
     @Override
     public void nextTuple() {
         try {
@@ -252,24 +262,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private boolean commit() {
-        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
     }
 
     private boolean poll() {
         final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
         final int readyMessageCount = retryService.readyMessageCount();
-        final boolean poll = !waitingToEmit() &&
+        final boolean poll = !waitingToEmit()
             //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
-            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, and prevents locking up the spout when there are too many retriable tuples
-            (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets ||
-            consumerAutoCommitMode);
-        
+            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
+            //and prevents locking up the spout when there are too many retriable tuples
+            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
+            || !isAtLeastOnce());
+
         if (!poll) {
             if (waitingToEmit()) {
                 LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
             }
 
-            if (numUncommittedOffsets >= maxUncommittedOffsets && !consumerAutoCommitMode) {
+            if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnce()) {
                 LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
             }
         }
@@ -297,6 +308,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
         LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
+        if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+            //Commit polled records immediately to ensure delivery is at-most-once.
+            kafkaConsumer.commitSync();
+        }
         return consumerRecords;
     }
 
@@ -335,11 +350,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 final boolean isScheduled = retryService.isScheduled(msgId);
                 // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
                 if (!isScheduled || retryService.isReady(msgId)) {
-                    if (consumerAutoCommitMode) {
-                        if (tuple instanceof KafkaTuple) {
-                            collector.emit(((KafkaTuple) tuple).getStream(), tuple);
+                    String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
+                    if (!isAtLeastOnce()) {
+                        if (kafkaSpoutConfig.getForceEnableTupleTracking()) {
+                            collector.emit(stream, tuple, msgId);
+                            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                         } else {
-                            collector.emit(tuple);
+                            collector.emit(stream, tuple);
+                            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
                         }
                     } else {
                         emitted.add(msgId);
@@ -349,15 +367,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                         } else {            //New tuple, hence increment the uncommitted offset counter
                             numUncommittedOffsets++;
                         }
-
-                        if (tuple instanceof KafkaTuple) {
-                            collector.emit(((KafkaTuple) tuple).getStream(), tuple, msgId);
-                        } else {
-                            collector.emit(tuple, msgId);
-                        }
+                        collector.emit(stream, tuple, msgId);
                          tupleListener.onEmit(tuple, msgId);
+                        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                     }
-                    LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                     return true;
                 }
             } else {
@@ -407,31 +420,37 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     // ======== Ack =======
-
     @Override
     public void ack(Object messageId) {
+        if (!isAtLeastOnce()) {
+            // Only need to keep track of acked tuples if commits are done based on acks
+            return;
+        }
+
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             if (msgId.isEmitted()) {
-                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " +
-                        "came from a topic-partition that this consumer group instance is no longer tracking " +
-                        "due to rebalance/partition reassignment. No action taken.", msgId);
+                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
+                    + "came from a topic-partition that this consumer group instance is no longer tracking "
+                    + "due to rebalance/partition reassignment. No action taken.", msgId);
             } else {
                 LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
             }
         } else {
-            if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
-                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
-            }
+            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             emitted.remove(msgId);
         }
         tupleListener.onAck(msgId);
     }
 
     // ======== Fail =======
-
     @Override
     public void fail(Object messageId) {
+        if (!isAtLeastOnce()) {
+            // Only need to keep track of failed tuples if commits are done based on acks
+            return;
+        }
+
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
@@ -451,7 +470,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     // ======== Activate / Deactivate / Close / Declare Outputs =======
-
     @Override
     public void activate() {
         try {
@@ -487,7 +505,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void shutdown() {
         try {
-            if (!consumerAutoCommitMode) {
+            if (isAtLeastOnce()) {
                 commitOffsetsForAckedTuples();
             }
         } finally {
@@ -506,10 +524,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public String toString() {
-        return "KafkaSpout{" +
-                "offsetManagers =" + offsetManagers +
-                ", emitted=" + emitted +
-                "}";
+        return "KafkaSpout{"
+            + "offsetManagers =" + offsetManagers
+            + ", emitted=" + emitted
+            + "}";
     }
 
     @Override
@@ -532,6 +550,3 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return kafkaSpoutConfig.getSubscription().getTopicsString();
     }
 }
-
-
-
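The emit and poll changes in `KafkaSpout` above come down to two small decisions. The first is which `emit` overload to call, and the second is whether `nextTuple()` may poll Kafka. The sketch below restates both in plain Java so they can be checked without a Storm cluster. It is a model, not the spout's code: `SpoutDecisionModel`, `Emit`, `emitFor` and `shouldPoll` are hypothetical names, and `DEFAULT_STREAM_ID` is assumed to equal Storm's `Utils.DEFAULT_STREAM_ID` (`"default"`).

```java
import java.util.Optional;

/**
 * Hypothetical model of two decisions KafkaSpout makes after this change.
 * It uses no Storm or Kafka classes; all names are illustrative.
 */
final class SpoutDecisionModel {

    enum ProcessingGuarantee { AT_LEAST_ONCE, ANY_TIMES, AT_MOST_ONCE }

    /** Assumed to match org.apache.storm.utils.Utils.DEFAULT_STREAM_ID. */
    static final String DEFAULT_STREAM_ID = "default";

    /** The stream a tuple is emitted to, plus the message id if Storm should track it. */
    static final class Emit {
        final String stream;
        final Optional<String> msgId;

        Emit(String stream, Optional<String> msgId) {
            this.stream = stream;
            this.msgId = msgId;
        }
    }

    private SpoutDecisionModel() {
    }

    /**
     * Emit decision: a KafkaTuple keeps its own stream (modelled by a non-null
     * kafkaTupleStream), other tuples go to the default stream. A message id is
     * attached in at-least-once mode, or when tuple tracking is forced.
     */
    static Emit emitFor(ProcessingGuarantee guarantee, boolean forceEnableTupleTracking,
                        String kafkaTupleStream, String msgId) {
        String stream = kafkaTupleStream != null ? kafkaTupleStream : DEFAULT_STREAM_ID;
        boolean track = guarantee == ProcessingGuarantee.AT_LEAST_ONCE || forceEnableTupleTracking;
        return new Emit(stream, track ? Optional.of(msgId) : Optional.<String>empty());
    }

    /**
     * Poll decision: never poll while records are waiting to be emitted. The
     * maxUncommittedOffsets limit, net of retriable tuples, only applies to at-least-once.
     */
    static boolean shouldPoll(ProcessingGuarantee guarantee, boolean waitingToEmit,
                              long numUncommittedOffsets, int readyMessageCount,
                              int maxUncommittedOffsets) {
        return !waitingToEmit
            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
                || guarantee != ProcessingGuarantee.AT_LEAST_ONCE);
    }

    public static void main(String[] args) {
        Emit e = emitFor(ProcessingGuarantee.AT_MOST_ONCE, false, null, "topic-0@42");
        System.out.println(e.stream + " tracked=" + e.msgId.isPresent());
        System.out.println(shouldPoll(ProcessingGuarantee.AT_LEAST_ONCE, false, 10, 0, 10));
        System.out.println(shouldPoll(ProcessingGuarantee.ANY_TIMES, false, 10, 0, 10));
    }
}
```

In the diff, `ack()` and `fail()` return immediately outside at-least-once mode. Forcing a message id with `setForceEnableTupleTracking(true)` therefore only enables Storm-side tracking, such as complete latency and `Config.TOPOLOGY_MAX_SPOUT_PENDING`. Failed tuples are still not retried in those modes.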

http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 5cad0f4..b7de812 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -33,14 +33,18 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.Config;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
  */
 public class KafkaSpoutConfig<K, V> implements Serializable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);
     private static final long serialVersionUID = 141902646130682494L;
     // 200ms
     public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
@@ -56,6 +60,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
         new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
             DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+    public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
 
     public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
 
@@ -77,6 +82,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final Class<? extends Deserializer<K>> keyDesClazz;
     private final SerializableDeserializer<V> valueDes;
     private final Class<? extends Deserializer<V>> valueDesClazz;
+    private final ProcessingGuarantee processingGuarantee;
+    private final boolean forceEnableTupleTracking;
 
     /**
      * Creates a new KafkaSpoutConfig using a Builder.
@@ -84,7 +91,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @param builder The Builder to construct the KafkaSpoutConfig from
      */
     public KafkaSpoutConfig(Builder<K, V> builder) {
-        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+        setAutoCommitMode(builder);
+        this.kafkaProps = builder.kafkaProps;
         this.subscription = builder.subscription;
         this.translator = builder.translator;
         this.pollTimeoutMs = builder.pollTimeoutMs;
@@ -99,6 +107,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.keyDesClazz = builder.keyDesClazz;
         this.valueDes = builder.valueDes;
         this.valueDesClazz = builder.valueDesClazz;
+        this.processingGuarantee = builder.processingGuarantee;
+        this.forceEnableTupleTracking = builder.forceEnableTupleTracking;
     }
 
     /**
@@ -124,6 +134,25 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         UNCOMMITTED_LATEST
     }
 
+    /**
+     * The processing guarantee supported by the spout. This parameter affects when the spout commits offsets to Kafka, marking them as
+     * processed.
+     *
+     * <ul>
+     * <li>AT_LEAST_ONCE means that the Kafka spout considers an offset ready for commit once a tuple corresponding to that offset has been
+     * acked on the spout. This corresponds to an at-least-once guarantee.</li>
+     * <li>ANY_TIMES means that the Kafka spout may commit polled offsets at any time. This means the message may be processed any number of
+     * times (including 0), and causes the spout to enable auto offset committing on the underlying consumer.</li>
+     * <li>AT_MOST_ONCE means that the spout will commit polled offsets before emitting them to the topology. This guarantees at-most-once
+     * processing.</li>
+     * </ul>
+     */
+    public static enum ProcessingGuarantee {
+        AT_LEAST_ONCE,
+        ANY_TIMES,
+        AT_MOST_ONCE
+    }
+
     public static class Builder<K, V> {
 
         private final Map<String, Object> kafkaProps;
@@ -141,6 +170,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER;
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
+        private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
+        private boolean forceEnableTupleTracking = false;
 
         public Builder(String bootstrapServers, String... topics) {
             this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
@@ -482,6 +513,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
          *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param offsetCommitPeriodMs time in ms
          */
         public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
@@ -495,6 +528,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
          * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be exceeded,
          * but no partition will exceed this limit by more than maxPollRecords - 1.
          *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param maxUncommittedOffsets max number of records that can be be pending commit
          */
         public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
@@ -516,6 +551,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Sets the retry service for the spout to use.
          *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param retryService the new retry service
          * @return the builder (this).
          */
@@ -593,6 +630,32 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
+        /**
+         * Specifies which processing guarantee the spout should offer. Refer to the documentation for {@link ProcessingGuarantee}.
+         *
+         * @param processingGuarantee The processing guarantee the spout should offer.
+         */
+        public Builder<K, V> setProcessingGuarantee(ProcessingGuarantee processingGuarantee) {
+            this.processingGuarantee = processingGuarantee;
+            return this;
+        }
+
+        /**
+         * Specifies whether the spout should require Storm to track emitted tuples when using a {@link ProcessingGuarantee} other than
+         * {@link ProcessingGuarantee#AT_LEAST_ONCE}. The spout will always track emitted tuples when offering at-least-once guarantees
+         * regardless of this setting. This setting is false by default.
+         *
+         * <p>Enabling tracking can be useful even in cases where reliability is not a concern, because it allows
+         * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would
+         * otherwise be disabled.
+         *
+         * @param forceEnableTupleTracking true if Storm should track emitted tuples, false otherwise
+         */
+        public Builder<K, V> setForceEnableTupleTracking(boolean forceEnableTupleTracking) {
+            this.forceEnableTupleTracking = forceEnableTupleTracking;
+            return this;
+        }
+
         public KafkaSpoutConfig<K, V> build() {
             return new KafkaSpoutConfig<>(this);
         }
@@ -637,12 +700,24 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return builder;
     }
 
-    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
-        // set defaults for properties not specified
-        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    private static void setAutoCommitMode(Builder<?, ?> builder) {
+        if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            LOG.warn("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
+                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee."
+                + " This will be treated as an error in the next major release."
+                + " For now the spout will be configured to behave like it would have in pre-1.2.0 releases.");
+            boolean enableAutoCommit = (boolean)builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            if(enableAutoCommit) {
+                builder.processingGuarantee = ProcessingGuarantee.ANY_TIMES;
+            } else {
+                builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE;
+            }
+        }
+        if (builder.processingGuarantee == ProcessingGuarantee.ANY_TIMES) {
+            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        } else {
+            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         }
-        return kafkaProps;
     }
 
     /**
@@ -700,10 +775,22 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return offsetCommitPeriodMs;
     }
 
+    /**
+     * @deprecated Use {@link #getProcessingGuarantee()} instead.
+     */
+    @Deprecated
     public boolean isConsumerAutoCommitMode() {
         return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
             || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
+    
+    public ProcessingGuarantee getProcessingGuarantee() {
+        return processingGuarantee;
+    }
+
+    public boolean getForceEnableTupleTracking() {
+        return forceEnableTupleTracking;
+    }
 
     public String getConsumerGroupId() {
         return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);

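The `setAutoCommitMode` method above maps a manually supplied `enable.auto.commit` onto a processing guarantee and then overwrites the property so that the two always agree (auto-commit is only on for `ANY_TIMES`). The following sketch models that resolution in plain Java so it runs without Storm or Kafka on the classpath. The enum reuses the constant names from the diff. `AutoCommitModeSketch` and `resolve` are hypothetical names and are not part of storm-kafka-client. The sketch differs in one deliberate way: the diff casts the property with `(boolean)`, which throws `ClassCastException` when the value was supplied as the String `"true"`, whereas the sketch accepts either a `Boolean` or a `String`.

```java
import java.util.HashMap;
import java.util.Map;

public class AutoCommitModeSketch {

    // Hypothetical stand-in that mirrors the constant names in the diff.
    enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, ANY_TIMES }

    // Value of Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /**
     * Returns the effective guarantee and overwrites enable.auto.commit in
     * props so that the property always agrees with that guarantee.
     */
    static ProcessingGuarantee resolve(Map<String, Object> props, ProcessingGuarantee configured) {
        ProcessingGuarantee effective = configured;
        if (props.containsKey(ENABLE_AUTO_COMMIT)) {
            // Legacy path: a manually set property overrides the configured guarantee.
            // String.valueOf accepts both Boolean.TRUE and "true", unlike a (boolean) cast.
            boolean autoCommit = Boolean.parseBoolean(String.valueOf(props.get(ENABLE_AUTO_COMMIT)));
            effective = autoCommit ? ProcessingGuarantee.ANY_TIMES : ProcessingGuarantee.AT_LEAST_ONCE;
        }
        // Only ANY_TIMES lets the Kafka consumer commit offsets on its own.
        props.put(ENABLE_AUTO_COMMIT, effective == ProcessingGuarantee.ANY_TIMES ? "true" : "false");
        return effective;
    }

    public static void main(String[] args) {
        Map<String, Object> fresh = new HashMap<>();
        // prints: AT_MOST_ONCE false
        System.out.println(resolve(fresh, ProcessingGuarantee.AT_MOST_ONCE) + " " + fresh.get(ENABLE_AUTO_COMMIT));

        Map<String, Object> legacy = new HashMap<>();
        legacy.put(ENABLE_AUTO_COMMIT, "true"); // a String value
        // prints: ANY_TIMES true
        System.out.println(resolve(legacy, ProcessingGuarantee.AT_MOST_ONCE) + " " + legacy.get(ENABLE_AUTO_COMMIT));
    }
}
```

As in the diff, a manually set property wins over the guarantee passed to the builder. This preserves the pre-1.2.0 behaviour that the warning message describes.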
http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index 17ba378..afc9b82 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -15,7 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.*;
@@ -41,6 +40,8 @@ import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.MockitoAnnotations;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class KafkaSpoutCommitTest {
 
     private final long offsetCommitPeriodMs = 2_000;
@@ -57,7 +58,7 @@ public class KafkaSpoutCommitTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+        spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
         consumerMock = mock(KafkaConsumer.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index e8e93b0..3b1ce2d 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -15,8 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
@@ -40,18 +38,18 @@ import org.apache.storm.task.TopologyContext;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.never;
 
-import java.util.HashSet;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Before;
 import org.mockito.InOrder;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class KafkaSpoutEmitTest {
 
     private final long offsetCommitPeriodMs = 2_000;
@@ -64,7 +62,7 @@ public class KafkaSpoutEmitTest {
 
     @Before
     public void setUp() {
-        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+        spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
         consumerMock = mock(KafkaConsumer.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
new file mode 100644
index 0000000..1f23cc5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutMessagingGuaranteeTest {
+
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testAtMostOnceModeCommitsBeforeEmit() throws Exception {
+        //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays.
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
+        //The spout should have emitted the tuple, and must have committed it before emit
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(consumerMock).commitSync();
+        inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+    }
+
+    private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+
+        when(consumerMock.poll(anyLong()))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()))))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets()))));
+
+        for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) {
+            spout.nextTuple();
+        }
+
+        verify(consumerMock, times(2)).poll(anyLong());
+        verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+    }
+
+    @Test
+    public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception {
+        //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+    }
+
+    @Test
+    public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception {
+        //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
+            .build();
+        doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+    }
+
+    private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
+        ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+        assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+        spout.fail(msgIdCaptor.getValue());
+
+        reset(consumerMock);
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 1, 1))));
+
+        spout.nextTuple();
+
+        //The consumer should not be seeking to retry the failed tuple, it should just be continuing from the current position
+        verify(consumerMock, never()).seek(eq(partition), anyLong());
+    }
+
+    @Test
+    public void testAtMostOnceModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setForceEnableTupleTracking(true)
+            .build();
+        doTestModeCannotReplayTuples(spoutConfig);
+    }
+
+    @Test
+    public void testAnyTimesModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in any-times mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
+            .setForceEnableTupleTracking(true)
+            .build();
+        doTestModeCannotReplayTuples(spoutConfig);
+    }
+
+    private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+            spout.ack(msgIdCaptor.getValue());
+            
+            Time.advanceTime(spoutConfig.getOffsetsCommitPeriodMs());
+            
+            spout.nextTuple();
+            
+            verify(consumerMock, never()).commitSync(any(Map.class));
+        }
+    }
+
+    @Test
+    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setForceEnableTupleTracking(true)
+            .build();
+        doTestModeDoesNotCommitAckedTuples(spoutConfig);
+    }
+    
+    @Test
+    public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
+            .setForceEnableTupleTracking(true)
+            .build();
+        doTestModeDoesNotCommitAckedTuples(spoutConfig);
+    }
+
+}

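The new tests pin down the rules that depend on the processing guarantee. At-most-once commits before it emits. Outside at-least-once, a failed tuple is not replayed and an acked tuple is not committed by the spout. Tuples carry a message id under at-least-once, or when `setForceEnableTupleTracking(true)` is set. The toy model below is a sketch of those rules only, not the KafkaSpout implementation. A list of event strings stands in for the mocked consumer and collector. The names `GuaranteeModelSketch`, `pollAndEmit`, `ack` and `fail` are hypothetical. Retries are simplified to an immediate seek, whereas the real spout goes through its retry service. Storm only calls `ack`/`fail` for tuples that were emitted with a message id, which is why the model only exercises them with tracking enabled.

```java
import java.util.ArrayList;
import java.util.List;

public class GuaranteeModelSketch {

    // Hypothetical stand-in that mirrors the constant names in the diff.
    enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, ANY_TIMES }

    final ProcessingGuarantee guarantee;
    final boolean forceEnableTupleTracking;
    // Order of simulated consumer and collector calls.
    final List<String> events = new ArrayList<>();

    GuaranteeModelSketch(ProcessingGuarantee guarantee, boolean forceEnableTupleTracking) {
        this.guarantee = guarantee;
        this.forceEnableTupleTracking = forceEnableTupleTracking;
    }

    // At-least-once always attaches a message id; other modes only when forced.
    boolean tracksTuples() {
        return guarantee == ProcessingGuarantee.AT_LEAST_ONCE || forceEnableTupleTracking;
    }

    // One poll that returns a single record at the given offset.
    void pollAndEmit(long offset) {
        events.add("poll");
        if (guarantee == ProcessingGuarantee.AT_MOST_ONCE) {
            // Commit before emitting so a crash after this point cannot cause a replay.
            events.add("commitSync");
        }
        events.add((tracksTuples() ? "emit-with-id:" : "emit-untracked:") + offset);
    }

    void ack(long offset) {
        // Only at-least-once commits acked offsets (the real spout does this periodically).
        // At-most-once already committed; any-times leaves committing to the consumer.
        if (guarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
            events.add("commit-acked:" + offset);
        }
    }

    void fail(long offset) {
        // Only at-least-once retries, simplified here to an immediate seek.
        if (guarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
            events.add("seek:" + offset);
        }
    }

    public static void main(String[] args) {
        for (ProcessingGuarantee g : ProcessingGuarantee.values()) {
            GuaranteeModelSketch spout = new GuaranteeModelSketch(g, true);
            spout.pollAndEmit(0);
            spout.fail(0);
            spout.pollAndEmit(1);
            spout.ack(1);
            System.out.println(g + " -> " + spout.events);
        }
    }
}
```

Running `main` prints one event trace per guarantee. Only the `AT_LEAST_ONCE` trace contains `seek` and `commit-acked` entries, and only the `AT_MOST_ONCE` trace contains `commitSync`, which matches what `KafkaSpoutMessagingGuaranteeTest` verifies against the mocks.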
http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 8996190..ab57052 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,7 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
@@ -54,6 +53,9 @@ import org.mockito.Captor;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class KafkaSpoutRebalanceTest {
 
     @Captor
@@ -131,7 +133,7 @@ public class KafkaSpoutRebalanceTest {
             doNothing()
                 .when(subscriptionMock)
                 .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
-            KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
                 .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                 .build(), consumerFactoryMock);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -168,7 +170,7 @@ public class KafkaSpoutRebalanceTest {
                 .when(subscriptionMock)
                 .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
             .setOffsetCommitPeriodMs(10)
             .setRetry(retryServiceMock)
             .build(), consumerFactoryMock);

http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 79f7398..39fa42c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -15,7 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.*;
@@ -45,6 +44,8 @@ import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.MockitoAnnotations;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class KafkaSpoutRetryLimitTest {
     
     private final long offsetCommitPeriodMs = 2_000;
@@ -65,7 +66,7 @@ public class KafkaSpoutRetryLimitTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+        spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .setRetry(ZERO_RETRIES_RETRY_SERVICE)
             .build();

http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index ccb2a6c..80a8e1d 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -15,14 +15,12 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.everyItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
@@ -30,15 +28,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.storm.kafka.KafkaUnitRule;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -50,6 +45,8 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockitoAnnotations;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class MaxUncommittedOffsetTest {
 
     @Rule
@@ -63,7 +60,7 @@ public class MaxUncommittedOffsetTest {
     private final int maxUncommittedOffsets = 10;
     private final int maxPollRecords = 5;
     private final int initialRetryDelaySecs = 60;
-    private final KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+    private final KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
         .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
         .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
         .setMaxUncommittedOffsets(maxUncommittedOffsets)
@@ -84,30 +81,15 @@ public class MaxUncommittedOffsetTest {
         this.spout = new KafkaSpout<>(spoutConfig);
     }
 
-    private void populateTopicData(String topicName, int msgCount) throws Exception {
-        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
-
-        for (int i = 0; i < msgCount; i++) {
-            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                topicName, Integer.toString(i),
-                Integer.toString(i));
-
-            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
-        }
-    }
-
-    private void initializeSpout(int msgCount) throws Exception {
-        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
-        when(topologyContext.getThisTaskIndex()).thenReturn(0);
-        when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
-        spout.open(conf, topologyContext, collector);
-        spout.activate();
+    private void prepareSpout(int msgCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
     }
 
     private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception {
         assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets));
         //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
-        initializeSpout(messageCount);
+        prepareSpout(messageCount);
 
         //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
         ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);

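`MaxUncommittedOffsetTest` runs with `maxUncommittedOffsets = 10` and `maxPollRecords = 5`. The `setMaxUncommittedOffsets` javadoc in this commit says that the limit only has an effect under at-least-once and that no partition will exceed it by more than maxPollRecords - 1. The sketch below illustrates that arithmetic. It assumes that a partition is polled whenever its uncommitted count is below the limit and that nothing is committed between polls. It also counts polled records rather than emitted tuples. `MaxUncommittedSketch`, `mayPoll` and `uncommittedAfterPolling` are hypothetical names, not the spout's actual logic.

```java
public class MaxUncommittedSketch {

    // Hypothetical stand-in that mirrors the constant names in the diff.
    enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, ANY_TIMES }

    // Assumed check: the limit only applies to at-least-once, and a partition
    // may be polled while its uncommitted count is below the limit.
    static boolean mayPoll(ProcessingGuarantee guarantee, int uncommitted, int maxUncommittedOffsets) {
        return guarantee != ProcessingGuarantee.AT_LEAST_ONCE || uncommitted < maxUncommittedOffsets;
    }

    // Polls one partition until the check fails or maxPolls is reached. Each poll
    // returns a full batch of maxPollRecords records and nothing gets committed.
    static int uncommittedAfterPolling(ProcessingGuarantee guarantee, int startUncommitted,
                                       int maxUncommittedOffsets, int maxPollRecords, int maxPolls) {
        int uncommitted = startUncommitted;
        for (int i = 0; i < maxPolls && mayPoll(guarantee, uncommitted, maxUncommittedOffsets); i++) {
            uncommitted += maxPollRecords;
        }
        return uncommitted;
    }

    public static void main(String[] args) {
        int max = 10;
        int maxPollRecords = 5;
        // Worst case for at-least-once: one below the limit, then a full batch.
        // prints 14, i.e. 10 + (5 - 1)
        System.out.println(uncommittedAfterPolling(ProcessingGuarantee.AT_LEAST_ONCE, max - 1, max, maxPollRecords, 100));
        // Other guarantees ignore the limit and keep polling until maxPolls.
        // prints 20
        System.out.println(uncommittedAfterPolling(ProcessingGuarantee.ANY_TIMES, 0, max, maxPollRecords, 4));
    }
}
```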
http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 7759b3c..cbbb391 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,10 +17,8 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.storm.kafka.KafkaUnitRule;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -31,12 +29,9 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyObject;
@@ -48,9 +43,7 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -65,6 +58,8 @@ import org.junit.Before;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
 public class SingleTopicKafkaSpoutTest {
 
     @Rule
@@ -77,15 +72,15 @@ public class SingleTopicKafkaSpoutTest {
     private final Map<String, Object> conf = new HashMap<>();
     private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
     private final long commitOffsetPeriodMs = 2_000;
+    private final int maxRetries = 3;
     private KafkaConsumer<String, String> consumerSpy;
     private KafkaConsumerFactory<String, String> consumerFactory;
     private KafkaSpout<String, String> spout;
-    private int maxRetries = 3;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
             .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
                 maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
@@ -101,25 +96,11 @@ public class SingleTopicKafkaSpoutTest {
         this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
     }
 
-    void populateTopicData(String topicName, int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
-        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
-
-        for (int i = 0; i < msgCount; i++) {
-            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                topicName, Integer.toString(i),
-                Integer.toString(i));
-            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
-        }
+    private void prepareSpout(int messageCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
     }
-
-    private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
-        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
-        when(topologyContext.getThisTaskIndex()).thenReturn(0);
-        when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
-        spout.open(conf, topologyContext, collector);
-        spout.activate();
-    }
-
+    
     /*
      * Asserts that commitSync has been called once, 
      * that there are only commits on one topic,
@@ -137,7 +118,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldContinueWithSlowDoubleAcks() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 20;
-            initializeSpout(messageCount);
+            prepareSpout(messageCount);
 
             //play 1st tuple
             ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
@@ -178,7 +159,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldEmitAllMessages() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
-            initializeSpout(messageCount);
+            prepareSpout(messageCount);
 
             //Emit all messages and check that they are emitted. Ack the messages too
             for(int i = 0; i < messageCount; i++) {
@@ -206,7 +187,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldReplayInOrderFailedMessages() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
-            initializeSpout(messageCount);
+            prepareSpout(messageCount);
 
             //play and ack 1 tuple
             ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
@@ -249,7 +230,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
-            initializeSpout(messageCount);
+            prepareSpout(messageCount);
 
             //play 1st tuple
             ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
@@ -296,7 +277,7 @@ public class SingleTopicKafkaSpoutTest {
         //The spout must reemit retriable tuples, even if they fail out of order.
         //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
         int messageCount = 10;
-        initializeSpout(messageCount);
+        prepareSpout(messageCount);
 
         //play all tuples
         for (int i = 0; i < messageCount; i++) {
@@ -329,7 +310,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
         //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
         int messageCount = 1;
-        initializeSpout(messageCount);
+        prepareSpout(messageCount);
 
         //Emit and fail the same tuple until we've reached retry limit
         for (int i = 0; i <= maxRetries; i++) {

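In `shouldDropMessagesAfterMaxRetriesAreReached`, the loop `for (int i = 0; i <= maxRetries; i++)` emits and fails the same tuple `maxRetries + 1` times: one initial emission plus `maxRetries` re-emissions, after which the message should be dropped. A minimal sketch of that count, assuming a plain per-message fail counter; `RetryCapSketch` and its methods are hypothetical and do not reproduce the internals of `KafkaSpoutRetryExponentialBackoff`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a retry cap; NOT the KafkaSpoutRetryExponentialBackoff implementation.
final class RetryCapSketch {
    private final int maxRetries;
    // Number of times each message id has failed so far.
    private final Map<String, Integer> failCounts = new HashMap<>();

    RetryCapSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Records one failure; returns true if the message may still be re-emitted. */
    boolean fail(String messageId) {
        int fails = failCounts.merge(messageId, 1, Integer::sum);
        return fails <= maxRetries;
    }

    /** Emits a single message and fails it every time; returns the total number of emissions. */
    static int emitUntilDropped(int maxRetries) {
        RetryCapSketch retries = new RetryCapSketch(maxRetries);
        int emissions = 0;
        boolean retry = true;
        while (retry) {
            emissions++;               // initial emit, then one re-emit per allowed retry
            retry = retries.fail("0"); // the downstream bolt fails the tuple every time
        }
        return emissions;
    }
}
```

With `maxRetries = 3`, as in `SingleTopicKafkaSpoutTest`, the sketch counts 4 emissions, the same as the number of iterations of the test loop.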
http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
new file mode 100644
index 0000000..81fe362
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.KafkaUnit;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+public class SingleTopicKafkaUnitSetupHelper {
+
+    /**
+     * Using the given KafkaUnit instance, put some messages in the specified topic.
+     *
+     * @param kafkaUnit The KafkaUnit instance to use
+     * @param topicName The topic to produce messages for
+     * @param msgCount The number of messages to produce
+     */
+    public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int msgCount) throws Exception {
+        kafkaUnit.createTopic(topicName);
+
+        for (int i = 0; i < msgCount; i++) {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
+                topicName, Integer.toString(i),
+                Integer.toString(i));
+            kafkaUnit.sendMessage(producerRecord);
+        }
+    }
+
+    /**
+     * Open and activate a KafkaSpout that acts as a single-task/executor spout.
+     *
+     * @param <K> Kafka key type
+     * @param <V> Kafka value type
+     * @param spout The spout to prepare
+     * @param topoConf The topology configuration
+     * @param topoContextMock The TopologyContext mock
+     * @param collectorMock The output collector mock
+     */
+    public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
+        SpoutOutputCollector collectorMock) throws Exception {
+        when(topoContextMock.getThisTaskIndex()).thenReturn(0);
+        when(topoContextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
+        spout.open(topoConf, topoContextMock, collectorMock);
+        spout.activate();
+    }
+
+}
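The `initializeSpout` Javadoc says it opens the spout as a single-task/executor spout. Stubbing `getThisTaskIndex()` to return 0 and `getComponentTasks(...)` to return a one-element list means that any partition assignment keyed on task index hands this spout every partition. The sketch below illustrates this with a round-robin split. `SingleTaskAssignmentSketch` is hypothetical and is not claimed to be the partitioner the spout actually uses:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin assignment by task index, used only to show why the
// helper stubs getThisTaskIndex() -> 0 and getComponentTasks(...) -> [0].
final class SingleTaskAssignmentSketch {
    static List<Integer> assign(List<Integer> partitions, int taskIndex, int taskCount) {
        List<Integer> mine = new ArrayList<>();
        // Task k takes partitions k, k + taskCount, k + 2 * taskCount, ...
        for (int i = taskIndex; i < partitions.size(); i += taskCount) {
            mine.add(partitions.get(i));
        }
        return mine;
    }
}
```

With `taskCount = 1` and `taskIndex = 0`, the loop visits every partition. With two tasks, task 0 would get partitions 0 and 2 of a three-partition topic, and task 1 would get partition 1.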

http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 1ab4966..f4cb833 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -41,13 +41,6 @@ public class SingleTopicKafkaSpoutConfiguration {
     public static final String STREAM = "test_stream";
     public static final String TOPIC = "test";
 
-    /**
-     * Retry in a tight loop (keep unit tests fasts).
-     */
-    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
-        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
-            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
     public static Config getConfig() {
         Config config = new Config();
         config.setDebug(true);
@@ -56,7 +49,7 @@ public class SingleTopicKafkaSpoutConfiguration {
 
     public static StormTopology getTopologyKafkaSpout(int port) {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(port).build()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
@@ -68,11 +61,11 @@ public class SingleTopicKafkaSpoutConfiguration {
         }
     };
 
-    public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(int port) {
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
         return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
     }
 
-    public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
         return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<String, String>("127.0.0.1:" + port, subscription));
     }
 
@@ -82,14 +75,18 @@ public class SingleTopicKafkaSpoutConfiguration {
                 new Fields("topic", "key", "value"), STREAM)
             .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
             .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
-            .setRetry(getRetryService())
+            .setRetry(getNoDelayRetryService())
             .setOffsetCommitPeriodMs(10_000)
             .setFirstPollOffsetStrategy(EARLIEST)
             .setMaxUncommittedOffsets(250)
             .setPollTimeoutMs(1000);
     }
 
-    protected static KafkaSpoutRetryService getRetryService() {
-        return UNIT_TEST_RETRY_SERVICE;
+    protected static KafkaSpoutRetryService getNoDelayRetryService() {
+        /**
+         * Retry in a tight loop (keep unit tests fast).
+         */
+        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
     }
 }
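The commit message lists a fix for leaking state through the unit test retry service. The diff replaces the shared static `UNIT_TEST_RETRY_SERVICE` constant with `getNoDelayRetryService()`, which builds a new `KafkaSpoutRetryExponentialBackoff` on every call, so each spout configuration gets its own retry bookkeeping. A minimal sketch of why that matters, assuming a retry service that remembers which message ids it has scheduled; `RetryStateLeakSketch` and `StatefulRetryService` are hypothetical stand-ins, not Storm classes:

```java
import java.util.HashSet;
import java.util.Set;

final class RetryStateLeakSketch {
    // Hypothetical stateful retry service: remembers which message ids were scheduled for retry.
    static final class StatefulRetryService {
        private final Set<String> scheduled = new HashSet<>();

        void schedule(String messageId) {
            scheduled.add(messageId);
        }

        int scheduledCount() {
            return scheduled.size();
        }
    }

    // Old pattern: one static instance shared by every test running in the JVM.
    static final StatefulRetryService SHARED = new StatefulRetryService();

    // New pattern: each call builds a fresh instance, so no state carries over between tests.
    static StatefulRetryService newRetryService() {
        return new StatefulRetryService();
    }

    // Simulates one test that fails a single message; returns how many retries its service now holds.
    static int runTest(StatefulRetryService service, String failedId) {
        service.schedule(failedId);
        return service.scheduledCount();
    }
}
```

Two simulated tests that share `SHARED` see each other's scheduled retries (the second one reports 2), while tests that each call `newRetryService()` only ever see their own.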
