Repository: storm Updated Branches: refs/heads/master a1ab78b2b -> 16a4cb8c4
STORM-2787: storm-kafka-client KafkaSpout method onPartitionsRevoked(...) should set initialized flag independently of processing guarantees Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b547d37 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b547d37 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b547d37 Branch: refs/heads/master Commit: 0b547d373abeaac721c409034e3ae4b7ff485dd6 Parents: 8757611 Author: Hugo Louro <[email protected]> Authored: Tue Oct 24 23:52:54 2017 -0700 Committer: Hugo Louro <[email protected]> Committed: Fri Oct 27 15:58:44 2017 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0b547d37/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 170c025..4e3090c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -151,11 +151,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + initialized = false; + previousAssignment = partitions; + LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - previousAssignment = partitions; - if (isAtLeastOnceProcessing() && initialized) { - initialized = false; + + if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(); } }
