Repository: flink
Updated Branches:
  refs/heads/master 5a573c6bc -> efb40cfc5


[hotfix] [kafka] Committed offset value set in KafkaTopicPartitionState should also be incremented by 1.

The broken behaviour was introduced in the last hotfix commit eece0dd0.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efb40cfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efb40cfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efb40cfc

Branch: refs/heads/master
Commit: efb40cfc5f0c738cfd281b182b696495b5985a96
Parents: 5a573c6
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Wed Oct 5 18:17:34 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Wed Oct 5 18:17:34 2016 +0800

----------------------------------------------------------------------
 .../connectors/kafka/internal/Kafka09Fetcher.java       | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efb40cfc/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index ad7efa2..aaec9dc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -289,11 +289,13 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>(partitions.length);
 
                for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {
-                       // committed offsets through the KafkaConsumer need to 
be 1 more than the last processed offset.
-                       // This does not affect Flink's checkpoints/saved state.
-                       Long offsetToCommit = 
offsets.get(partition.getKafkaTopicPartition());
-                       if (offsetToCommit != null) {
-                               
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new 
OffsetAndMetadata(offsetToCommit + 1));
+                       Long lastProcessedOffset = 
offsets.get(partition.getKafkaTopicPartition());
+                       if (lastProcessedOffset != null) {
+                               // committed offsets through the KafkaConsumer 
need to be 1 more than the last processed offset.
+                               // This does not affect Flink's 
checkpoints/saved state.
+                               long offsetToCommit = lastProcessedOffset + 1;
+
+                               
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new 
OffsetAndMetadata(offsetToCommit));
                                partition.setCommittedOffset(offsetToCommit);
                        }
                }

Reply via email to