Repository: kafka
Updated Branches:
  refs/heads/trunk c6c4f5070 -> 38a1b6055

HOTFIX: fix off-by-one stream offset commit

guozhangwang

Author: Yasuhiro Matsuda <yasuh...@confluent.io>

Reviewers: Guozhang Wang

Closes #372 from ymatsuda/commit_offset

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/38a1b605
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/38a1b605
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/38a1b605

Branch: refs/heads/trunk
Commit: 38a1b605533632f61f4c23b69933eb496098311b
Parents: c6c4f50
Author: Yasuhiro Matsuda <yasuh...@confluent.io>
Authored: Tue Oct 27 13:49:19 2015 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Oct 27 13:49:19 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/StreamTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/38a1b605/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d83d721..f01e00b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -271,7 +271,7 @@ public class StreamTask implements Punctuator {
         if (commitOffsetNeeded) {
             Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
             for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
-                consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue()));
+                consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
             }
             consumer.commitSync(consumedOffsetsAndMetadata);
             commitOffsetNeeded = false;