Repository: storm
Updated Branches:
  refs/heads/1.x-branch 805ea40a6 -> 2a99f61f7


STORM-2361 bugfix for Kafka spout: after leader change, it stops committing 
offsets to ZK


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25fde124
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25fde124
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25fde124

Branch: refs/heads/1.x-branch
Commit: 25fde124d5cac336d116a809632f0b8dc0207229
Parents: 9a19b69
Author: Ernestas Vaiciukevicius <[email protected]>
Authored: Thu Mar 9 16:02:40 2017 +0200
Committer: Ernestas Vaiciukevicius <[email protected]>
Committed: Thu Mar 9 16:02:40 2017 +0200

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  | 21 ++++++++++++++++++++
 1 file changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/25fde124/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index d2bd313..8ae27d3 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -157,12 +157,27 @@ public class KafkaSpout extends BaseRichSpout {
         }
     }
 
+    private PartitionManager getManagerForPartition(int partition) {
+        for (PartitionManager partitionManager: 
_coordinator.getMyManagedPartitions()) {
+            if (partitionManager.getPartition().partition == partition) {
+                return partitionManager;
+            }
+        }
+        return null;
+    }
+
     @Override
     public void ack(Object msgId) {
         KafkaMessageId id = (KafkaMessageId) msgId;
         PartitionManager m = _coordinator.getManager(id.partition);
         if (m != null) {
             m.ack(id.offset);
+        } else {
+            // managers for partitions changed - try to find new manager 
responsible for that partition
+            PartitionManager newManager = 
getManagerForPartition(id.partition.partition);
+            if (newManager != null) {
+                newManager.ack(id.offset);
+            }
         }
     }
 
@@ -172,6 +187,12 @@ public class KafkaSpout extends BaseRichSpout {
         PartitionManager m = _coordinator.getManager(id.partition);
         if (m != null) {
             m.fail(id.offset);
+        } else {
+            // managers for partitions changed - try to find new manager 
responsible for that partition
+            PartitionManager newManager = 
getManagerForPartition(id.partition.partition);
+            if (newManager != null) {
+                newManager.fail(id.offset);
+            }
         }
     }
 

Reply via email to