STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) should set initialized flag independently of processing guarantees
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d4ac076 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d4ac076 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d4ac076 Branch: refs/heads/1.x-branch Commit: 7d4ac07684a7405d6539a3bd0cb7da985736bac7 Parents: ac16fe1 Author: Hugo Louro <[email protected]> Authored: Tue Oct 24 23:52:54 2017 -0700 Committer: Hugo Louro <[email protected]> Committed: Sun Oct 29 16:15:52 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7d4ac076/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 0a4d788..7decd0f 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -151,11 +151,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + initialized = false; + previousAssignment = partitions; + LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - previousAssignment = partitions; - if (isAtLeastOnceProcessing() && initialized) { - initialized = false; + + if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(); } }
