[FLINK-7732][kafka-consumer] Do not commit Flink's sentinel offsets to Kafka
This closes #4928. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c61d1860 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c61d1860 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c61d1860 Branch: refs/heads/master Commit: c61d18605ace57adbe84a2b05a50308043452398 Parents: b7d3589 Author: Piotr Nowojski <[email protected]> Authored: Tue Oct 31 15:38:32 2017 +0100 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Nov 2 12:33:55 2017 +0800 ---------------------------------------------------------------------- .../kafka/internals/Kafka08Fetcher.java | 2 +- .../kafka/internal/Kafka09Fetcher.java | 6 ++- .../kafka/internals/AbstractFetcher.java | 18 +++++++- .../KafkaTopicPartitionStateSentinel.java | 3 ++ .../kafka/internals/AbstractFetcherTest.java | 48 +++++++++++++++++++- 5 files changed, 72 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java index 7359e91..8bcd663 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java @@ -348,7 +348,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { // 
------------------------------------------------------------------------ @Override - public void commitInternalOffsetsToKafka( + protected void doCommitInternalOffsetsToKafka( Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index cef70fe..51f69cd 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -44,6 +44,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import static org.apache.flink.util.Preconditions.checkState; + /** * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API. 
* @@ -212,7 +214,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { } @Override - public void commitInternalOffsetsToKafka( + protected void doCommitInternalOffsetsToKafka( Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception { @@ -224,6 +226,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition()); if (lastProcessedOffset != null) { + checkState(lastProcessedOffset >= 0, "Illegal offset value to commit"); + // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset. // This does not affect Flink's checkpoints/saved state. long offsetToCommit = lastProcessedOffset + 1; http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 11f97b2..a128174 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -242,10 +243,25 @@ public abstract class AbstractFetcher<T, 
KPH> { * @param commitCallback The callback that the user should trigger when a commit request completes or fails. * @throws Exception This method forwards exceptions. */ - public abstract void commitInternalOffsetsToKafka( + public final void commitInternalOffsetsToKafka( + Map<KafkaTopicPartition, Long> offsets, + @Nonnull KafkaCommitCallback commitCallback) throws Exception { + // Ignore sentinels. They might appear here if snapshot has started before actual offsets values + // replaced sentinels + doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback); + } + + protected abstract void doCommitInternalOffsetsToKafka( Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception; + private Map<KafkaTopicPartition, Long> filterOutSentinels(Map<KafkaTopicPartition, Long> offsets) { + return offsets.entrySet() + .stream() + .filter(entry -> !KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue())) + .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); + } + /** * Creates the Kafka version specific representation of the given * topic partition. 
http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java index c218618..3857991 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java @@ -52,4 +52,7 @@ public class KafkaTopicPartitionStateSentinel { */ public static final long GROUP_OFFSET = -915623761773L; + public static boolean isSentinel(long offset) { + return offset < 0; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c61d1860/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java index 1063102..46894a1 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java +++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java @@ -32,8 +32,10 @@ import org.junit.Test; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -46,6 +48,42 @@ import static org.mockito.Mockito.mock; @SuppressWarnings("serial") public class AbstractFetcherTest { + @Test + public void testIgnorePartitionStateSentinelInSnapshot() throws Exception { + final String testTopic = "test topic name"; + Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>(); + originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + originalPartitions.put(new KafkaTopicPartition(testTopic, 2), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + originalPartitions.put(new KafkaTopicPartition(testTopic, 3), KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET); + + TestSourceContext<Long> sourceContext = new TestSourceContext<>(); + + TestFetcher<Long> fetcher = new TestFetcher<>( + sourceContext, + originalPartitions, + null, + null, + mock(TestProcessingTimeService.class), + 0); + + synchronized (sourceContext.getCheckpointLock()) { + HashMap<KafkaTopicPartition, Long> currentState = fetcher.snapshotCurrentState(); + fetcher.commitInternalOffsetsToKafka(currentState, new KafkaCommitCallback() { + @Override + public void onSuccess() { + } + + @Override + public void onException(Throwable cause) { + throw new RuntimeException("Callback failed", cause); + } + }); + + assertTrue(fetcher.getLastCommittedOffsets().isPresent()); + assertEquals(Collections.emptyMap(), fetcher.getLastCommittedOffsets().get()); + } + } + // ------------------------------------------------------------------------ // Record emitting tests // 
------------------------------------------------------------------------ @@ -327,6 +365,7 @@ public class AbstractFetcherTest { // ------------------------------------------------------------------------ private static final class TestFetcher<T> extends AbstractFetcher<T, Object> { + protected Optional<Map<KafkaTopicPartition, Long>> lastCommittedOffsets = Optional.empty(); protected TestFetcher( SourceContext<T> sourceContext, @@ -362,10 +401,15 @@ public class AbstractFetcherTest { } @Override - public void commitInternalOffsetsToKafka( + protected void doCommitInternalOffsetsToKafka( Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback callback) throws Exception { - throw new UnsupportedOperationException(); + lastCommittedOffsets = Optional.of(offsets); + callback.onSuccess(); + } + + public Optional<Map<KafkaTopicPartition, Long>> getLastCommittedOffsets() { + return lastCommittedOffsets; } }
