Repository: storm Updated Branches: refs/heads/1.x-branch 0fa8209e4 -> 31133fdce
STORM-2784: storm-kafka-client KafkaTupleListener method onPartitionsReassigned() should be called after initialization is complete Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/472a0ad7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/472a0ad7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/472a0ad7 Branch: refs/heads/1.x-branch Commit: 472a0ad79701158bce8a27bf794d5687a537048d Parents: 9dc5414 Author: Hugo Louro <[email protected]> Authored: Sun Oct 22 21:35:29 2017 -0700 Committer: Hugo Louro <[email protected]> Committed: Wed Nov 29 19:59:48 2017 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/472a0ad7/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 4cda53c..b126770 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -164,8 +164,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - tupleListener.onPartitionsReassigned(partitions); initialize(partitions); + tupleListener.onPartitionsReassigned(partitions); } private void initialize(Collection<TopicPartition> partitions) {
