Repository: storm Updated Branches: refs/heads/1.x-branch 502ed7e7a -> 3414a8cad
STORM-2784: storm-kafka-client KafkaTupleListener method onPartitionsReassigned() should be called after initialization is complete Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/667ffee1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/667ffee1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/667ffee1 Branch: refs/heads/1.x-branch Commit: 667ffee1580d77469c5eaadd7b5651972a316fc5 Parents: 502ed7e Author: Hugo Louro <[email protected]> Authored: Sun Oct 22 21:35:29 2017 -0700 Committer: Jungtaek Lim <[email protected]> Committed: Fri Oct 27 15:12:09 2017 +0900 ---------------------------------------------------------------------- .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/667ffee1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 68bce11..5022862 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -165,8 +165,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - tupleListener.onPartitionsReassigned(partitions); initialize(partitions); + tupleListener.onPartitionsReassigned(partitions); } private void initialize(Collection<TopicPartition> partitions) {
