Repository: storm
Updated Branches:
  refs/heads/master a1ab78b2b -> 16a4cb8c4


STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) 
should set initialized flag independently of processing guarantees


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b547d37
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b547d37
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b547d37

Branch: refs/heads/master
Commit: 0b547d373abeaac721c409034e3ae4b7ff485dd6
Parents: 8757611
Author: Hugo Louro <[email protected]>
Authored: Tue Oct 24 23:52:54 2017 -0700
Committer: Hugo Louro <[email protected]>
Committed: Fri Oct 27 15:58:44 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/storm/kafka/spout/KafkaSpout.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0b547d37/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 170c025..4e3090c 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -151,11 +151,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
+            initialized = false;
+            previousAssignment = partitions;
+
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, 
topic-partitions={}]",
                     kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
-            previousAssignment = partitions;
-            if (isAtLeastOnceProcessing() && initialized) {
-                initialized = false;
+
+            if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
             }
         }

Reply via email to