STORM-511 fix updating internal state of PartitionManager
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d34037d2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d34037d2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d34037d2 Branch: refs/heads/security Commit: d34037d2d2b1a1e635d708e9367c5a29d2505eff Parents: 95abbc5 Author: Viktor Taranenko <[email protected]> Authored: Mon Oct 20 22:20:34 2014 +0100 Committer: Viktor Taranenko <[email protected]> Committed: Mon Oct 20 22:20:34 2014 +0100 ---------------------------------------------------------------------- external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d34037d2/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index c228c19..fa5f7e5 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -161,7 +161,7 @@ public class PartitionManager { try { msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset); } catch (UpdateOffsetException e) { - offset = e.startOffset; + _emittedToOffset = e.startOffset; } long end = System.nanoTime(); long millis = (end - start) / 1000000;
