Repository: flink
Updated Branches:
  refs/heads/release-1.1 90d77594f -> 400c49ca0


[FLINK-4618] [kafka-connector] Incremented the committed offset by one to avoid 
reading the last message again.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bef0ebec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bef0ebec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bef0ebec

Branch: refs/heads/release-1.1
Commit: bef0ebecc70f95c9e8b680b9a5e7212c23eef508
Parents: 90d7759
Author: Max Kuklinski <[email protected]>
Authored: Fri Sep 30 23:03:30 2016 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Sat Oct 1 18:16:35 2016 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/internal/Kafka09Fetcher.java    | 6 +++++-
 .../flink/streaming/connectors/kafka/Kafka09FetcherTest.java   | 4 ++--
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bef0ebec/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 1da2259..7e4177e 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -289,7 +289,11 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>(partitions.length);
 
                for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {
-                       Long offset = 
offsets.get(partition.getKafkaTopicPartition());
+                       /*
+                        * Increment offset by one, otherwise last record will 
be read again. This does not affect checkpoints/saved state.
+                        * The offset is only read from Kafka/ZK on a fresh 
startup of a job, not restart or failure. See 
https://issues.apache.org/jira/browse/FLINK-4618
+                        */
+                       Long offset = 
offsets.get(partition.getKafkaTopicPartition()) + 1;
                        if (offset != null) {
                                
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new 
OffsetAndMetadata(offset));
                                partition.setCommittedOffset(offset);

http://git-wip-us.apache.org/repos/asf/flink/blob/bef0ebec/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 4fd6c9f..5a638b2 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -266,7 +266,7 @@ public class Kafka09FetcherTest {
                        TopicPartition partition = entry.getKey();
                        if (partition.topic().equals("test")) {
                                assertEquals(42, partition.partition());
-                               assertEquals(11L, entry.getValue().offset());
+                               assertEquals(12L, entry.getValue().offset());
                        }
                        else if (partition.topic().equals("another")) {
                                assertEquals(99, partition.partition());
@@ -283,7 +283,7 @@ public class Kafka09FetcherTest {
                        TopicPartition partition = entry.getKey();
                        if (partition.topic().equals("test")) {
                                assertEquals(42, partition.partition());
-                               assertEquals(19L, entry.getValue().offset());
+                               assertEquals(20L, entry.getValue().offset());
                        }
                        else if (partition.topic().equals("another")) {
                                assertEquals(99, partition.partition());

Reply via email to