Repository: storm Updated Branches: refs/heads/1.x-branch 805ea40a6 -> 2a99f61f7
STORM-2361 bugfix for Kafka spout: after leader change, it stops committing offsets to ZK Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/25fde124 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/25fde124 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/25fde124 Branch: refs/heads/1.x-branch Commit: 25fde124d5cac336d116a809632f0b8dc0207229 Parents: 9a19b69 Author: Ernestas Vaiciukevicius <[email protected]> Authored: Thu Mar 9 16:02:40 2017 +0200 Committer: Ernestas Vaiciukevicius <[email protected]> Committed: Thu Mar 9 16:02:40 2017 +0200 ---------------------------------------------------------------------- .../jvm/org/apache/storm/kafka/KafkaSpout.java | 21 ++++++++++++++++++++ 1 file changed, 21 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/25fde124/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index d2bd313..8ae27d3 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -157,12 +157,27 @@ public class KafkaSpout extends BaseRichSpout { } } + private PartitionManager getManagerForPartition(int partition) { + for (PartitionManager partitionManager: _coordinator.getMyManagedPartitions()) { + if (partitionManager.getPartition().partition == partition) { + return partitionManager; + } + } + return null; + } + @Override public void ack(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.ack(id.offset); + } else { + // managers for partitions changed - try to find new manager responsible for that partition + PartitionManager newManager = getManagerForPartition(id.partition.partition); + if (newManager != null) { + newManager.ack(id.offset); + } } } @@ -172,6 +187,12 @@ public class KafkaSpout extends BaseRichSpout { PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); + } else { + // managers for partitions changed - try to find new manager responsible for that partition + PartitionManager newManager = getManagerForPartition(id.partition.partition); + if (newManager != null) { + newManager.fail(id.offset); + } } }
