Repository: storm Updated Branches: refs/heads/master d6b69a7ca -> 9daa6a1e3
STORM-2361 bugfix for Kafka spout: after leader change, it stops committing offsets to ZK Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/83a342f6 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/83a342f6 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/83a342f6 Branch: refs/heads/master Commit: 83a342f6f812d41052fa56c82b349a53316792bf Parents: d6b69a7 Author: Ernestas Vaiciukevicius <[email protected]> Authored: Thu Mar 9 16:02:40 2017 +0200 Committer: Jungtaek Lim <[email protected]> Committed: Fri Mar 10 01:26:20 2017 +0900 ---------------------------------------------------------------------- .../jvm/org/apache/storm/kafka/KafkaSpout.java | 21 ++++++++++++++++++++ 1 file changed, 21 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/83a342f6/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index 4608963..01cc9b7 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -157,12 +157,27 @@ public class KafkaSpout extends BaseRichSpout { } } + private PartitionManager getManagerForPartition(int partition) { + for (PartitionManager partitionManager: _coordinator.getMyManagedPartitions()) { + if (partitionManager.getPartition().partition == partition) { + return partitionManager; + } + } + return null; + } + @Override public void ack(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.ack(id.offset); + } else { + // managers for partitions changed - try to find new manager responsible for that partition + PartitionManager newManager = getManagerForPartition(id.partition.partition); + if (newManager != null) { + newManager.ack(id.offset); + } } } @@ -172,6 +187,12 @@ public class KafkaSpout extends BaseRichSpout { PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); + } else { + // managers for partitions changed - try to find new manager responsible for that partition + PartitionManager newManager = getManagerForPartition(id.partition.partition); + if (newManager != null) { + newManager.fail(id.offset); + } } }
