Repository: storm Updated Branches: refs/heads/master 414f2b4a8 -> 15845336f
[STORM-2607] Offset consumer + 1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/14e98e73 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/14e98e73 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/14e98e73 Branch: refs/heads/master Commit: 14e98e73f42700b0c35519f89d8ef3bc41e9d9db Parents: 352cd46 Author: Rodolfo Ribeiro <[email protected]> Authored: Thu Jun 29 12:06:20 2017 -0300 Committer: Stig Rohde Døssing <[email protected]> Committed: Wed Oct 11 19:05:37 2017 +0200 ---------------------------------------------------------------------- .../kafka/spout/internal/OffsetManager.java | 19 +++++++++++-------- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +- 2 files changed, 12 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/14e98e73/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 87b52ef..8d6fbce 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -76,15 +76,17 @@ public class OffsetManager { public OffsetAndMetadata findNextCommitOffset() { long currOffset; long nextCommitOffset = committedOffset; + long lastOffMessageOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); - if (currOffset == nextCommitOffset + 1) { // found the next offset to commit + if (currOffset == lastOffMessageOffset + 1) { // found the next offset to commit nextCommitMsg = currAckedMsg; - nextCommitOffset = currOffset; - } else if (currOffset > nextCommitOffset + 1) { - if (emittedOffsets.contains(nextCommitOffset + 1)) { + lastOffMessageOffset = currOffset; + nextCommitOffset = lastOffMessageOffset + 1; + } else if (currOffset > lastOffMessageOffset + 1) { + if (emittedOffsets.contains(lastOffMessageOffset + 1)) { LOG.debug("topic-partition [{}] has non-continuous offset [{}]." + " It will be processed in a subsequent batch.", tp, currOffset); break; @@ -99,10 +101,11 @@ public class OffsetManager { LOG.debug("Processed non contiguous offset." + " (committedOffset+1) is no longer part of the topic." + " Committed: [{}], Processed: [{}]", committedOffset, currOffset); - final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset + 1); + final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { nextCommitMsg = currAckedMsg; - nextCommitOffset = currOffset; + lastOffMessageOffset = currOffset; + nextCommitOffset = lastOffMessageOffset + 1; } else { LOG.debug("topic-partition [{}] has non-continuous offset [{}]." + " Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); @@ -120,7 +123,7 @@ public class OffsetManager { if (nextCommitMsg != null) { nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", - tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset()); + tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1); } else { LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); } @@ -139,7 +142,7 @@ public class OffsetManager { */ public long commit(OffsetAndMetadata committedOffset) { final long preCommitCommittedOffsets = this.committedOffset; - long numCommittedOffsets = 0; + final long numCommittedOffsets = committedOffset.offset() - this.committedOffset - 1; this.committedOffset = committedOffset.offset(); for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) { if (iterator.next().offset() <= committedOffset.offset()) { http://git-wip-us.apache.org/repos/asf/storm/blob/14e98e73/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 10ab581..b4137dc 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -104,7 +104,7 @@ public class SingleTopicKafkaSpoutTest { Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1)); OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue(); - assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1)); + assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount)); } @Test
