Repository: flink Updated Branches: refs/heads/release-1.1 90d77594f -> 400c49ca0
[FLINK-4618] [kafka-connector] Incremented the committed offset by one to avoid duplicate read message. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bef0ebec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bef0ebec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bef0ebec Branch: refs/heads/release-1.1 Commit: bef0ebecc70f95c9e8b680b9a5e7212c23eef508 Parents: 90d7759 Author: Max Kuklinski <[email protected]> Authored: Fri Sep 30 23:03:30 2016 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Sat Oct 1 18:16:35 2016 +0800 ---------------------------------------------------------------------- .../streaming/connectors/kafka/internal/Kafka09Fetcher.java | 6 +++++- .../flink/streaming/connectors/kafka/Kafka09FetcherTest.java | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bef0ebec/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index 1da2259..7e4177e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -289,7 +289,11 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem Map<TopicPartition, OffsetAndMetadata> 
offsetsToCommit = new HashMap<>(partitions.length); for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { - Long offset = offsets.get(partition.getKafkaTopicPartition()); + /* + * Increment offset by one, otherwise last record will be read again. This does not affect checkpoints/saved state. + * The offset is only read from Kafka/ZK on a fresh startup of a job, not restart or failure. See https://issues.apache.org/jira/browse/FLINK-4618 + */ + Long offset = offsets.get(partition.getKafkaTopicPartition()) + 1; if (offset != null) { offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset)); partition.setCommittedOffset(offset); http://git-wip-us.apache.org/repos/asf/flink/blob/bef0ebec/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java index 4fd6c9f..5a638b2 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java @@ -266,7 +266,7 @@ public class Kafka09FetcherTest { TopicPartition partition = entry.getKey(); if (partition.topic().equals("test")) { assertEquals(42, partition.partition()); - assertEquals(11L, entry.getValue().offset()); + assertEquals(12L, entry.getValue().offset()); } else if (partition.topic().equals("another")) { assertEquals(99, partition.partition()); @@ -283,7 +283,7 @@ public class Kafka09FetcherTest { TopicPartition partition = 
entry.getKey(); if (partition.topic().equals("test")) { assertEquals(42, partition.partition()); - assertEquals(19L, entry.getValue().offset()); + assertEquals(20L, entry.getValue().offset()); } else if (partition.topic().equals("another")) { assertEquals(99, partition.partition());
