STORM-2361: Fix Kafka spout that stops committing offsets to ZooKeeper (ZK) after a leader change
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/48cda9a9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/48cda9a9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/48cda9a9 Branch: refs/heads/1.0.x-branch Commit: 48cda9a98d583cb15658c3a4c39c33ca6380f08f Parents: e84f88f Author: Ernestas Vaiciukevicius <[email protected]> Authored: Thu Mar 9 16:02:40 2017 +0200 Committer: Jungtaek Lim <[email protected]> Committed: Fri Mar 10 01:53:19 2017 +0900 ---------------------------------------------------------------------- .../jvm/org/apache/storm/kafka/KafkaSpout.java | 21 ++++++++++++++++++++ 1 file changed, 21 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/48cda9a9/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index 7bacd0b..679638b 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -156,12 +156,27 @@ public class KafkaSpout extends BaseRichSpout { } } + private PartitionManager getManagerForPartition(int partition) { + for (PartitionManager partitionManager: _coordinator.getMyManagedPartitions()) { + if (partitionManager.getPartition().partition == partition) { + return partitionManager; + } + } + return null; + } + @Override public void ack(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.ack(id.offset); + } else { + // managers for partitions changed - try to find new manager responsible for that partition + PartitionManager newManager = 
getManagerForPartition(id.partition.partition); + if (newManager != null) { + newManager.ack(id.offset); + } } } @@ -171,6 +186,12 @@ public class KafkaSpout extends BaseRichSpout { PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); + } else { + // managers for partitions changed - try to find new manager responsible for that partition + PartitionManager newManager = getManagerForPartition(id.partition.partition); + if (newManager != null) { + newManager.fail(id.offset); + } } }
