Repository: kafka
Updated Branches:
  refs/heads/trunk c6c4f5070 -> 38a1b6055


HOTFIX: fix off-by-one stream offset commit

guozhangwang

Author: Yasuhiro Matsuda <yasuh...@confluent.io>

Reviewers: Guozhang Wang

Closes #372 from ymatsuda/commit_offset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/38a1b605
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/38a1b605
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/38a1b605

Branch: refs/heads/trunk
Commit: 38a1b605533632f61f4c23b69933eb496098311b
Parents: c6c4f50
Author: Yasuhiro Matsuda <yasuh...@confluent.io>
Authored: Tue Oct 27 13:49:19 2015 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Tue Oct 27 13:49:19 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/StreamTask.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/38a1b605/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d83d721..f01e00b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -271,7 +271,7 @@ public class StreamTask implements Punctuator {
         if (commitOffsetNeeded) {
             Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata 
= new HashMap<>(consumedOffsets.size());
             for (Map.Entry<TopicPartition, Long> entry : 
consumedOffsets.entrySet()) {
-                consumedOffsetsAndMetadata.put(entry.getKey(), new 
OffsetAndMetadata(entry.getValue()));
+                consumedOffsetsAndMetadata.put(entry.getKey(), new 
OffsetAndMetadata(entry.getValue() + 1L));
             }
             consumer.commitSync(consumedOffsetsAndMetadata);
             commitOffsetNeeded = false;

Reply via email to