Repository: storm
Updated Branches:
  refs/heads/1.x-branch 0fa8209e4 -> 31133fdce


STORM-2784: storm-kafka-client KafkaTupleListener method onPartitionsReassigned() should be called after initialization is complete


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/472a0ad7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/472a0ad7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/472a0ad7

Branch: refs/heads/1.x-branch
Commit: 472a0ad79701158bce8a27bf794d5687a537048d
Parents: 9dc5414
Author: Hugo Louro <[email protected]>
Authored: Sun Oct 22 21:35:29 2017 -0700
Committer: Hugo Louro <[email protected]>
Committed: Wed Nov 29 19:59:48 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/472a0ad7/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 4cda53c..b126770 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -164,8 +164,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
                 context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
-            tupleListener.onPartitionsReassigned(partitions);
             initialize(partitions);
+            tupleListener.onPartitionsReassigned(partitions);
         }
 
         private void initialize(Collection<TopicPartition> partitions) {

Reply via email to