Repository: flink
Updated Branches:
  refs/heads/release-1.1 a31a22ec7 -> a83dbaeb0
[hotfix] [kafka] Committed offset value set in KafkaTopicPartitionState should also be incremented by 1.

The broken behaviour was introduced in the last hotfix commit eece0dd0.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a83dbaeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a83dbaeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a83dbaeb

Branch: refs/heads/release-1.1
Commit: a83dbaeb04ec2f691683971bae4029e907795c8c
Parents: a31a22e
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Wed Oct 5 18:17:34 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Wed Oct 5 18:50:14 2016 +0800

----------------------------------------------------------------------
 .../connectors/kafka/internal/Kafka09Fetcher.java | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a83dbaeb/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index ad7efa2..aaec9dc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -289,11 +289,13 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
 
 		for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
-			// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
-			// This does not affect Flink's checkpoints/saved state.
-			Long offsetToCommit = offsets.get(partition.getKafkaTopicPartition());
-			if (offsetToCommit != null) {
-				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit + 1));
+			Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
+			if (lastProcessedOffset != null) {
+				// committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
+				// This does not affect Flink's checkpoints/saved state.
+				long offsetToCommit = lastProcessedOffset + 1;
+
+				offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
 				partition.setCommittedOffset(offsetToCommit);
 			}
 		}
