[FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset determination to open()
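In short: partition-to-start-offset assignment now happens lazily in open() rather than eagerly in the consumer/fetcher constructors. Each subscribed partition is initialized with a sentinel offset value that encodes the configured startup mode (earliest / latest / committed group offsets), or directly with the offset from restored checkpoint state; the version-specific fetchers later resolve any remaining sentinels into concrete offsets. This removes the StartupMode and isRestored plumbing from the fetchers, which now receive a single Map<KafkaTopicPartition, Long>.

As a minimal sketch of the sentinel scheme: the new KafkaTopicPartitionStateSentinel class appears in the diffstat below, but its body is not part of this excerpt, so the concrete constant values here are illustrative assumptions only.

    // Hypothetical sketch of KafkaTopicPartitionStateSentinel; the constant
    // values are assumptions, chosen as negative magic numbers that can never
    // collide with a real Kafka offset.
    public class KafkaTopicPartitionStateSentinel {

        /** Sentinel for an offset that has not been set at all (assumed value). */
        public static final long OFFSET_NOT_SET = -915623761776L;

        /** Sentinel: start from the latest offset; the fetcher resolves this to a real offset (assumed value). */
        public static final long LATEST_OFFSET = -1L;

        /** Sentinel: start from the earliest offset possible (assumed value). */
        public static final long EARLIEST_OFFSET = -2L;

        /** Sentinel: start from the committed offsets of the consumer group in ZK / Kafka (assumed value). */
        public static final long GROUP_OFFSET = -3L;
    }

Because both the restored and fresh-start paths funnel into the same map, the fetchers only need to compare each partition's offset against these sentinels (as the Kafka08Fetcher and KafkaConsumerThread hunks below do) instead of branching on a restored flag plus a startup mode.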
This closes #3378. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed68fedb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed68fedb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed68fedb Branch: refs/heads/master Commit: ed68fedbe90db03823d75a020510ad3c344fa73e Parents: 72f56d1 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Tue Feb 21 23:05:32 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue Feb 28 00:54:48 2017 +0800 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer010.java | 9 +- .../kafka/internal/Kafka010Fetcher.java | 12 +- .../internal/KafkaConsumerCallBridge010.java | 9 +- .../connectors/kafka/Kafka010FetcherTest.java | 23 +- .../connectors/kafka/FlinkKafkaConsumer08.java | 22 +- .../kafka/internals/Kafka08Fetcher.java | 77 +++---- .../kafka/internals/ZookeeperOffsetHandler.java | 18 +- .../connectors/kafka/FlinkKafkaConsumer09.java | 9 +- .../kafka/internal/Kafka09Fetcher.java | 17 +- .../kafka/internal/KafkaConsumerCallBridge.java | 12 +- .../kafka/internal/KafkaConsumerThread.java | 79 ++----- .../connectors/kafka/Kafka09FetcherTest.java | 23 +- .../kafka/FlinkKafkaConsumerBase.java | 187 ++++++++-------- .../connectors/kafka/config/StartupMode.java | 20 +- .../kafka/internals/AbstractFetcher.java | 89 ++++---- .../internals/KafkaTopicPartitionState.java | 10 +- .../KafkaTopicPartitionStateSentinel.java | 55 +++++ .../FlinkKafkaConsumerBaseMigrationTest.java | 33 ++- .../kafka/FlinkKafkaConsumerBaseTest.java | 20 +- .../KafkaConsumerPartitionAssignmentTest.java | 222 ++++++++++++------- .../AbstractFetcherTimestampsTest.java | 37 ++-- 21 files changed, 510 insertions(+), 473 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java index 3a58216..716fa19 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -29,7 +29,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW import org.apache.flink.util.SerializedValue; import java.util.Collections; -import java.util.HashMap; +import java.util.Map; import java.util.List; import java.util.Properties; @@ -128,8 +128,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> { @Override protected AbstractFetcher<T, ?> createFetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws 
Exception { @@ -138,8 +137,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> { return new Kafka010Fetcher<>( sourceContext, - thisSubtaskPartitions, - restoredSnapshotState, + assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), @@ -151,7 +149,6 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> { deserializer, properties, pollTimeout, - startupMode, useMetrics); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java index efb6f88..da6ecd0 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java @@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -32,8 +31,7 @@ import org.apache.flink.util.SerializedValue; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; -import java.util.HashMap; -import java.util.List; +import java.util.Map; import java.util.Properties; /** @@ -48,8 +46,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { public Kafka010Fetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> assignedPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, @@ -61,13 +58,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, - StartupMode startupMode, boolean useMetrics) throws Exception { super( sourceContext, - assignedPartitions, - restoredSnapshotState, + assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, @@ -79,7 +74,6 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> { deserializer, kafkaProperties, pollTimeout, - startupMode, useMetrics); } http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java 
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java index 1e0bc5b..0fda9a6 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.internal; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; +import java.util.Collections; import java.util.List; /** @@ -39,12 +40,12 @@ public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge { } @Override - public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) { - consumer.seekToBeginning(partitions); + public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) { + consumer.seekToBeginning(Collections.singletonList(partition)); } @Override - public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) { - consumer.seekToEnd(partitions); + public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) { + consumer.seekToEnd(Collections.singletonList(partition)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java index 98aa28a..17ba712 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java @@ -24,10 +24,10 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internal.Handover; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher; import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; @@ -118,13 +118,13 @@ public class Kafka010FetcherTest { @SuppressWarnings("unchecked") SourceContext<String> sourceContext = 
mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( sourceContext, - topics, - null, /* no restored state */ + partitionsWithInitialOffsets, null, /* periodic assigner */ null, /* punctuated assigner */ new TestProcessingTimeService(), @@ -136,7 +136,6 @@ public class Kafka010FetcherTest { schema, new Properties(), 0L, - StartupMode.GROUP_OFFSETS, false); // ----- run the fetcher ----- @@ -256,13 +255,13 @@ public class Kafka010FetcherTest { @SuppressWarnings("unchecked") SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( sourceContext, - topics, - null, /* no restored state */ + partitionsWithInitialOffsets, null, /* periodic assigner */ null, /* punctuated assigner */ new TestProcessingTimeService(), @@ -274,7 +273,6 @@ public class Kafka010FetcherTest { schema, new Properties(), 0L, - StartupMode.GROUP_OFFSETS, false); // ----- run the fetcher ----- @@ -372,13 +370,13 @@ public class Kafka010FetcherTest { // ----- build a fetcher ----- BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>(); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>( sourceContext, - topics, - null, /* no restored state */ + partitionsWithInitialOffsets, null, /* periodic watermark extractor */ null, /* punctuated watermark extractor */ new TestProcessingTimeService(), @@ -390,7 +388,6 @@ public class Kafka010FetcherTest { schema, new Properties(), 0L, - StartupMode.GROUP_OFFSETS, false); // ----- run the fetcher ----- http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index c0e4dd7..bf7ed02 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -45,10 +45,10 @@ import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; import java.util.Collections; +import java.util.ArrayList; import java.util.List; -import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.Random; @@ -194,19 +194,23 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { @Override protected AbstractFetcher<T, ?> createFetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { boolean useMetrics = !Boolean.valueOf(kafkaProperties.getProperty(KEY_DISABLE_METRICS, "false")); - return new Kafka08Fetcher<>(sourceContext, - thisSubtaskPartitions, restoredSnapshotState, - watermarksPeriodic, watermarksPunctuated, - runtimeContext, deserializer, kafkaProperties, - autoCommitInterval, startupMode, useMetrics); + return new Kafka08Fetcher<>( + sourceContext, + assignedPartitionsWithInitialOffsets, + watermarksPeriodic, + watermarksPunctuated, + runtimeContext, + deserializer, + kafkaProperties, + autoCommitInterval, + useMetrics); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java index ad520d8..de201e5 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java @@ -91,27 +91,23 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { public Kafka08Fetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> assignedPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long autoCommitInterval, - StartupMode startupMode, boolean useMetrics) throws Exception { super( sourceContext, - assignedPartitions, - restoredSnapshotState, + assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), runtimeContext.getUserCodeClassLoader(), - startupMode, useMetrics); this.deserializer = checkNotNull(deserializer); @@ -122,7 
+118,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { this.unassignedPartitionsQueue = new ClosableBlockingQueue<>(); // initially, all these partitions are not assigned to a specific broker connection - for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) { unassignedPartitionsQueue.add(partition); } } @@ -146,43 +142,32 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { PeriodicOffsetCommitter periodicCommitter = null; try { - // if we're not restored from a checkpoint, all partitions will not have their offset set; - // depending on the configured startup mode, accordingly set the starting offsets - if (!isRestored) { - switch (startupMode) { - case EARLIEST: - for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { - partition.setOffset(OffsetRequest.EarliestTime()); - } - break; - case LATEST: - for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { - partition.setOffset(OffsetRequest.LatestTime()); - } - break; - default: - case GROUP_OFFSETS: - List<KafkaTopicPartition> partitions = new ArrayList<>(subscribedPartitions().length); - for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { - partitions.add(partition.getKafkaTopicPartition()); - } - - Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitions); - for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { - Long offset = zkOffsets.get(partition.getKafkaTopicPartition()); - if (offset != null) { - // the committed offset in ZK represents the next record to process, - // so we subtract it by 1 to correctly represent internal state - partition.setOffset(offset - 1); - } else { - // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS, - // we default to "auto.offset.reset" like the Kafka high-level consumer - LOG.warn("No group offset can be found for partition {} in Zookeeper;" + - " resetting starting offset to 'auto.offset.reset'", partition); - - partition.setOffset(invalidOffsetBehavior); - } - } + // offsets in the state may still be placeholder sentinel values if we are starting fresh, or if the + // checkpoint / savepoint state we were restored with has not yet been completely replaced with actual + // offset values; replace those with actual offsets, according to what each sentinel value represents.
+ for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitionStates()) { + if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) { + // this will be replaced by an actual offset in SimpleConsumerThread + partition.setOffset(OffsetRequest.EarliestTime()); + } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) { + // this will be replaced by an actual offset in SimpleConsumerThread + partition.setOffset(OffsetRequest.LatestTime()); + } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) { + Long committedOffset = zookeeperOffsetHandler.getCommittedOffset(partition.getKafkaTopicPartition()); + if (committedOffset != null) { + // the committed offset in ZK represents the next record to process, + // so we subtract 1 from it to correctly represent the internal state + partition.setOffset(committedOffset - 1); + } else { + // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS, + // we default to "auto.offset.reset" like the Kafka high-level consumer + LOG.warn("No group offset can be found for partition {} in Zookeeper;" + + " resetting starting offset to 'auto.offset.reset'", partition); + + partition.setOffset(invalidOffsetBehavior); + } + } else { + // the partition already has a specific start offset and is ready to be consumed } } @@ -191,7 +176,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval); periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler, - subscribedPartitions(), errorHandler, autoCommitInterval); + subscribedPartitionStates(), errorHandler, autoCommitInterval); periodicCommitter.setName("Periodic Kafka partition offset committer"); periodicCommitter.setDaemon(true); periodicCommitter.start(); @@ -388,7 +373,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { } // Set committed offsets in topic partition state - KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions(); + KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitionStates(); for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) { Long offset = offsets.get(partition.getKafkaTopicPartition()); if (offset != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java index 8f2ef09..cec980f 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java @@ -30,8 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Properties; @@ -96,22 +94,12 @@ public class ZookeeperOffsetHandler { } /** - * @param
partitions The partitions to read offsets for. + * @param partition The partition to read offset for. * @return The mapping from partition to offset. * @throws Exception This method forwards exceptions. */ - public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception { - Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size()); - for (KafkaTopicPartition tp : partitions) { - Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition()); - - if (offset != null) { - LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.", - tp.getTopic(), tp.getPartition(), offset); - ret.put(tp, offset); - } - } - return ret; + public Long getCommittedOffset(KafkaTopicPartition partition) throws Exception { + return getOffsetFromZooKeeper(curatorClient, groupId, partition.getTopic(), partition.getPartition()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 9a61b91..c7236a2 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -171,8 +171,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { @Override protected AbstractFetcher<T, ?> createFetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { @@ -181,8 +180,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { return new Kafka09Fetcher<>( sourceContext, - thisSubtaskPartitions, - restoredSnapshotState, + assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), @@ -194,7 +192,6 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { deserializer, properties, pollTimeout, - startupMode, useMetrics); } http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index b7c9bc2..c389486 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -22,7 +22,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; @@ -71,8 +70,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { public Kafka09Fetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> assignedPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, @@ -84,19 +82,16 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, - StartupMode startupMode, boolean useMetrics) throws Exception { super( sourceContext, - assignedPartitions, - restoredSnapshotState, + assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, - startupMode, useMetrics); this.deserializer = deserializer; @@ -114,13 +109,11 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { LOG, handover, kafkaProperties, - subscribedPartitions(), + subscribedPartitionStates(), kafkaMetricGroup, createCallBridge(), getFetcherName() + " for " + taskNameWithSubtasks, pollTimeout, - startupMode, - isRestored, useMetrics); } @@ -142,7 +135,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { final ConsumerRecords<byte[], byte[]> records = handover.pollNext(); // get the records for each topic partition - for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) { List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle()); @@ -226,7 +219,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> { @Override public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { - KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions(); + KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitionStates(); Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length); for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java index a97b3cf..37ba34c 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java @@ -39,16 +39,12 @@ public class KafkaConsumerCallBridge { consumer.assign(topicPartitions); } - public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) { - for (TopicPartition partition : partitions) { - consumer.seekToBeginning(partition); - } + public void seekPartitionToBeginning(KafkaConsumer<?, ?> consumer, TopicPartition partition) { + consumer.seekToBeginning(partition); } - public void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) { - for (TopicPartition partition : partitions) { - consumer.seekToEnd(partition); - } + public void seekPartitionToEnd(KafkaConsumer<?, ?> consumer, TopicPartition partition) { + consumer.seekToEnd(partition); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java index 03fe2c6..cbe1551 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.kafka.internal; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -69,7 +69,7 @@ public class KafkaConsumerThread extends Thread { private final Properties kafkaProperties; /** The partitions that this consumer reads from */ - private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions; + private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitionStates; /** We get this from the outside to publish metrics. 
**/ private final MetricGroup kafkaMetricGroup; @@ -80,12 +80,6 @@ public class KafkaConsumerThread extends Thread { /** The maximum number of milliseconds to wait for a fetch batch */ private final long pollTimeout; - /** The configured startup mode (relevant only if we're restored from checkpoint / savepoint) */ - private final StartupMode startupMode; - - /** Flag whether or not we're restored from checkpoint / savepoint */ - private final boolean isRestored; - /** Flag whether to add Kafka's metrics to the Flink metrics */ private final boolean useMetrics; @@ -103,13 +97,11 @@ public class KafkaConsumerThread extends Thread { Logger log, Handover handover, Properties kafkaProperties, - KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions, + KafkaTopicPartitionState<TopicPartition>[] subscribedPartitionStates, MetricGroup kafkaMetricGroup, KafkaConsumerCallBridge consumerCallBridge, String threadName, long pollTimeout, - StartupMode startupMode, - boolean isRestored, boolean useMetrics) { super(threadName); @@ -120,21 +112,8 @@ public class KafkaConsumerThread extends Thread { this.kafkaProperties = checkNotNull(kafkaProperties); this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup); this.consumerCallBridge = checkNotNull(consumerCallBridge); - this.startupMode = checkNotNull(startupMode); - - this.subscribedPartitions = checkNotNull(subscribedPartitions); - this.isRestored = isRestored; - - // if we are restoring from a checkpoint / savepoint, all - // subscribed partitions' state should have defined offsets - if (isRestored) { - for (KafkaTopicPartitionState<TopicPartition> subscribedPartition : subscribedPartitions) { - if (!subscribedPartition.isOffsetDefined()) { - throw new IllegalArgumentException("Restoring from a checkpoint / savepoint, but found a " + - "partition state " + subscribedPartition + " that does not have a defined offset."); - } - } - } + + this.subscribedPartitionStates = checkNotNull(subscribedPartitionStates); this.pollTimeout = pollTimeout; this.useMetrics = useMetrics; @@ -173,7 +152,7 @@ public class KafkaConsumerThread extends Thread { final OffsetCommitCallback offsetCommitCallback = new CommitCallback(); // tell the consumer which partitions to work with - consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions)); + consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitionStates)); // register Kafka's very own metrics in Flink's metric reporters if (useMetrics) { @@ -195,39 +174,23 @@ public class KafkaConsumerThread extends Thread { return; } - if (isRestored) { - for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) { - log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " + - "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1); - - consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); - } - } else { - List<TopicPartition> partitionList = convertKafkaPartitions(subscribedPartitions); - - // fetch offsets from Kafka, depending on the configured startup mode - switch (startupMode) { - case EARLIEST: - log.info("Setting starting point as earliest offset for partitions {}", partitionList); - - consumerCallBridge.seekPartitionsToBeginning(consumer, partitionList); - break; - case LATEST: - log.info("Setting starting point as latest offset for partitions {}", partitionList); - - consumerCallBridge.seekPartitionsToEnd(consumer, 
partitionList); - break; - default: - case GROUP_OFFSETS: - log.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}", - kafkaProperties.getProperty("group.id"), partitionList); - } + // offsets in the state may still be placeholder sentinel values if we are starting fresh, or if the + // checkpoint / savepoint state we were restored with has not yet been completely replaced with actual + // offset values; replace those with actual offsets, according to what each sentinel value represents. + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates) { + if (partition.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) { + consumerCallBridge.seekPartitionToBeginning(consumer, partition.getKafkaPartitionHandle()); + partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1); + } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) { + consumerCallBridge.seekPartitionToEnd(consumer, partition.getKafkaPartitionHandle()); + partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1); + } else if (partition.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) { + // the KafkaConsumer by default will automatically seek the consumer position + // to the committed group offset, so we do not need to do it. - // on startup, all partition states will not have defined offsets; - // set the initial states with the offsets fetched from Kafka - for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) { - // the fetched offset represents the next record to process, so we need to subtract it by 1 partition.setOffset(consumer.position(partition.getKafkaPartitionHandle()) - 1); + } else { + consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java index abd75cc..49144e6 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java @@ -24,10 +24,10 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internal.Handover; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; @@ -118,13 +118,13 @@ public class Kafka09FetcherTest { @SuppressWarnings("unchecked") SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( sourceContext, - topics, - null, /* no restored state */ + partitionsWithInitialOffsets, null, /* periodic watermark extractor */ null, /* punctuated watermark extractor */ new TestProcessingTimeService(), @@ -136,7 +136,6 @@ public class Kafka09FetcherTest { schema, new Properties(), 0L, - StartupMode.GROUP_OFFSETS, false); // ----- run the fetcher ----- @@ -256,13 +255,13 @@ public class Kafka09FetcherTest { @SuppressWarnings("unchecked") SourceContext<String> sourceContext = mock(SourceContext.class); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( sourceContext, - topics, - null, /* no restored state */ + partitionsWithInitialOffsets, null, /* periodic watermark extractor */ null, /* punctuated watermark extractor */ new TestProcessingTimeService(), @@ -274,7 +273,6 @@ public class Kafka09FetcherTest { schema, new Properties(), 0L, - StartupMode.GROUP_OFFSETS, false); // ----- run the fetcher ----- @@ -372,13 +370,13 @@ public class Kafka09FetcherTest { // ----- build a fetcher ----- BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>(); - List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition)); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>( sourceContext, - topics, - null, /* no restored state */ + partitionsWithInitialOffsets, null, /* periodic watermark extractor */ null, /* punctuated watermark extractor */ new TestProcessingTimeService(), @@ -390,7 +388,6 @@ public class Kafka09FetcherTest { schema, new Properties(), 0L, - StartupMode.GROUP_OFFSETS, false); http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 1121d1b..144ede8 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -40,15 +40,11 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -90,8 +86,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti /** The schema to convert between Kafka's byte messages, and Flink's objects */ protected final KeyedDeserializationSchema<T> deserializer; - /** The set of topic partitions that the source will read */ - private List<KafkaTopicPartition> subscribedPartitions; + /** The set of topic partitions that the source will read, with their initial offsets to start reading from */ + private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets; /** Optional timestamp extractor / watermark generator that will be run per Kafka partition, * to exploit per-partition timestamp characteristics. @@ -138,17 +134,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti this.deserializer = checkNotNull(deserializer, "valueDeserializer"); } - /** - * This method must be called from the subclasses, to set the list of all subscribed partitions - * that this consumer will fetch from (across all subtasks). - * - * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from. 
- */ - protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) { - checkNotNull(allSubscribedPartitions); - this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions); - } - // ------------------------------------------------------------------------ // Configuration // ------------------------------------------------------------------------ @@ -263,17 +248,67 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti // ------------------------------------------------------------------------ @Override + public void open(Configuration configuration) { + List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics); + + if (kafkaTopicPartitions != null) { + subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size()); + + if (restoredState != null) { + for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) { + if (restoredState.containsKey(kafkaTopicPartition)) { + subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition)); + } + } + + LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}", + getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets); + } else { + initializeSubscribedPartitionsToStartOffsets( + subscribedPartitionsToStartOffsets, + kafkaTopicPartitions, + getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getNumberOfParallelSubtasks(), + startupMode); + + if (subscribedPartitionsToStartOffsets.size() != 0) { + switch (startupMode) { + case EARLIEST: + LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + subscribedPartitionsToStartOffsets.keySet()); + break; + case LATEST: + LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + subscribedPartitionsToStartOffsets.keySet()); + break; + default: + case GROUP_OFFSETS: + LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + subscribedPartitionsToStartOffsets.keySet()); + } + } + } + } + } + + @Override public void run(SourceContext<T> sourceContext) throws Exception { - if (subscribedPartitions == null) { + if (subscribedPartitionsToStartOffsets == null) { throw new Exception("The partitions were not set for the consumer"); } // we need only do work, if we actually have partitions assigned - if (!subscribedPartitions.isEmpty()) { + if (!subscribedPartitionsToStartOffsets.isEmpty()) { // create the fetcher that will communicate with the Kafka brokers final AbstractFetcher<T, ?> fetcher = createFetcher( - sourceContext, subscribedPartitions, restoredState, + sourceContext, subscribedPartitionsToStartOffsets, periodicWatermarkAssigner, punctuatedWatermarkAssigner, (StreamingRuntimeContext) getRuntimeContext()); @@ -327,15 +362,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti } @Override - public void open(Configuration configuration) { - List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics); - - if (kafkaTopicPartitions != null) { - 
assignTopicPartitions(kafkaTopicPartitions); - } - } - - @Override public void close() throws Exception { // pretty much the same logic as cancelling try { @@ -386,18 +412,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti if (fetcher == null) { // the fetcher has not yet been initialized, which means we need to return the // originally restored offsets or the assigned partitions - - if (restoredState != null) { - - for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoredState.entrySet()) { - offsetsStateForCheckpoint.add( - Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); - } - } else if (subscribedPartitions != null) { - for (KafkaTopicPartition subscribedPartition : subscribedPartitions) { - offsetsStateForCheckpoint.add( - Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET)); - } + for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) { + offsetsStateForCheckpoint.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue())); } // the map cannot be asynchronously updated, because only one checkpoint call can happen @@ -493,7 +509,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti * data, and emits it into the data streams. * * @param sourceContext The source context to emit data to. - * @param thisSubtaskPartitions The set of partitions that this subtask should handle. + * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should handle, with their start offsets. * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator. * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator. * @param runtimeContext The task's runtime context. 
@@ -504,8 +520,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti */ protected abstract AbstractFetcher<T, ?> createFetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception; @@ -525,60 +540,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti // Utilities // ------------------------------------------------------------------------ - private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) { - subscribedPartitions = new ArrayList<>(); - - if (restoredState != null) { - for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) { - if (restoredState.containsKey(kafkaTopicPartition)) { - subscribedPartitions.add(kafkaTopicPartition); - } - } - } else { - Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() { - @Override - public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) { - int topicComparison = o1.getTopic().compareTo(o2.getTopic()); - - if (topicComparison == 0) { - return o1.getPartition() - o2.getPartition(); - } else { - return topicComparison; - } - } - }); - - for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) { - subscribedPartitions.add(kafkaTopicPartitions.get(i)); - } - } - } - /** - * Selects which of the given partitions should be handled by a specific consumer, - * given a certain number of consumers. - * - * @param allPartitions The partitions to select from - * @param numConsumers The number of consumers - * @param consumerIndex The index of the specific consumer - * - * @return The sublist of partitions to be handled by that consumer. + * Initializes {@link FlinkKafkaConsumerBase#subscribedPartitionsToStartOffsets} with appropriate + * values. The method decides which partitions this consumer instance should subscribe to, and also + * sets the initial offset each subscribed partition should be started from based on the configured startup mode. + * + * @param subscribedPartitionsToStartOffsets the map of subscribed partitions to start offsets to initialize + * @param kafkaTopicPartitions the complete list of all Kafka partitions + * @param indexOfThisSubtask the index of this consumer instance + * @param numParallelSubtasks total number of parallel consumer instances + * @param startupMode the configured startup mode for the consumer + * + * Note: This method is also exposed for testing.
*/ - protected static List<KafkaTopicPartition> assignPartitions( - List<KafkaTopicPartition> allPartitions, - int numConsumers, int consumerIndex) { - final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>( - allPartitions.size() / numConsumers + 1); - - for (int i = 0; i < allPartitions.size(); i++) { - if (i % numConsumers == consumerIndex) { - thisSubtaskPartitions.add(allPartitions.get(i)); + protected static void initializeSubscribedPartitionsToStartOffsets( + Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets, + List<KafkaTopicPartition> kafkaTopicPartitions, + int indexOfThisSubtask, + int numParallelSubtasks, + StartupMode startupMode) { + + for (int i = 0; i < kafkaTopicPartitions.size(); i++) { + if (i % numParallelSubtasks == indexOfThisSubtask) { + subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel()); } } - - return thisSubtaskPartitions; } - + /** * Logs the partition information in INFO level. * @@ -607,8 +595,17 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti } @VisibleForTesting - List<KafkaTopicPartition> getSubscribedPartitions() { - return subscribedPartitions; + void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) { + checkNotNull(allSubscribedPartitions); + this.subscribedPartitionsToStartOffsets = new HashMap<>(); + for (KafkaTopicPartition partition : allSubscribedPartitions) { + this.subscribedPartitionsToStartOffsets.put(partition, null); + } + } + + @VisibleForTesting + Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() { + return subscribedPartitionsToStartOffsets; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java index 331c1a6..f796e62 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java @@ -16,18 +16,30 @@ */ package org.apache.flink.streaming.connectors.kafka.config; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; + /** * Startup modes for the Kafka Consumer. 
*/ public enum StartupMode { /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default) */ - GROUP_OFFSETS, + GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET), /** Start from the earliest offset possible */ - EARLIEST, + EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET), /** Start from the latest offset */ - LATEST - + LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + + /** The sentinel offset value corresponding to this startup mode */ + private long stateSentinel; + + StartupMode(long stateSentinel) { + this.stateSentinel = stateSentinel; + } + + public long getStateSentinel() { + return stateSentinel; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index b27e996..e021881 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -26,12 +26,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.util.SerializedValue; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -63,20 +61,14 @@ public abstract class AbstractFetcher<T, KPH> { protected final Object checkpointLock; /** All partitions (and their state) that this fetcher is subscribed to */ - private final KafkaTopicPartitionState<KPH>[] allPartitions; + private final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates; /** The mode describing whether the fetcher also generates timestamps and watermarks */ protected final int timestampWatermarkMode; - /** The startup mode for the consumer (only relevant if the consumer wasn't restored) */ - protected final StartupMode startupMode; - /** Flag whether to register metrics for the fetcher */ protected final boolean useMetrics; - /** Flag whether or not the consumer state was restored from a checkpoint / savepoint */ - protected final boolean isRestored; - /** Only relevant for punctuated watermarks: The current cross partition watermark */ private volatile long maxWatermarkSoFar = Long.MIN_VALUE; @@ -84,19 +76,16 @@ public abstract class AbstractFetcher<T, KPH> { protected AbstractFetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> assignedPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService 
processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, - StartupMode startupMode, boolean useMetrics) throws Exception { this.sourceContext = checkNotNull(sourceContext); this.checkpointLock = sourceContext.getCheckpointLock(); - this.startupMode = checkNotNull(startupMode); this.useMetrics = useMetrics; // figure out which watermark mode we will be using @@ -115,30 +104,25 @@ public abstract class AbstractFetcher<T, KPH> { throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks"); } } - + // create our partition state according to the timestamp/watermark mode - this.allPartitions = initializePartitions( - assignedPartitions, + this.subscribedPartitionStates = initializeSubscribedPartitionStates( + assignedPartitionsWithInitialOffsets, timestampWatermarkMode, watermarksPeriodic, watermarksPunctuated, userCodeClassLoader); - if (restoredSnapshotState != null) { - for (KafkaTopicPartitionState<?> partition : allPartitions) { - Long offset = restoredSnapshotState.get(partition.getKafkaTopicPartition()); - if (offset != null) { - partition.setOffset(offset); - } + // check that all partition states have a defined offset + for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) { + if (!partitionState.isOffsetDefined()) { + throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets."); } - this.isRestored = true; - } else { - this.isRestored = false; } // if we have periodic watermarks, kick off the interval scheduler if (timestampWatermarkMode == PERIODIC_WATERMARKS) { KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = - (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions; + (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) subscribedPartitionStates; PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval); @@ -155,8 +139,8 @@ public abstract class AbstractFetcher<T, KPH> { * * @return All subscribed partition states.
*/ - protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() { - return allPartitions; + protected final KafkaTopicPartitionState<KPH>[] subscribedPartitionStates() { + return subscribedPartitionStates; } // ------------------------------------------------------------------------ @@ -207,8 +191,8 @@ public abstract class AbstractFetcher<T, KPH> { // this method assumes that the checkpoint lock is held assert Thread.holdsLock(checkpointLock); - HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length); - for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) { + HashMap<KafkaTopicPartition, Long> state = new HashMap<>(subscribedPartitionStates.length); + for (KafkaTopicPartitionState<?> partition : subscribedPartitionStates()) { state.put(partition.getKafkaTopicPartition(), partition.getOffset()); } return state; @@ -343,7 +327,7 @@ public abstract class AbstractFetcher<T, KPH> { if (nextWatermark.getTimestamp() > maxWatermarkSoFar) { long newMin = Long.MAX_VALUE; - for (KafkaTopicPartitionState<?> state : allPartitions) { + for (KafkaTopicPartitionState<?> state : subscribedPartitionStates) { @SuppressWarnings("unchecked") final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState = (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state; @@ -371,8 +355,8 @@ public abstract class AbstractFetcher<T, KPH> { * Utility method that takes the topic partitions and creates the topic partition state * holders. If a watermark generator per partition exists, this will also initialize those. */ - private KafkaTopicPartitionState<KPH>[] initializePartitions( - List<KafkaTopicPartition> assignedPartitions, + private KafkaTopicPartitionState<KPH>[] initializeSubscribedPartitionStates( + Map<KafkaTopicPartition, Long> assignedPartitionsToInitialOffsets, int timestampWatermarkMode, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, @@ -384,13 +368,16 @@ public abstract class AbstractFetcher<T, KPH> { case NO_TIMESTAMPS_WATERMARKS: { @SuppressWarnings("unchecked") KafkaTopicPartitionState<KPH>[] partitions = - (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()]; + (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitionsToInitialOffsets.size()]; int pos = 0; - for (KafkaTopicPartition partition : assignedPartitions) { + for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) { // create the kafka version specific partition handle - KPH kafkaHandle = createKafkaPartitionHandle(partition); - partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle); + KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey()); + partitions[pos] = new KafkaTopicPartitionState<>(partition.getKey(), kafkaHandle); + partitions[pos].setOffset(partition.getValue()); + + pos++; } return partitions; @@ -400,17 +387,20 @@ public abstract class AbstractFetcher<T, KPH> { @SuppressWarnings("unchecked") KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions = (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) - new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()]; + new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()]; int pos = 0; - for (KafkaTopicPartition partition : assignedPartitions) { - KPH kafkaHandle = 
createKafkaPartitionHandle(partition); + for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) { + KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey()); AssignerWithPeriodicWatermarks<T> assignerInstance = watermarksPeriodic.deserializeValue(userCodeClassLoader); - partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( - partition, kafkaHandle, assignerInstance); + partitions[pos] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( + partition.getKey(), kafkaHandle, assignerInstance); + partitions[pos].setOffset(partition.getValue()); + + pos++; } return partitions; @@ -420,17 +410,20 @@ @SuppressWarnings("unchecked") KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions = (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) - new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()]; + new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitionsToInitialOffsets.size()]; int pos = 0; - for (KafkaTopicPartition partition : assignedPartitions) { - KPH kafkaHandle = createKafkaPartitionHandle(partition); + for (Map.Entry<KafkaTopicPartition, Long> partition : assignedPartitionsToInitialOffsets.entrySet()) { + KPH kafkaHandle = createKafkaPartitionHandle(partition.getKey()); AssignerWithPunctuatedWatermarks<T> assignerInstance = watermarksPunctuated.deserializeValue(userCodeClassLoader); - partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>( - partition, kafkaHandle, assignerInstance); + partitions[pos] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>( + partition.getKey(), kafkaHandle, assignerInstance); + partitions[pos].setOffset(partition.getValue()); + + pos++; } return partitions; @@ -452,7 +445,7 @@ public abstract class AbstractFetcher<T, KPH> { // add current offsets to gauge MetricGroup currentOffsets = metricGroup.addGroup("current-offsets"); MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets"); - for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) { + for (KafkaTopicPartitionState<?> ktp: subscribedPartitionStates()) { currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET)); committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET)); } http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java index 7cb5f46..adfbf79 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java @@ -28,10 +28,6 @@ package org.apache.flink.streaming.connectors.kafka.internals; * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
*/ public class KafkaTopicPartitionState<KPH> { - - /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid), - * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */ - public static final long OFFSET_NOT_SET = -915623761776L; // ------------------------------------------------------------------------ @@ -52,8 +48,8 @@ public class KafkaTopicPartitionState<KPH> { public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) { this.partition = partition; this.kafkaPartitionHandle = kafkaPartitionHandle; - this.offset = OFFSET_NOT_SET; - this.committedOffset = OFFSET_NOT_SET; + this.offset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; + this.committedOffset = KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; } // ------------------------------------------------------------------------ @@ -96,7 +92,7 @@ public class KafkaTopicPartitionState<KPH> { } public final boolean isOffsetDefined() { - return offset != OFFSET_NOT_SET; + return offset != KafkaTopicPartitionStateSentinel.OFFSET_NOT_SET; } public final void setCommittedOffset(long offset) { http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java new file mode 100644 index 0000000..153a326 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +/** + * Magic values used to represent special offset states before partitions are actually read. + * + * The values are all negative. Negative offsets are not used by Kafka (invalid), so we + * pick numbers that are probably (hopefully) not used by Kafka as magic numbers for anything else. + */ +public class KafkaTopicPartitionStateSentinel { + + /** Magic number that defines an unset offset. */ + public static final long OFFSET_NOT_SET = -915623761776L; + + /** + * Magic number that indicates that the partition should start from the earliest offset.
+ * + * This is used as a placeholder so that the actual earliest offset can be evaluated lazily + * when the consumer actually starts reading the partition. + */ + public static final long EARLIEST_OFFSET = -915623761775L; + + /** + * Magic number that indicates that the partition should start from the latest offset. + * + * This is used as a placeholder so that the actual latest offset can be evaluated lazily + * when the consumer actually starts reading the partition. + */ + public static final long LATEST_OFFSET = -915623761774L; + + /** + * Magic number that indicates that the partition should start from its committed group offset in Kafka. + * + * This is used as a placeholder so that the actual committed group offset can be evaluated lazily + * when the consumer actually starts reading the partition. + */ + public static final long GROUP_OFFSET = -915623761773L; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index 38a3ce8..20411e1 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; @@ -34,6 +35,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Map; import java.util.HashMap; import java.util.List; @@ -68,8 +70,8 @@ public class FlinkKafkaConsumerBaseMigrationTest { testHarness.open(); // assert that no partitions were found and the map is empty - Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); - Assert.assertTrue(consumerFunction.getSubscribedPartitions().isEmpty()); + Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); + Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); // assert that no state was restored Assert.assertTrue(consumerFunction.getRestoredState() == null); @@ -101,10 +103,16 @@ public class FlinkKafkaConsumerBaseMigrationTest { getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state")); testHarness.open(); + // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot-empty-state"; + // since the state is empty, the consumer should fall back to the configured startup mode to determine start offsets.
+ final HashMap<KafkaTopicPartition, Long> expectedSubscribedPartitionsWithStartOffsets = new HashMap<>(); + expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("abc", 13), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + expectedSubscribedPartitionsWithStartOffsets.put(new KafkaTopicPartition("def", 7), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + // assert that there are partitions and that they are identical to the expected list - Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); - Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty()); - Assert.assertTrue(consumerFunction.getSubscribedPartitions().equals(partitions)); + Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); + Assert.assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); + Assert.assertEquals(expectedSubscribedPartitionsWithStartOffsets, consumerFunction.getSubscribedPartitionsToStartOffsets()); // assert that no state was restored Assert.assertTrue(consumerFunction.getRestoredState() == null); @@ -136,16 +144,18 @@ public class FlinkKafkaConsumerBaseMigrationTest { getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot")); testHarness.open(); - // assert that there are partitions and that they are identical to the expected list - Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null); - Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty()); - Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions()); - // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot" final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>(); expectedState.put(new KafkaTopicPartition("abc", 13), 16768L); expectedState.put(new KafkaTopicPartition("def", 7), 987654321L); + // assert that there are partitions and that they are identical to the expected list + Assert.assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null); + Assert.assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()); + + // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state + Assert.assertEquals(expectedState, consumerFunction.getSubscribedPartitionsToStartOffsets()); + // assert that state is correctly restored from legacy checkpoint Assert.assertTrue(consumerFunction.getRestoredState() != null); Assert.assertEquals(expectedState, consumerFunction.getRestoredState()); @@ -179,8 +189,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { @Override protected AbstractFetcher<T, ?> createFetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/ed68fedb/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 980a025..e6ea63f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; @@ -42,13 +43,7 @@ import org.mockito.stubbing.Answer; import java.io.Serializable; import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -137,6 +132,8 @@ public class FlinkKafkaConsumerBaseTest { consumer.initializeState(initializationContext); + consumer.open(new Configuration()); + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17)); // ensure that the list was cleared and refilled. while this is an implementation detail, we use it here @@ -177,6 +174,8 @@ public class FlinkKafkaConsumerBaseTest { consumer.initializeState(initializationContext); + consumer.open(new Configuration()); + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17)); assertFalse(listState.get().iterator().hasNext()); @@ -364,15 +363,10 @@ public class FlinkKafkaConsumerBaseTest { @SuppressWarnings("unchecked") protected AbstractFetcher<T, ?> createFetcher( SourceContext<T> sourceContext, - List<KafkaTopicPartition> thisSubtaskPartitions, - HashMap<KafkaTopicPartition, Long> restoredSnapshotState, + Map<KafkaTopicPartition, Long> thisSubtaskPartitionsWithStartOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { - if (restoredSnapshotState != null) { - Assert.fail("Trying to restore offsets even though there was no restore state."); - return null; - } return mock(AbstractFetcher.class); }
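----------------------------------------------------------------------
Editor's note: the following sketch is not part of the commit. It distills the contract that FlinkKafkaConsumerBase#initializeSubscribedPartitionsToStartOffsets now implements: a subtask subscribes to every partition whose index i satisfies i % numParallelSubtasks == indexOfThisSubtask, and seeds each one with the startup mode's sentinel rather than a concrete offset. The class name, the String partition keys, and the printed output are illustrative assumptions, not Flink API.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StartOffsetSketch {

    // mirrors KafkaTopicPartitionStateSentinel.GROUP_OFFSET from the new class above
    static final long GROUP_OFFSET = -915623761773L;

    public static void main(String[] args) {
        // five partitions of one topic, distributed over two parallel subtasks
        List<String> allPartitions = Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4");
        int numParallelSubtasks = 2;

        for (int subtask = 0; subtask < numParallelSubtasks; subtask++) {
            Map<String, Long> subscribedPartitionsToStartOffsets = new HashMap<>();
            for (int i = 0; i < allPartitions.size(); i++) {
                if (i % numParallelSubtasks == subtask) {
                    // the sentinel is a placeholder; the fetcher resolves it lazily
                    subscribedPartitionsToStartOffsets.put(allPartitions.get(i), GROUP_OFFSET);
                }
            }
            // subtask 0 gets t-0, t-2, t-4; subtask 1 gets t-1, t-3
            System.out.println("subtask " + subtask + " -> " + subscribedPartitionsToStartOffsets);
        }
    }
}

Seeding sentinels instead of concrete offsets is what lets the start position be determined in open(): on restore the same map simply holds the restored offsets instead, and the fetcher treats both cases uniformly.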

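----------------------------------------------------------------------
Editor's note: a hedged sketch, also not part of the commit, of how the sentinel values can be resolved once a version-specific fetcher owns an actual Kafka consumer. In the commit itself this translation lives in the 0.8/0.9/0.10 internals (Kafka08Fetcher, KafkaConsumerThread, KafkaConsumerCallBridge), whose hunks are abridged in this digest; the sketch below assumes only the 0.9-era kafka-clients API (varargs seekToBeginning/seekToEnd) and the sentinel constants introduced above.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SentinelResolutionSketch {

    // mirror the constants from KafkaTopicPartitionStateSentinel
    static final long EARLIEST_OFFSET = -915623761775L;
    static final long LATEST_OFFSET = -915623761774L;
    static final long GROUP_OFFSET = -915623761773L;

    static <K, V> void seedStartOffset(KafkaConsumer<K, V> consumer, TopicPartition tp, long startOffset) {
        if (startOffset == EARLIEST_OFFSET) {
            // "earliest" is evaluated lazily, here at fetch time, not at configuration time
            consumer.seekToBeginning(tp);
        } else if (startOffset == LATEST_OFFSET) {
            consumer.seekToEnd(tp);
        } else if (startOffset == GROUP_OFFSET) {
            // no seek needed: polling starts from the committed group offset by default
        } else {
            // a concrete restored offset: the state holds the last read offset,
            // so consumption resumes with the record after it
            consumer.seek(tp, startOffset + 1);
        }
    }
}

Keeping the sentinel-to-position translation inside the fetcher keeps FlinkKafkaConsumerBase free of version-specific consumer calls, while the decision of which start position to use is made once, in open().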