STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) 
should set initialized flag independently of processing guarantees


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d4ac076
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d4ac076
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d4ac076

Branch: refs/heads/1.x-branch
Commit: 7d4ac07684a7405d6539a3bd0cb7da985736bac7
Parents: ac16fe1
Author: Hugo Louro <[email protected]>
Authored: Tue Oct 24 23:52:54 2017 -0700
Committer: Hugo Louro <[email protected]>
Committed: Sun Oct 29 16:15:52 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/storm/kafka/spout/KafkaSpout.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7d4ac076/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 0a4d788..7decd0f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -151,11 +151,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            initialized = false;
+            previousAssignment = partitions;
+
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
                     kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
-            previousAssignment = partitions;
-            if (isAtLeastOnceProcessing() && initialized) {
-                initialized = false;
+
+            if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
             }
         }

Reply via email to