Repository: storm
Updated Branches:
  refs/heads/1.x-branch 502ed7e7a -> 3414a8cad


STORM-2784: storm-kafka-client KafkaTupleListener method 
onPartitionsReassigned() should be called after initialization is complete


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/667ffee1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/667ffee1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/667ffee1

Branch: refs/heads/1.x-branch
Commit: 667ffee1580d77469c5eaadd7b5651972a316fc5
Parents: 502ed7e
Author: Hugo Louro <[email protected]>
Authored: Sun Oct 22 21:35:29 2017 -0700
Committer: Jungtaek Lim <[email protected]>
Committed: Fri Oct 27 15:12:09 2017 +0900

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/667ffee1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 68bce11..5022862 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -165,8 +165,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, 
consumer={}, topic-partitions={}]",
                 context.getThisTaskId(), 
kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
-            tupleListener.onPartitionsReassigned(partitions);
             initialize(partitions);
+            tupleListener.onPartitionsReassigned(partitions);
         }
 
         private void initialize(Collection<TopicPartition> partitions) {

Reply via email to