Repository: storm
Updated Branches:
  refs/heads/master 3fd66b5e8 -> db4695d2d


STORM-2340 fix AutoCommitMode issue in KafkaSpout

* Closes #1919
* fix: KafkaSpout is blocked in AutoCommitMode
* add comments for impacts of AutoCommitMode
* add doc about how to use KafkaSpout with at-most-once.
* remove at-most-once to better describe the changes; emit null msgId when 
AutoCommitMode;
* update sample code in storm-kafka-client to use inline setProp()


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/914a4768
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/914a4768
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/914a4768

Branch: refs/heads/master
Commit: 914a4768c9a8aa0320b49ff4bfb1ec338bf1d042
Parents: 3fd66b5
Author: mingmxu <[email protected]>
Authored: Fri Feb 3 12:03:37 2017 -0800
Committer: Jungtaek Lim <[email protected]>
Committed: Tue Feb 14 11:25:49 2017 +0900

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      | 22 ++++++++++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 43 +++++++++++++-------
 2 files changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/914a4768/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index ec5056f..79c4115 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -345,3 +345,25 @@ Currently the Kafka spout has the following default 
values, which have shown
 * offset.commit.period.ms = 30000   (30s)
 * max.uncommitted.offsets = 10000000
 <br/>
+
+# Kafka AutoCommitMode 
+
+If reliability isn't important to you -- that is, you don't care about losing 
tuples in failure situations -- and you want to remove the overhead of tuple 
tracking, then you can run a KafkaSpout with AutoCommitMode.
+
+To enable it, you need to:
+* set Config.TOPOLOGY_ACKERS to 0;
+* enable *AutoCommitMode* in Kafka consumer configuration; 
+
+Here's one example to set AutoCommitMode in KafkaSpout:
+```java
+KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
+               .builder(bootstrapServers, topics)
+               .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+               .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
+               .build();
+```
+
+*Note that this is not exactly at-most-once in Storm: since offsets are committed 
periodically by the Kafka consumer, some tuples could be replayed when the 
KafkaSpout crashes.*
+
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/914a4768/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 864235c..b96f3f9 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -78,10 +78,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient boolean initialized;                              // 
Flag indicating that the spout is still undergoing initialization process.
     // Initialization is only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that 
were successfully acked. These tuples will be committed periodically when the 
commit timer expires, after consumer rebalance, or on close/deactivate
-    private transient Set<KafkaSpoutMessageId> emitted;                 // 
Tuples that have been emitted but that are "on the wire", i.e. pending being 
acked or failed
+    transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that 
were successfully acked. These tuples will be committed periodically when the 
commit timer expires, after consumer rebalance, or on close/deactivate. Not 
used if it's AutoCommitMode
+    private transient Set<KafkaSpoutMessageId> emitted;                 // 
Tuples that have been emitted but that are "on the wire", i.e. pending being 
acked or failed. Not used if it's AutoCommitMode
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // 
Records that have been polled and are queued to be emitted in the nextTuple() 
call. One record is emitted per nextTuple()
-    private transient long numUncommittedOffsets;                       // 
Number of offsets that have been polled and emitted but not yet been committed
+    private transient long numUncommittedOffsets;                       // 
Number of offsets that have been polled and emitted but not yet been committed. 
Not used if it's AutoCommitMode
     private transient TopologyContext context;
     private transient Timer refreshSubscriptionTimer;                   // 
Used to say when a subscription should be refreshed
 
@@ -107,7 +107,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         // Offset management
         firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
-        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+        // with AutoCommitMode, offset will be periodically committed in the 
background by Kafka consumer
+        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();  
 
         // Retries management
         retryService = kafkaSpoutConfig.getRetryService();
@@ -242,14 +243,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private boolean poll() {
         final int maxUncommittedOffsets = 
kafkaSpoutConfig.getMaxUncommittedOffsets();
-        final boolean poll = !waitingToEmit() && numUncommittedOffsets < 
maxUncommittedOffsets;
+        final boolean poll = !waitingToEmit() 
+                       && ( numUncommittedOffsets < maxUncommittedOffsets || 
consumerAutoCommitMode);
 
         if (!poll) {
             if (waitingToEmit()) {
                 LOG.debug("Not polling. Tuples waiting to be emitted. [{}] 
uncommitted offsets across all topic partitions", numUncommittedOffsets);
             }
 
-            if (numUncommittedOffsets >= maxUncommittedOffsets) {
+            if (numUncommittedOffsets >= maxUncommittedOffsets && 
!consumerAutoCommitMode) {
                 LOG.debug("Not polling. [{}] uncommitted offsets across all 
topic partitions has reached the threshold of [{}]", numUncommittedOffsets, 
maxUncommittedOffsets);
             }
         }
@@ -314,15 +316,26 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             boolean isScheduled = retryService.isScheduled(msgId);
             if (!isScheduled || retryService.isReady(msgId)) {   // not 
scheduled <=> never failed (i.e. never emitted) or ready to be retried
                 final List<Object> tuple = 
kafkaSpoutConfig.getTranslator().apply(record);
-                if (tuple instanceof KafkaTuple) {
-                    collector.emit(((KafkaTuple)tuple).getStream(), tuple, 
msgId);
-                } else {
-                    collector.emit(tuple, msgId);
-                }
-                emitted.add(msgId);
-                numUncommittedOffsets++;
-                if (isScheduled) { // Was scheduled for retry, now being 
re-emitted. Remove from schedule.
-                    retryService.remove(msgId);
+                
+                if(consumerAutoCommitMode){
+                    if (tuple instanceof KafkaTuple) {
+                        collector.emit(((KafkaTuple)tuple).getStream(), tuple);
+                    } else {
+                        collector.emit(tuple);
+                    }
+                }else{                
+                    if (tuple instanceof KafkaTuple) {
+                        collector.emit(((KafkaTuple)tuple).getStream(), tuple, 
msgId);
+                    } else {
+                        collector.emit(tuple, msgId);
+                    }
+                
+                    emitted.add(msgId);
+                    numUncommittedOffsets++;
+                
+                    if (isScheduled) { // Was scheduled for retry, now being 
re-emitted. Remove from schedule.
+                        retryService.remove(msgId);
+                    }
                 }
                 LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
                 return true;

Reply via email to