Repository: storm
Updated Branches:
  refs/heads/master 5077228df -> 29f845b4a


STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees

 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Rename setForceEnableTupleTracking to setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if the
spout attempts to emit an already committed message
 - Update documentation to reflect these changes
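
For reference, a minimal sketch of the resulting builder API (a hedged
illustration; the bootstrap server and topic values are placeholders):

```java
KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")                     // placeholder values
    .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)  // AT_LEAST_ONCE, AT_MOST_ONCE, or NONE
    .setTupleTrackingEnforced(true)                            // renamed from setForceEnableTupleTracking
    .build();
```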


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87576118
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87576118
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87576118

Branch: refs/heads/master
Commit: 875761189ca387f0575aeaea33a769d70d36ae47
Parents: 7b940ae
Author: Hugo Louro <[email protected]>
Authored: Sun Oct 22 17:44:54 2017 -0700
Committer: Hugo Louro <[email protected]>
Committed: Fri Oct 27 15:42:18 2017 -0700

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      | 37 +++++++++---
 .../apache/storm/kafka/spout/KafkaSpout.java    | 63 +++++++++++---------
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 50 +++++++++-------
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 14 ++---
 4 files changed, 100 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/87576118/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index a24c632..0354a0d 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -266,7 +266,7 @@ use Kafka-clients 0.10.0.0, you would use the following 
dependency in your `pom.
 You can also override the kafka clients version while building from maven, 
with parameter `storm.kafka.client.version`
 e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
 
-When selecting a kafka client version, you should ensure - 
+When selecting a kafka client version, you should ensure -
  1. kafka api is compatible. storm-kafka-client module only supports **0.10 or 
newer** kafka client API. For older versions,
  you can use storm-kafka module 
(https://github.com/apache/storm/tree/master/external/storm-kafka).  
  2. The kafka client selected by you should be wire compatible with the 
broker. e.g. 0.9.x client will not work with 
@@ -298,25 +298,46 @@ Currently the Kafka spout has the following default values, which have been
 * max.uncommitted.offsets = 10000000
 <br/>
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once 
processing of messages. The spout also supports at-most-once and any-times 
modes. At-most-once guarantees that any tuple emitted to the topology will 
never be reemitted. Any-times makes no guarantees, but may reduce the overhead 
of committing offsets to Kafka in cases where you truly don't care how many 
times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when an offset is committed to Kafka. This is
+conceptually equivalent to marking the tuple with the `ConsumerRecord` for that offset as successfully processed,
+because the tuple won't get re-emitted in case of failure or timeout.
+
+For the AT_LEAST_ONCE and AT_MOST_ONCE processing guarantees the spout controls when the commit happens.
+When the guarantee is NONE, Kafka controls when the commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+     and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if, for instance,
+     the ack gets lost.
+
+* AT_MOST_ONCE - offsets will be committed to Kafka right after being polled but before being emitted to the downstream
+     components of the topology. Offsets are processed at most once because tuples that fail or time out won't be retried.
+
+* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
+     "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens,
+     it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
+     This option requires "enable.auto.commit=true". If "enable.auto.commit=false", an exception will be thrown.
+
+To set the processing guarantee, use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
 
-To set the processing guarantee, use the 
KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
 ```
 
-The spout will disable tuple tracking for emitted tuples by default when you 
use at-most-once or any-times. In some cases you may want to enable tracking 
anyway, because tuple tracking is necessary for some features of Storm, e.g. 
showing complete latency in Storm UI, or enabling backpressure through the 
`Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
+# Tuple Tracking
+
+By default, the spout tracks emitted tuples only when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
+emitted tuples with other processing guarantees to benefit from Storm features such as showing complete latency in the UI,
+or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING.
 
-If you need to enable tracking, use the 
KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
-  .setForceEnableTupleTracking(true)
+  .setTupleTrackingEnforced(true)
 ```
 
-Note that this setting has no effect in at-least-once mode, where tuple 
tracking is always enabled.
\ No newline at end of file
+Note: This setting has no effect with the AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
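
Per the KafkaSpoutConfig changes further below, the builder now manages
`enable.auto.commit` itself based on the chosen guarantee. A hedged sketch of
configuring the NONE guarantee (server and topic values are placeholders):

```java
KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")  // placeholder values
    .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
    .build();
// For NONE the builder puts enable.auto.commit=true into the consumer properties
// (false for the other guarantees); the commit period is then governed by the
// Kafka property auto.commit.interval.ms.
```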

http://git-wip-us.apache.org/repos/asf/storm/blob/87576118/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9253a2d..170c025 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -78,17 +78,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutRetryService retryService;
     // Handles tuple events (emit, ack etc.)
     private transient KafkaTupleListener tupleListener;
-    // timer == null for modes other than at-least-once
+    // timer == null if processing guarantee is none or at-most-once
     private transient Timer commitTimer;
     // Flag indicating that the spout is still undergoing initialization 
process.
     private transient boolean initialized;
     // Initialization is only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
     // Tuples that were successfully acked/emitted. These tuples will be 
committed periodically when the commit timer expires,
-    //or after a consumer rebalance, or during close/deactivate. Always empty 
if not using at-least-once mode.
+    // or after a consumer rebalance, or during close/deactivate. Always empty 
if processing guarantee is none or at-most-once.
     private transient Map<TopicPartition, OffsetManager> offsetManagers;
     // Tuples that have been emitted but that are "on the wire", i.e. pending 
being acked or failed.
-    // Always empty if not using at-least-once mode.
+    // Always empty if processing guarantee is none or at-most-once.
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the 
nextTuple() call. One record is emitted per nextTuple()
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
@@ -125,8 +125,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         tupleListener = kafkaSpoutConfig.getTupleListener();
 
-        if (isAtLeastOnce()) {
-            // Only used if the spout commits offsets for acked tuples
+        if (isAtLeastOnceProcessing()) {
+            // Only used when the spout commits an offset to Kafka after the corresponding tuple has been acked.
             commitTimer = new Timer(TIMER_DELAY_MS, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, 
kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
@@ -140,7 +140,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         LOG.info("Kafka Spout opened with the following configuration: {}", 
kafkaSpoutConfig);
     }
 
-    private boolean isAtLeastOnce() {
+    private boolean isAtLeastOnceProcessing() {
         return kafkaSpoutConfig.getProcessingGuarantee() == 
KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
     }
 
@@ -154,7 +154,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, 
topic-partitions={}]",
                     kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
             previousAssignment = partitions;
-            if (isAtLeastOnce() && initialized) {
+            if (isAtLeastOnceProcessing() && initialized) {
                 initialized = false;
                 commitOffsetsForAckedTuples();
             }
@@ -170,7 +170,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
 
         private void initialize(Collection<TopicPartition> partitions) {
-            if (isAtLeastOnce()) {
+            if (isAtLeastOnceProcessing()) {
                 // remove from acked all partitions that are no longer 
assigned to this spout
                 offsetManagers.keySet().retainAll(partitions);
                 retryService.retainAll(partitions);
@@ -188,7 +188,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
                 final long fetchOffset = doSeek(tp, committedOffset);
                 // If this partition was previously assigned to this spout, 
leave the acked offsets as they were to resume where it left off
-                if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) {
+                if (isAtLeastOnceProcessing() && 
!offsetManagers.containsKey(tp)) {
                     offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
                 }
             }
@@ -255,18 +255,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private boolean commit() {
-        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // 
timer != null for non auto commit mode
+        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null only for at-least-once processing
     }
 
     private boolean poll() {
         final int maxUncommittedOffsets = 
kafkaSpoutConfig.getMaxUncommittedOffsets();
         final int readyMessageCount = retryService.readyMessageCount();
         final boolean poll = !waitingToEmit()
-            //Check that the number of uncommitted, nonretriable tuples is 
less than the maxUncommittedOffsets limit
-            //Accounting for retriable tuples this way still guarantees that 
the limit is followed on a per partition basis,
-            //and prevents locking up the spout when there are too many 
retriable tuples
-            && (numUncommittedOffsets - readyMessageCount < 
maxUncommittedOffsets
-            || !isAtLeastOnce());
+            // Check that the number of uncommitted, non-retriable tuples is 
less than the maxUncommittedOffsets limit.
+            // Accounting for retriable tuples in this way still guarantees 
that the limit is followed on a per partition basis,
+            // and prevents locking up the spout when there are too many 
retriable tuples
+            && (numUncommittedOffsets - readyMessageCount < 
maxUncommittedOffsets || !isAtLeastOnceProcessing());
 
         if (!poll) {
             if (waitingToEmit()) {
@@ -274,7 +273,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     + " [{}] uncommitted offsets across all topic partitions", 
numUncommittedOffsets);
             }
 
-            if (numUncommittedOffsets >= maxUncommittedOffsets && 
isAtLeastOnce()) {
+            if (numUncommittedOffsets >= maxUncommittedOffsets && 
isAtLeastOnceProcessing()) {
                 LOG.debug("Not polling. [{}] uncommitted offsets across all 
topic partitions has reached the threshold of [{}]",
                     numUncommittedOffsets, maxUncommittedOffsets);
             }
@@ -336,22 +335,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), 
record.partition());
         final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, 
record.offset());
+
         if (offsetManagers.containsKey(tp) && 
offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. 
Skipping", record);
-        } else if (emitted.contains(msgId)) {   // has been emitted and it's 
pending ack or fail
+        } else if (emitted.contains(msgId)) {   // has been emitted and it is 
pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record);
         } else {
-            Validate.isTrue(kafkaConsumer.committed(tp) == null || 
kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
-                "The spout is about to emit a message that has already been 
committed."
-                + " This should never occur, and indicates a bug in the 
spout");
+            if (kafkaConsumer.committed(tp) != null && 
(kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
+                throw new IllegalStateException("Attempting to emit a message 
that has already been committed.");
+            }
+
             final List<Object> tuple = 
kafkaSpoutConfig.getTranslator().apply(record);
             if (isEmitTuple(tuple)) {
                 final boolean isScheduled = retryService.isScheduled(msgId);
                 // not scheduled <=> never failed (i.e. never emitted), or 
scheduled and ready to be retried
                 if (!isScheduled || retryService.isReady(msgId)) {
-                    String stream = tuple instanceof KafkaTuple ? 
((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
-                    if (!isAtLeastOnce()) {
-                        if (kafkaSpoutConfig.getForceEnableTupleTracking()) {
+                    final String stream = tuple instanceof KafkaTuple ? 
((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
+
+                    if (!isAtLeastOnceProcessing()) {
+                        if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
                             collector.emit(stream, tuple, msgId);
                             LOG.trace("Emitted tuple [{}] for record [{}] with 
msgId [{}]", tuple, record, msgId);
                         } else {
@@ -438,11 +440,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // ======== Ack =======
     @Override
     public void ack(Object messageId) {
-        if (!isAtLeastOnce()) {
-            // Only need to keep track of acked tuples if commits are done 
based on acks
+        if (!isAtLeastOnceProcessing()) {
             return;
         }
 
+        // Only need to keep track of acked tuples if commits to Kafka are 
controlled by
+        // tuple acks, which happens only for at-least-once processing 
semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             if (msgId.isEmitted()) {
@@ -464,11 +467,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // ======== Fail =======
     @Override
     public void fail(Object messageId) {
-        if (!isAtLeastOnce()) {
-            // Only need to keep track of failed tuples if commits are done 
based on acks
+        if (!isAtLeastOnceProcessing()) {
             return;
         }
-
+        // Only need to keep track of failed tuples if commits to Kafka are 
controlled by
+        // tuple acks, which happens only for at-least-once processing 
semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             LOG.debug("Received fail for tuple this spout is no longer 
tracking."
@@ -477,7 +480,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
         Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + 
msgId + " is queued for retry while being failed."
             + " This should never occur barring errors in the RetryService 
implementation or the spout code.");
+
         msgId.incrementNumFails();
+
         if (!retryService.schedule(msgId)) {
             LOG.debug("Reached maximum number of retries. Message [{}] being 
marked as acked.", msgId);
             // this tuple should be removed from emitted only inside the ack() 
method. This is to ensure
@@ -526,7 +531,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void shutdown() {
         try {
-            if (isAtLeastOnce()) {
+            if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
             }
         } finally {
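
Taken together, the isAtLeastOnceProcessing() guards above amount to the
following commit policy (a condensed illustration only; describeCommitPolicy is
a hypothetical helper, not spout code):

```java
static String describeCommitPolicy(KafkaSpoutConfig.ProcessingGuarantee guarantee) {
    switch (guarantee) {
        case AT_LEAST_ONCE:
            return "acked offsets are committed when commitTimer expires; ack/fail are tracked";
        case AT_MOST_ONCE:
            return "offsets are committed right after poll, before emit; ack/fail return early";
        case NONE:
        default:
            return "the KafkaConsumer auto-commits; the spout never commits offsets";
    }
}
```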

http://git-wip-us.apache.org/repos/asf/storm/blob/87576118/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index f211697..6a693fe 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -55,10 +55,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
     public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
     // 2s
     public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+
     public static final FirstPollOffsetStrategy 
DEFAULT_FIRST_POLL_OFFSET_STRATEGY = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
         new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
             DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
     public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = 
ProcessingGuarantee.AT_LEAST_ONCE;
 
     public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new 
EmptyKafkaTupleListener();
@@ -78,7 +81,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final long partitionRefreshPeriodMs;
     private final boolean emitNullTuples;
     private final ProcessingGuarantee processingGuarantee;
-    private final boolean forceEnableTupleTracking;
+    private final boolean tupleTrackingEnforced;
 
     /**
      * Creates a new KafkaSpoutConfig using a Builder.
@@ -99,7 +102,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
         this.emitNullTuples = builder.emitNullTuples;
         this.processingGuarantee = builder.processingGuarantee;
-        this.forceEnableTupleTracking = builder.forceEnableTupleTracking;
+        this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
     }
 
     /**
@@ -126,22 +129,29 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
     }
 
     /**
-     * The processing guarantee supported by the spout. This parameter affects 
when the spout commits offsets to Kafka, marking them as
-     * processed.
+     * This enum controls when the tuple with the {@link ConsumerRecord} for 
an offset is marked as processed,
+     * i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and 
AT_MOST_ONCE the spout controls when
+     * the commit happens. When the guarantee is NONE, Kafka controls when the commit happens.
      *
      * <ul>
-     * <li>AT_LEAST_ONCE means that the Kafka spout considers an offset ready 
for commit once a tuple corresponding to that offset has been
-     * acked on the spout. This corresponds to an at-least-once guarantee.</li>
-     * <li>ANY_TIMES means that the Kafka spout may commit polled offsets at 
any time. This means the message may be processed any number of
-     * times (including 0), and causes the spout to enable auto offset 
committing on the underlying consumer.</li>
-     * <li>AT_MOST_ONCE means that the spout will commit polled offsets before 
emitting them to the topology. This guarantees at-most-once
-     * processing.</li>
+     * <li>AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+     * and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if, for instance,
+     * the ack gets lost.</li>
+     * <br/>
+     * <li>AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
+     * to the downstream components of the topology. It guarantees that the offset is processed at-most-once because
+     * tuples that fail or time out won't be retried after the offset has been committed to Kafka.</li>
+     * <br/>
+     * <li>NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
+     * "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens,
+     * it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
+     * This option requires "enable.auto.commit=true". If "enable.auto.commit=false", an exception will be thrown.</li>
      * </ul>
      */
-    public static enum ProcessingGuarantee {
+    public enum ProcessingGuarantee {
         AT_LEAST_ONCE,
-        ANY_TIMES,
-        AT_MOST_ONCE
+        AT_MOST_ONCE,
+        NONE,
     }
 
     public static class Builder<K, V> {
@@ -158,7 +168,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
         private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
         private ProcessingGuarantee processingGuarantee = 
DEFAULT_PROCESSING_GUARANTEE;
-        private boolean forceEnableTupleTracking = false;
+        private boolean tupleTrackingEnforced = false;
 
         public Builder(String bootstrapServers, String... topics) {
             this(bootstrapServers, new ManualPartitionSubscription(new 
RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
@@ -369,10 +379,10 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
          * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and 
enables some spout metrics (e.g. complete-latency) that would
          * otherwise be disabled.
          *
-         * @param forceEnableTupleTracking true if Storm should track emitted 
tuples, false otherwise
+         * @param tupleTrackingEnforced true if Storm should track emitted 
tuples, false otherwise
          */
-        public Builder<K, V> setForceEnableTupleTracking(boolean 
forceEnableTupleTracking) {
-            this.forceEnableTupleTracking = forceEnableTupleTracking;
+        public Builder<K, V> setTupleTrackingEnforced(boolean 
tupleTrackingEnforced) {
+            this.tupleTrackingEnforced = tupleTrackingEnforced;
             return this;
         }
 
@@ -425,7 +435,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
             throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
                 + " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
         }
-        if (builder.processingGuarantee == ProcessingGuarantee.ANY_TIMES) {
+        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
             builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true");
         } else {
             builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
@@ -461,8 +471,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
         return processingGuarantee;
     }
 
-    public boolean getForceEnableTupleTracking() {
-        return forceEnableTupleTracking;
+    public boolean isTupleTrackingEnforced() {
+        return tupleTrackingEnforced;
     }
 
     public String getConsumerGroupId() {
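
A hypothetical snippet illustrating the validation above; it assumes the
builder's `setProp` passthrough for consumer properties:

```java
try {
    KafkaSpoutConfig.builder("localhost:9092", "my-topic")  // placeholder values
        .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        .build();
} catch (IllegalArgumentException expected) {
    // The builder rejects manual enable.auto.commit settings and points the
    // caller to KafkaSpoutConfig.Builder.setProcessingGuarantee instead.
}
```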

http://git-wip-us.apache.org/repos/asf/storm/blob/87576118/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index cc24261..07ee2dc 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -111,7 +111,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws 
Exception {
         //The maxUncommittedOffsets limit should not be enforced, since it is 
only meaningful in at-least-once mode
         KafkaSpoutConfig<String, String> spoutConfig = 
createKafkaSpoutConfigBuilder(-1)
-            
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
             .build();
         doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
     }
@@ -146,7 +146,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         //When tuple tracking is enabled, the spout must not replay tuples in 
at-most-once mode
         KafkaSpoutConfig<String, String> spoutConfig = 
createKafkaSpoutConfigBuilder(-1)
             
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
-            .setForceEnableTupleTracking(true)
+            .setTupleTrackingEnforced(true)
             .build();
         doTestModeCannotReplayTuples(spoutConfig);
     }
@@ -155,8 +155,8 @@ public class KafkaSpoutMessagingGuaranteeTest {
     public void testAnyTimesModeCannotReplayTuples() throws Exception {
         //When tuple tracking is enabled, the spout must not replay tuples in NONE mode
         KafkaSpoutConfig<String, String> spoutConfig = 
createKafkaSpoutConfigBuilder(-1)
-            
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
-            .setForceEnableTupleTracking(true)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setTupleTrackingEnforced(true)
             .build();
         doTestModeCannotReplayTuples(spoutConfig);
     }
@@ -189,7 +189,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         //When tuple tracking is enabled, the spout must not commit acked 
tuples in at-most-once mode because they were committed before being emitted
         KafkaSpoutConfig<String, String> spoutConfig = 
createKafkaSpoutConfigBuilder(-1)
             
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
-            .setForceEnableTupleTracking(true)
+            .setTupleTrackingEnforced(true)
             .build();
         doTestModeDoesNotCommitAckedTuples(spoutConfig);
     }
@@ -198,8 +198,8 @@ public class KafkaSpoutMessagingGuaranteeTest {
     public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
         //When tuple tracking is enabled, the spout must not commit acked tuples in NONE mode because committing is managed by the consumer
         KafkaSpoutConfig<String, String> spoutConfig = 
createKafkaSpoutConfigBuilder(-1)
-            
.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
-            .setForceEnableTupleTracking(true)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setTupleTrackingEnforced(true)
             .build();
         doTestModeDoesNotCommitAckedTuples(spoutConfig);
     }
