STORM-2361 bugfix for Kafka spout: after leader change, it stops committing 
offsets to ZK


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/48cda9a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/48cda9a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/48cda9a9

Branch: refs/heads/1.0.x-branch
Commit: 48cda9a98d583cb15658c3a4c39c33ca6380f08f
Parents: e84f88f
Author: Ernestas Vaiciukevicius <[email protected]>
Authored: Thu Mar 9 16:02:40 2017 +0200
Committer: Jungtaek Lim <[email protected]>
Committed: Fri Mar 10 01:53:19 2017 +0900

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  | 21 ++++++++++++++++++++
 1 file changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/48cda9a9/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 7bacd0b..679638b 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -156,12 +156,27 @@ public class KafkaSpout extends BaseRichSpout {
         }
     }
 
+    private PartitionManager getManagerForPartition(int partition) {
+        for (PartitionManager partitionManager: 
_coordinator.getMyManagedPartitions()) {
+            if (partitionManager.getPartition().partition == partition) {
+                return partitionManager;
+            }
+        }
+        return null;
+    }
+
     @Override
     public void ack(Object msgId) {
         KafkaMessageId id = (KafkaMessageId) msgId;
         PartitionManager m = _coordinator.getManager(id.partition);
         if (m != null) {
             m.ack(id.offset);
+        } else {
+            // managers for partitions changed - try to find new manager 
responsible for that partition
+            PartitionManager newManager = 
getManagerForPartition(id.partition.partition);
+            if (newManager != null) {
+                newManager.ack(id.offset);
+            }
         }
     }
 
@@ -171,6 +186,12 @@ public class KafkaSpout extends BaseRichSpout {
         PartitionManager m = _coordinator.getManager(id.partition);
         if (m != null) {
             m.fail(id.offset);
+        } else {
+            // managers for partitions changed - try to find new manager 
responsible for that partition
+            PartitionManager newManager = 
getManagerForPartition(id.partition.partition);
+            if (newManager != null) {
+                newManager.fail(id.offset);
+            }
         }
     }
 

Reply via email to