[FLINK-7732][kafka-consumer] Do not commit Flink's sentinel offsets to Kafka

This closes #4928.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c61d1860
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c61d1860
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c61d1860

Branch: refs/heads/master
Commit: c61d18605ace57adbe84a2b05a50308043452398
Parents: b7d3589
Author: Piotr Nowojski <[email protected]>
Authored: Tue Oct 31 15:38:32 2017 +0100
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Nov 2 12:33:55 2017 +0800

----------------------------------------------------------------------
 .../kafka/internals/Kafka08Fetcher.java         |  2 +-
 .../kafka/internal/Kafka09Fetcher.java          |  6 ++-
 .../kafka/internals/AbstractFetcher.java        | 18 +++++++-
 .../KafkaTopicPartitionStateSentinel.java       |  3 ++
 .../kafka/internals/AbstractFetcherTest.java    | 48 +++++++++++++++++++-
 5 files changed, 72 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 7359e91..8bcd663 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -348,7 +348,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, 
TopicAndPartition> {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void commitInternalOffsetsToKafka(
+       protected void doCommitInternalOffsetsToKafka(
                        Map<KafkaTopicPartition, Long> offsets,
                        @Nonnull KafkaCommitCallback commitCallback) throws 
Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index cef70fe..51f69cd 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -44,6 +44,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer 
API.
  *
@@ -212,7 +214,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> {
        }
 
        @Override
-       public void commitInternalOffsetsToKafka(
+       protected void doCommitInternalOffsetsToKafka(
                        Map<KafkaTopicPartition, Long> offsets,
                        @Nonnull KafkaCommitCallback commitCallback) throws 
Exception {
 
@@ -224,6 +226,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> {
                for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {
                        Long lastProcessedOffset = 
offsets.get(partition.getKafkaTopicPartition());
                        if (lastProcessedOffset != null) {
+                               checkState(lastProcessedOffset >= 0, "Illegal 
offset value to commit");
+
                                // committed offsets through the KafkaConsumer 
need to be 1 more than the last processed offset.
                                // This does not affect Flink's 
checkpoints/saved state.
                                long offsetToCommit = lastProcessedOffset + 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 11f97b2..a128174 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -242,10 +243,25 @@ public abstract class AbstractFetcher<T, KPH> {
         * @param commitCallback The callback that the user should trigger when 
a commit request completes or fails.
         * @throws Exception This method forwards exceptions.
         */
-       public abstract void commitInternalOffsetsToKafka(
+       public final void commitInternalOffsetsToKafka(
+                       Map<KafkaTopicPartition, Long> offsets,
+                       @Nonnull KafkaCommitCallback commitCallback) throws 
Exception {
+               // Ignore sentinel offsets. They can appear here if the snapshot was taken before
+               // the actual offset values replaced the sentinels.
+               doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), 
commitCallback);
+       }
+
+       protected abstract void doCommitInternalOffsetsToKafka(
                        Map<KafkaTopicPartition, Long> offsets,
                        @Nonnull KafkaCommitCallback commitCallback) throws 
Exception;
 
+       private Map<KafkaTopicPartition, Long> 
filterOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
+               return offsets.entrySet()
+                       .stream()
+                       .filter(entry -> 
!KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue()))
+                       .collect(Collectors.toMap(entry -> entry.getKey(), 
entry -> entry.getValue()));
+       }
+
        /**
         * Creates the Kafka version specific representation of the given
         * topic partition.

http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
index c218618..3857991 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -52,4 +52,7 @@ public class KafkaTopicPartitionStateSentinel {
         */
        public static final long GROUP_OFFSET = -915623761773L;
 
+       public static boolean isSentinel(long offset) {
+               return offset < 0;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 1063102..46894a1 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -32,8 +32,10 @@ import org.junit.Test;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -46,6 +48,42 @@ import static org.mockito.Mockito.mock;
 @SuppressWarnings("serial")
 public class AbstractFetcherTest {
 
+       @Test
+       public void testIgnorePartitionStateSentinelInSnapshot() throws 
Exception {
+               final String testTopic = "test topic name";
+               Map<KafkaTopicPartition, Long> originalPartitions = new 
HashMap<>();
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 1), 
KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 2), 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+               originalPartitions.put(new KafkaTopicPartition(testTopic, 3), 
KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+               TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
+
+               TestFetcher<Long> fetcher = new TestFetcher<>(
+                       sourceContext,
+                       originalPartitions,
+                       null,
+                       null,
+                       mock(TestProcessingTimeService.class),
+                       0);
+
+               synchronized (sourceContext.getCheckpointLock()) {
+                       HashMap<KafkaTopicPartition, Long> currentState = 
fetcher.snapshotCurrentState();
+                       fetcher.commitInternalOffsetsToKafka(currentState, new 
KafkaCommitCallback() {
+                               @Override
+                               public void onSuccess() {
+                               }
+
+                               @Override
+                               public void onException(Throwable cause) {
+                                       throw new RuntimeException("Callback 
failed", cause);
+                               }
+                       });
+
+                       
assertTrue(fetcher.getLastCommittedOffsets().isPresent());
+                       assertEquals(Collections.emptyMap(), 
fetcher.getLastCommittedOffsets().get());
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //   Record emitting tests
        // 
------------------------------------------------------------------------
@@ -327,6 +365,7 @@ public class AbstractFetcherTest {
        // 
------------------------------------------------------------------------
 
        private static final class TestFetcher<T> extends AbstractFetcher<T, 
Object> {
+               protected Optional<Map<KafkaTopicPartition, Long>> 
lastCommittedOffsets = Optional.empty();
 
                protected TestFetcher(
                                SourceContext<T> sourceContext,
@@ -362,10 +401,15 @@ public class AbstractFetcherTest {
                }
 
                @Override
-               public void commitInternalOffsetsToKafka(
+               protected void doCommitInternalOffsetsToKafka(
                                Map<KafkaTopicPartition, Long> offsets,
                                @Nonnull KafkaCommitCallback callback) throws 
Exception {
-                       throw new UnsupportedOperationException();
+                       lastCommittedOffsets = Optional.of(offsets);
+                       callback.onSuccess();
+               }
+
+               public Optional<Map<KafkaTopicPartition, Long>> 
getLastCommittedOffsets() {
+                       return lastCommittedOffsets;
                }
        }
 

Reply via email to