http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java new file mode 100644 index 0000000..6bad180 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.SerializedValue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API. + * + * @param <T> The type of elements produced by the fetcher. 
+ */ +public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class); + + // ------------------------------------------------------------------------ + + /** The schema to convert between Kafka's byte messages, and Flink's objects */ + private final KeyedDeserializationSchema<T> deserializer; + + /** The subtask's runtime context */ + private final RuntimeContext runtimeContext; + + /** The configuration for the Kafka consumer */ + private final Properties kafkaProperties; + + /** The maximum number of milliseconds to wait for a fetch batch */ + private final long pollTimeout; + + /** Flag whether to register Kafka metrics as Flink accumulators */ + private final boolean forwardKafkaMetrics; + + /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */ + private final Object consumerLock = new Object(); + + /** Reference to the Kafka consumer, once it is created */ + private volatile KafkaConsumer<byte[], byte[]> consumer; + + /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */ + private volatile ExceptionProxy errorHandler; + + /** Flag to mark the main work loop as alive */ + private volatile boolean running = true; + + // ------------------------------------------------------------------------ + + public Kafka09Fetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> assignedPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext, + KeyedDeserializationSchema<T> deserializer, + Properties kafkaProperties, + long pollTimeout, + boolean forwardKafkaMetrics) throws Exception + { + super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext); + + this.deserializer = deserializer; + 
this.runtimeContext = runtimeContext; + this.kafkaProperties = kafkaProperties; + this.pollTimeout = pollTimeout; + this.forwardKafkaMetrics = forwardKafkaMetrics; + + // if checkpointing is enabled, we are not automatically committing to Kafka. + kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + Boolean.toString(!runtimeContext.isCheckpointingEnabled())); + } + + // ------------------------------------------------------------------------ + // Fetcher work methods + // ------------------------------------------------------------------------ + + @Override + public void runFetchLoop() throws Exception { + this.errorHandler = new ExceptionProxy(Thread.currentThread()); + + // rather than running the main fetch loop directly here, we spawn a dedicated thread + // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code + Thread runner = new Thread(this, "Kafka 0.9 Fetcher for " + runtimeContext.getTaskNameWithSubtasks()); + runner.setDaemon(true); + runner.start(); + + try { + runner.join(); + } catch (InterruptedException e) { + // may be the result of a wake-up after an exception. 
we ignore this here and only + // restore the interruption state + Thread.currentThread().interrupt(); + } + + // make sure we propagate any exception that occurred in the concurrent fetch thread, + // before leaving this method + this.errorHandler.checkAndThrowException(); + } + + @Override + public void cancel() { + // flag the main thread to exit + running = false; + + // NOTE: + // - We cannot interrupt the runner thread, because the Kafka consumer may + // deadlock when the thread is interrupted while in certain methods + // - We cannot call close() on the consumer, because it will actually throw + // an exception if a concurrent call is in progress + + // make sure the consumer finds out faster that we are shutting down + if (consumer != null) { + consumer.wakeup(); + } + } + + @Override + public void run() { + // This method initializes the KafkaConsumer and guarantees it is torn down properly. + // This is important, because the consumer has multi-threading issues, + // including concurrent 'close()' calls. + + final KafkaConsumer<byte[], byte[]> consumer; + try { + consumer = new KafkaConsumer<>(kafkaProperties); + } + catch (Throwable t) { + running = false; + errorHandler.reportError(t); + return; + } + + // from here on, the consumer will be closed properly + try { + consumer.assign(convertKafkaPartitions(subscribedPartitions())); + + // register Kafka metrics to Flink accumulators + if (forwardKafkaMetrics) { + Map<MetricName, ? extends Metric> metrics = consumer.metrics(); + if (metrics == null) { + // MapR's Kafka implementation returns null here. + LOG.info("Consumer implementation does not support metrics"); + } else { + // we have metrics, register them where possible + for (Map.Entry<MetricName, ? 
extends Metric> metric : metrics.entrySet()) { + String name = "KafkaConsumer-" + metric.getKey().name(); + DefaultKafkaMetricAccumulator kafkaAccumulator = + DefaultKafkaMetricAccumulator.createFor(metric.getValue()); + + // best effort: we only add the accumulator if available. + if (kafkaAccumulator != null) { + runtimeContext.addAccumulator(name, kafkaAccumulator); + } + } + } + } + + // seek the consumer to the initial offsets + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { + if (partition.isOffsetDefined()) { + consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); + } + } + + // from now on, external operations may call the consumer + this.consumer = consumer; + + // main fetch loop + while (running) { + // get the next batch of records + final ConsumerRecords<byte[], byte[]> records; + synchronized (consumerLock) { + try { + records = consumer.poll(pollTimeout); + } + catch (WakeupException we) { + if (running) { + throw we; + } else { + continue; + } + } + } + + // get the records for each topic partition + for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { + + List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle()); + + for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { + T value = deserializer.deserialize( + record.key(), record.value(), + record.topic(), record.partition(), record.offset()); + + if (deserializer.isEndOfStream(value)) { + // end of stream signaled + running = false; + break; + } + + // emit the actual record. 
this also update offset state atomically + // and deals with timestamps and watermark generation + emitRecord(value, partition, record.offset()); + } + } + } + // end main fetch loop + } + catch (Throwable t) { + if (running) { + running = false; + errorHandler.reportError(t); + } else { + LOG.debug("Stopped ConsumerThread threw exception", t); + } + } + finally { + try { + synchronized (consumerLock) { + consumer.close(); + } + } catch (Throwable t) { + LOG.warn("Error while closing Kafka 0.9 consumer", t); + } + } + } + + // ------------------------------------------------------------------------ + // Kafka 0.9 specific fetcher behavior + // ------------------------------------------------------------------------ + + @Override + public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) { + return new TopicPartition(partition.getTopic(), partition.getPartition()); + } + + @Override + public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { + KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions(); + Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length); + + for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { + Long offset = offsets.get(partition.getKafkaTopicPartition()); + if (offset != null) { + offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, "")); + } + } + + if (this.consumer != null) { + synchronized (consumerLock) { + this.consumer.commitSync(offsetsToCommit); + } + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + public static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) { + ArrayList<TopicPartition> result = new ArrayList<>(partitions.length); + for (KafkaTopicPartitionState<TopicPartition> p 
: partitions) { + result.add(p.getKafkaPartitionHandle()); + } + return result; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index 82e1dce..afb0056 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -27,11 +27,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { // ------------------------------------------------------------------------ @Test(timeout = 60000) - public void testCheckpointing() throws Exception { - runCheckpointingTest(); - } - - @Test(timeout = 60000) public void testFailOnNoBroker() throws Exception { runFailOnNoBrokerTest(); } @@ -41,15 +36,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { runSimpleConcurrentProducerConsumerTopology(); } - @Test(timeout = 60000) - public void testPunctuatedExplicitWMConsumer() throws Exception { - runExplicitPunctuatedWMgeneratingConsumerTest(false); - } +// @Test(timeout = 60000) +// public void testPunctuatedExplicitWMConsumer() throws Exception { +// runExplicitPunctuatedWMgeneratingConsumerTest(false); +// } - @Test(timeout = 60000) - public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { - runExplicitPunctuatedWMgeneratingConsumerTest(true); - } +// @Test(timeout = 60000) +// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { +// runExplicitPunctuatedWMgeneratingConsumerTest(true); +// } @Test(timeout 
= 60000) public void testKeyValueSupport() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java index a2c4f73..b80a231 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -22,19 +22,23 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.TestLogger; + import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; + import org.junit.Test; import org.junit.runner.RunWith; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; + import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.util.Arrays; +import java.util.Collections; import java.util.Properties; import java.util.concurrent.Future; @@ -60,7 +64,7 @@ public class KafkaProducerTest extends TestLogger { // partition setup when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( - 
Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null))); + Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null))); // failure when trying to send an element when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java index 74b35af..c1b21b7 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.junit.Test; +@SuppressWarnings("serial") public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase { @Test(timeout=60000) http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties index 6bdfb48..fbeb110 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties +++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties @@ -25,5 +25,6 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger - - +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index d9e813f..0ca8fd5 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -18,427 +18,291 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.commons.collections.map.LinkedMap; -import org.apache.flink.api.common.ExecutionConfig; + import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.TimestampAssigner; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.SerializedValue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; - -import static java.util.Objects.requireNonNull; -import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.checkArgument; - -public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> - implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>, Triggerable { - - // ------------------------------------------------------------------------ - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class); +import static org.apache.flink.util.Preconditions.checkNotNull; +/** + * Base class of all Flink Kafka Consumer data sources. + * This implements the common behavior across all Kafka versions. + * + * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the + * {@link AbstractFetcher}. 
+ * + * @param <T> The type of records produced by this data source + */ +public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements + CheckpointListener, + CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, + ResultTypeQueryable<T> +{ private static final long serialVersionUID = -6272159445203409112L; - /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid), - * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */ - public static final long OFFSET_NOT_SET = -915623761776L; - + protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class); + /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; - - /** The schema to convert between Kafka#s byte messages, and Flink's objects */ + // ------------------------------------------------------------------------ + // configuration state, set on the client relevant for all subtasks + // ------------------------------------------------------------------------ + + /** The schema to convert between Kafka's byte messages, and Flink's objects */ protected final KeyedDeserializationSchema<T> deserializer; - // ------ Runtime State ------- + /** The set of topic partitions that the source will read */ + protected List<KafkaTopicPartition> allSubscribedPartitions; + + /** Optional timestamp extractor / watermark generator that will be run per Kafka partition, + * to exploit per-partition timestamp characteristics. + * The assigner is kept in serialized form, to deserialize it into multiple copies */ + private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner; + + /** Optional timestamp extractor / watermark generator that will be run per Kafka partition, + * to exploit per-partition timestamp characteristics. 
+ * The assigner is kept in serialized form, to deserialize it into multiple copies */ + private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner; + // ------------------------------------------------------------------------ + // runtime state (used individually by each parallel subtask) + // ------------------------------------------------------------------------ + /** Data for pending but uncommitted checkpoints */ - protected final LinkedMap pendingCheckpoints = new LinkedMap(); - - /** - * Information about the partitions being read by the local consumer. This contains: - * offsets of the last returned elements, and if a timestamp assigner is used, it - * also contains the maximum seen timestamp in the partition and if the partition - * still receives elements or it is inactive. - */ - protected transient HashMap<KafkaTopicPartition, KafkaPartitionState> partitionState; + private final LinkedMap pendingCheckpoints = new LinkedMap(); + /** The fetcher implements the connections to the Kafka brokers */ + private transient volatile AbstractFetcher<T, ?> kafkaFetcher; + /** The offsets to restore to, if the consumer restores state from a checkpoint */ - protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset; - + private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset; + /** Flag indicating whether the consumer is still running **/ - protected volatile boolean running = true; + private volatile boolean running = true; // ------------------------------------------------------------------------ - // WATERMARK EMISSION - // ------------------------------------------------------------------------ /** - * The user-specified methods to extract the timestamps from the records in Kafka, and - * to decide when to emit watermarks. 
- */ - private AssignerWithPunctuatedWatermarks<T> punctuatedWatermarkAssigner; - - /** - * The user-specified methods to extract the timestamps from the records in Kafka, and - * to decide when to emit watermarks. - */ - private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner; - - private StreamingRuntimeContext runtime = null; - - private SourceContext<T> srcContext = null; - - /** - * The interval between consecutive periodic watermark emissions, - * as configured via the {@link ExecutionConfig#getAutoWatermarkInterval()}. - */ - private long watermarkInterval = -1; - - /** The last emitted watermark. */ - private long lastEmittedWatermark = Long.MIN_VALUE; - - // ------------------------------------------------------------------------ - - /** - * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler. - * - * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs - * at the beginning of this class.</p> + * Base constructor. * * @param deserializer * The deserializer to turn raw byte messages into Java/Scala objects. - * @param props - * The properties that are used to configure both the fetcher and the offset handler. */ - public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer, Properties props) { - this.deserializer = requireNonNull(deserializer, "valueDeserializer"); + public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer) { + this.deserializer = checkNotNull(deserializer, "valueDeserializer"); } /** - * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. Bare in mind - * that the source can either have an {@link AssignerWithPunctuatedWatermarks} or an - * {@link AssignerWithPeriodicWatermarks}, not both. + * This method must be called from the subclasses, to set the list of all subscribed partitions + * that this consumer will fetch from (across all subtasks). 
+ * + * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from. */ - public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) { - checkEmitterDuringInit(); - this.punctuatedWatermarkAssigner = assigner; - return this; + protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) { + checkNotNull(allSubscribedPartitions); + this.allSubscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions); } + // ------------------------------------------------------------------------ + // Configuration + // ------------------------------------------------------------------------ + /** - * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks periodically. Bare in mind that the - * source can either have an {@link AssignerWithPunctuatedWatermarks} or an - * {@link AssignerWithPeriodicWatermarks}, not both. + * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. + * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions + * in the same way as in the Flink runtime, when streams are merged. + * + * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, + * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition + * characteristics are usually lost that way. For example, if the timestamps are strictly ascending + * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the + * parallel source subtask reads more that one partition. + * + * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka + * partition, allows users to let them exploit the per-partition characteristics. 
+ * + * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an + * {@link AssignerWithPeriodicWatermarks}, not both at the same time. + * + * @param assigner The timestamp assigner / watermark generator to use. + * @return The consumer object, to allow function chaining. */ - public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) { - checkEmitterDuringInit(); - this.periodicWatermarkAssigner = assigner; - return this; - } - - /** - * Processes the element after having been read from Kafka and deserialized, and updates the - * last read offset for the specifies partition. These two actions should be performed in - * an atomic way in order to guarantee exactly once semantics. - * @param sourceContext - * The context the task operates in. - * @param partDescriptor - * A descriptor containing the topic and the id of the partition. - * @param value - * The element to process. - * @param offset - * The offset of the element in the partition. - * */ - public void processElement(SourceContext<T> sourceContext, KafkaTopicPartition partDescriptor, T value, long offset) { - if (punctuatedWatermarkAssigner == null && periodicWatermarkAssigner == null) { - // the case where no watermark emitter is specified. - sourceContext.collect(value); - } else { - - if (srcContext == null) { - srcContext = sourceContext; - } - - long extractedTimestamp = extractTimestampAndEmitElement(partDescriptor, value); - - // depending on the specified watermark emitter, either send a punctuated watermark, - // or set the timer for the first periodic watermark. In the periodic case, we set the timer - // only for the first watermark, as it is the trigger() that will set the subsequent ones. 
- - if (punctuatedWatermarkAssigner != null) { - final Watermark nextWatermark = punctuatedWatermarkAssigner - .checkAndGetNextWatermark(value, extractedTimestamp); - if (nextWatermark != null) { - emitWatermarkIfMarkingProgress(sourceContext); - } - } else if(periodicWatermarkAssigner != null && runtime == null) { - runtime = (StreamingRuntimeContext) getRuntimeContext(); - watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval(); - if (watermarkInterval > 0) { - runtime.registerTimer(System.currentTimeMillis() + watermarkInterval, this); - } - } + public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) { + checkNotNull(assigner); + + if (this.periodicWatermarkAssigner != null) { + throw new IllegalStateException("A periodic watermark emitter has already been set."); + } + try { + this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner); + return this; + } catch (Exception e) { + throw new IllegalArgumentException("The given assigner is not serializable", e); } - updateOffsetForPartition(partDescriptor, offset); } /** - * Extract the timestamp from the element based on the user-specified extractor, - * emit the element with the new timestamp, and update the partition monitoring info (if necessary). - * In more detail, upon reception of an element with a timestamp greater than the greatest timestamp - * seen so far in that partition, this method updates the maximum timestamp seen for that partition, - * and marks the partition as {@code active}, i.e. it still receives fresh data. - * @param partDescriptor the partition the new element belongs to. - * @param value the element to be forwarded. - * @return the timestamp of the new element. + * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. 
+ * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions + * in the same way as in the Flink runtime, when streams are merged. + * + * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions, + * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition + * characteristics are usually lost that way. For example, if the timestamps are strictly ascending + * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the + * parallel source subtask reads more that one partition. + * + * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka + * partition, allows users to let them exploit the per-partition characteristics. + * + * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an + * {@link AssignerWithPeriodicWatermarks}, not both at the same time. + * + * @param assigner The timestamp assigner / watermark generator to use. + * @return The consumer object, to allow function chaining. */ - private long extractTimestampAndEmitElement(KafkaTopicPartition partDescriptor, T value) { - long extractedTimestamp = getTimestampAssigner().extractTimestamp(value, Long.MIN_VALUE); - srcContext.collectWithTimestamp(value, extractedTimestamp); - updateMaximumTimestampForPartition(partDescriptor, extractedTimestamp); - return extractedTimestamp; - } - - /** - * Upon reception of an element with a timestamp greater than the greatest timestamp seen so far in the partition, - * this method updates the maximum timestamp seen for that partition to {@code timestamp}, and marks the partition - * as {@code active}, i.e. it still receives fresh data. If the partition is not known to the system, then a new - * {@link KafkaPartitionState} is created and is associated to the new partition for future monitoring. 
- * @param partDescriptor - * A descriptor containing the topic and the id of the partition. - * @param timestamp - * The timestamp to set the minimum to, if smaller than the already existing one. - * @return {@code true} if the minimum was updated successfully to {@code timestamp}, {@code false} - * if the previous value is smaller than the provided timestamp - * */ - private boolean updateMaximumTimestampForPartition(KafkaTopicPartition partDescriptor, long timestamp) { - KafkaPartitionState info = getOrInitializeInfo(partDescriptor); - - if(timestamp > info.getMaxTimestamp()) { - - // the flag is set to false as soon as the current partition's max timestamp is sent as a watermark. - // if then, and for that partition, only late elements arrive, then the max timestamp will stay the - // same, and it will keep the overall system from progressing. - // To avoid this, we only mark a partition as active on non-late elements. - - info.setActive(true); - info.setMaxTimestamp(timestamp); - return true; + public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) { + checkNotNull(assigner); + + if (this.punctuatedWatermarkAssigner != null) { + throw new IllegalStateException("A punctuated watermark emitter has already been set."); + } + try { + this.periodicWatermarkAssigner = new SerializedValue<>(assigner); + return this; + } catch (Exception e) { + throw new IllegalArgumentException("The given assigner is not serializable", e); } - return false; } - /** - * Updates the last read offset for the partition specified by the {@code partDescriptor} to {@code offset}. - * If it is the first time we see the partition, then a new {@link KafkaPartitionState} is created to monitor - * this specific partition. - * @param partDescriptor the partition whose info to update. - * @param offset the last read offset of the partition. 
- */ - public void updateOffsetForPartition(KafkaTopicPartition partDescriptor, long offset) { - KafkaPartitionState info = getOrInitializeInfo(partDescriptor); - info.setOffset(offset); - } + // ------------------------------------------------------------------------ + // Work methods + // ------------------------------------------------------------------------ @Override - public void trigger(long timestamp) throws Exception { - if(this.srcContext == null) { - // if the trigger is called before any elements, then we - // just set the next timer to fire when it should and we - // ignore the triggering as this would produce no results. - setNextWatermarkTimer(); - return; + public void run(SourceContext<T> sourceContext) throws Exception { + if (allSubscribedPartitions == null) { + throw new Exception("The partitions were not set for the consumer"); } - - // this is valid because this method is only called when watermarks - // are set to be emitted periodically. - final Watermark nextWatermark = periodicWatermarkAssigner.getCurrentWatermark(); - if(nextWatermark != null) { - emitWatermarkIfMarkingProgress(srcContext); - } - setNextWatermarkTimer(); - } - - /** - * Emits a new watermark, with timestamp equal to the minimum across all the maximum timestamps - * seen per local partition (across all topics). The new watermark is emitted if and only if - * it signals progress in event-time, i.e. if its timestamp is greater than the timestamp of - * the last emitted watermark. In addition, this method marks as inactive the partition whose - * timestamp was emitted as watermark, i.e. the one with the minimum across the maximum timestamps - * of the local partitions. This is done to avoid not making progress because - * a partition stopped receiving data. The partition is going to be marked as {@code active} - * as soon as the <i>next non-late</i> element arrives. - * - * @return {@code true} if the Watermark was successfully emitted, {@code false} otherwise. 
- */ - private boolean emitWatermarkIfMarkingProgress(SourceFunction.SourceContext<T> sourceContext) { - Tuple2<KafkaTopicPartition, Long> globalMinTs = getMinTimestampAcrossAllTopics(); - if(globalMinTs.f0 != null ) { - synchronized (sourceContext.getCheckpointLock()) { - long minTs = globalMinTs.f1; - if(minTs > lastEmittedWatermark) { - lastEmittedWatermark = minTs; - Watermark toEmit = new Watermark(minTs); - sourceContext.emitWatermark(toEmit); - return true; - } + + // figure out which partitions this subtask should process + final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions, + getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask()); + + // we need only do work, if we actually have partitions assigned + if (!thisSubtaskPartitions.isEmpty()) { + + // (1) create the fetcher that will communicate with the Kafka brokers + final AbstractFetcher<T, ?> fetcher = createFetcher( + sourceContext, thisSubtaskPartitions, + periodicWatermarkAssigner, punctuatedWatermarkAssigner, + (StreamingRuntimeContext) getRuntimeContext()); + + // (2) set the fetcher to the restored checkpoint offsets + if (restoreToOffset != null) { + fetcher.restoreOffsets(restoreToOffset); } - } - return false; - } - /** - * Kafka sources with timestamp extractors are expected to keep the maximum timestamp seen per - * partition they are reading from. This is to mark the per-partition event-time progress. - * - * This method iterates this list, and returns the minimum timestamp across these per-partition - * max timestamps, and across all topics. In addition to this information, it also returns the topic and - * the partition within the topic the timestamp belongs to. 
- */ - private Tuple2<KafkaTopicPartition, Long> getMinTimestampAcrossAllTopics() { - Tuple2<KafkaTopicPartition, Long> minTimestamp = new Tuple2<>(null, Long.MAX_VALUE); - for(Map.Entry<KafkaTopicPartition, KafkaPartitionState> entries: partitionState.entrySet()) { - KafkaTopicPartition part = entries.getKey(); - KafkaPartitionState info = entries.getValue(); - - if(partitionIsActive(part) && info.getMaxTimestamp() < minTimestamp.f1) { - minTimestamp.f0 = part; - minTimestamp.f1 = info.getMaxTimestamp(); + // publish the reference, for snapshot-, commit-, and cancel calls + // IMPORTANT: We can only do that now, because only now will calls to + // the fetchers 'snapshotCurrentState()' method return at least + // the restored offsets + this.kafkaFetcher = fetcher; + if (!running) { + return; } + + // (3) run the fetcher' main work method + fetcher.runFetchLoop(); } - - if(minTimestamp.f0 != null) { - // it means that we have a winner and we have to set its flag to - // inactive, until its next non-late element. - KafkaTopicPartition partitionDescriptor = minTimestamp.f0; - setActiveFlagForPartition(partitionDescriptor, false); - } - - return minTimestamp; - } - - /** - * Sets the {@code active} flag for a given partition of a topic to {@code isActive}. - * This flag signals if the partition is still receiving data and it is used to avoid the case - * where a partition stops receiving data, so its max seen timestamp does not advance, and it - * holds back the progress of the watermark for all partitions. Note that if the partition is - * not known to the system, then a new {@link KafkaPartitionState} is created and is associated - * to the new partition for future monitoring. - * - * @param partDescriptor - * A descriptor containing the topic and the id of the partition. - * @param isActive - * The value {@code true} or {@code false} to set the flag to. 
- */ - private void setActiveFlagForPartition(KafkaTopicPartition partDescriptor, boolean isActive) { - KafkaPartitionState info = getOrInitializeInfo(partDescriptor); - info.setActive(isActive); - } - - /** - * Gets the statistics for a given partition specified by the {@code partition} argument. - * If it is the first time we see this partition, a new {@link KafkaPartitionState} data structure - * is initialized to monitor it from now on. This method never throws a {@link NullPointerException}. - * @param partition the partition to be fetched. - * @return the gathered statistics for that partition. - * */ - private KafkaPartitionState getOrInitializeInfo(KafkaTopicPartition partition) { - KafkaPartitionState info = partitionState.get(partition); - if(info == null) { - info = new KafkaPartitionState(partition.getPartition(), FlinkKafkaConsumerBase.OFFSET_NOT_SET); - partitionState.put(partition, info); + else { + // this source never completes, so emit a Long.MAX_VALUE watermark + // to not block watermark forwarding + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + + // wait until this is canceled + final Object waitLock = new Object(); + while (running) { + try { + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (waitLock) { + waitLock.wait(); + } + } + catch (InterruptedException e) { + if (!running) { + // restore the interrupted state, and fall through the loop + Thread.currentThread().interrupt(); + } + } + } } - return info; } - /** - * Checks if a partition of a topic is still active, i.e. if it still receives data. - * @param partDescriptor - * A descriptor containing the topic and the id of the partition. 
- * */ - private boolean partitionIsActive(KafkaTopicPartition partDescriptor) { - KafkaPartitionState info = partitionState.get(partDescriptor); - if(info == null) { - throw new RuntimeException("Unknown Partition: Topic=" + partDescriptor.getTopic() + - " Partition=" + partDescriptor.getPartition()); + @Override + public void cancel() { + // set ourselves as not running + running = false; + + // abort the fetcher, if there is one + if (kafkaFetcher != null) { + kafkaFetcher.cancel(); } - return info.isActive(); - } - private TimestampAssigner<T> getTimestampAssigner() { - checkEmitterStateAfterInit(); - return periodicWatermarkAssigner != null ? periodicWatermarkAssigner : punctuatedWatermarkAssigner; - } - - private void setNextWatermarkTimer() { - long timeToNextWatermark = System.currentTimeMillis() + watermarkInterval; - runtime.registerTimer(timeToNextWatermark, this); - } - - private void checkEmitterDuringInit() { - if(periodicWatermarkAssigner != null) { - throw new RuntimeException("A periodic watermark emitter has already been provided."); - } else if(punctuatedWatermarkAssigner != null) { - throw new RuntimeException("A punctuated watermark emitter has already been provided."); - } + // there will be an interrupt() call to the main thread anyways } - private void checkEmitterStateAfterInit() { - if(periodicWatermarkAssigner == null && punctuatedWatermarkAssigner == null) { - throw new RuntimeException("The timestamp assigner has not been initialized."); - } else if(periodicWatermarkAssigner != null && punctuatedWatermarkAssigner != null) { - throw new RuntimeException("The source can either have an assigner with punctuated " + - "watermarks or one with periodic watermarks, not both."); + @Override + public void close() throws Exception { + // pretty much the same logic as cancelling + try { + cancel(); + } finally { + super.close(); } } - + // ------------------------------------------------------------------------ // Checkpoint and restore // 
------------------------------------------------------------------------ - - HashMap<KafkaTopicPartition, KafkaPartitionState> restoreInfoFromCheckpoint() { - HashMap<KafkaTopicPartition, KafkaPartitionState> partInfo = new HashMap<>(restoreToOffset.size()); - for(Map.Entry<KafkaTopicPartition, Long> offsets: restoreToOffset.entrySet()) { - KafkaTopicPartition key = offsets.getKey(); - partInfo.put(key, new KafkaPartitionState(key.getPartition(), offsets.getValue())); - } - return partInfo; - } - + @Override public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - if (partitionState == null) { - LOG.debug("snapshotState() requested on not yet opened source; returning null."); - return null; - } if (!running) { LOG.debug("snapshotState() called on closed source"); return null; } - - HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(); - for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: partitionState.entrySet()) { - currentOffsets.put(entry.getKey(), entry.getValue().getOffset()); + + final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; + if (fetcher == null) { + // the fetcher has not yet been initialized, which means we need to return the + // originally restored offsets + return restoreToOffset; } + HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState(); + if (LOG.isDebugEnabled()) { LOG.debug("Snapshotting state. 
Offsets: {}, checkpoint id: {}, timestamp: {}", KafkaTopicPartition.toString(currentOffsets), checkpointId, checkpointTimestamp); @@ -447,7 +311,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti // the map cannot be asynchronously updated, because only one checkpoint call can happen // on this function at a time: either snapshotState() or notifyCheckpointComplete() pendingCheckpoints.put(checkpointId, currentOffsets); - + + // truncate the map, to prevent infinite growth while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) { pendingCheckpoints.remove(0); } @@ -457,51 +322,49 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti @Override public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) { - LOG.info("Setting restore state in Kafka"); + LOG.info("Setting restore state in the FlinkKafkaConsumer"); restoreToOffset = restoredOffsets; } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - if (partitionState == null) { - LOG.debug("notifyCheckpointComplete() called on uninitialized source"); - return; - } if (!running) { LOG.debug("notifyCheckpointComplete() called on closed source"); return; } + + final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher; + if (fetcher == null) { + LOG.debug("notifyCheckpointComplete() called on uninitialized source"); + return; + } // only one commit operation must be in progress if (LOG.isDebugEnabled()) { - LOG.debug("Committing offsets externally for checkpoint {}", checkpointId); + LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId); } try { - HashMap<KafkaTopicPartition, Long> checkpointOffsets; - - // the map may be asynchronously updates when snapshotting state, so we synchronize - synchronized (pendingCheckpoints) { - final int posInMap = pendingCheckpoints.indexOf(checkpointId); - if (posInMap == -1) { - LOG.warn("Received confirmation for unknown checkpoint id 
{}", checkpointId); - return; - } + final int posInMap = pendingCheckpoints.indexOf(checkpointId); + if (posInMap == -1) { + LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId); + return; + } - //noinspection unchecked - checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap); + @SuppressWarnings("unchecked") + HashMap<KafkaTopicPartition, Long> checkpointOffsets = + (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap); - - // remove older checkpoints in map - for (int i = 0; i < posInMap; i++) { - pendingCheckpoints.remove(0); - } + // remove older checkpoints in map + for (int i = 0; i < posInMap; i++) { + pendingCheckpoints.remove(0); } + if (checkpointOffsets == null || checkpointOffsets.size() == 0) { LOG.debug("Checkpoint state was empty."); return; } - commitOffsets(checkpointOffsets); + fetcher.commitSpecificOffsetsToKafka(checkpointOffsets); } catch (Exception e) { if (running) { @@ -511,33 +374,77 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti } } - protected abstract void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) throws Exception; - - + // ------------------------------------------------------------------------ + // Kafka Consumer specific methods + // ------------------------------------------------------------------------ + + /** + * Creates the fetcher that connect to the Kafka brokers, pulls data, deserialized the + * data, and emits it into the data streams. + * + * @param sourceContext The source context to emit data to. + * @param thisSubtaskPartitions The set of partitions that this subtask should handle. + * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator. + * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator. + * @param runtimeContext The task's runtime context. 
+ * + * @return The instantiated fetcher + * + * @throws Exception The method should forward exceptions + */ + protected abstract AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception; + + // ------------------------------------------------------------------------ + // ResultTypeQueryable methods + // ------------------------------------------------------------------------ + @Override public TypeInformation<T> getProducedType() { return deserializer.getProducedType(); } - protected static <T> List<T> assignPartitions(List<T> partitions, int numConsumers, int consumerIndex) { - checkArgument(numConsumers > 0); - checkArgument(consumerIndex < numConsumers); - - List<T> partitionsToSub = new ArrayList<>(); + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ - for (int i = 0; i < partitions.size(); i++) { + /** + * Selects which of the given partitions should be handled by a specific consumer, + * given a certain number of consumers. + * + * @param allPartitions The partitions to select from + * @param numConsumers The number of consumers + * @param consumerIndex The index of the specific consumer + * + * @return The sublist of partitions to be handled by that consumer. 
+ */ + protected static List<KafkaTopicPartition> assignPartitions( + List<KafkaTopicPartition> allPartitions, + int numConsumers, int consumerIndex) + { + final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>( + allPartitions.size() / numConsumers + 1); + + for (int i = 0; i < allPartitions.size(); i++) { if (i % numConsumers == consumerIndex) { - partitionsToSub.add(partitions.get(i)); + thisSubtaskPartitions.add(allPartitions.get(i)); } } - return partitionsToSub; + + return thisSubtaskPartitions; } - + /** - * Method to log partition information. + * Logs the partition information in INFO level. + * + * @param logger The logger to log to. * @param partitionInfos List of subscribed partitions */ - public static void logPartitionInfo(List<KafkaTopicPartition> partitionInfos) { + protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) { Map<String, Integer> countPerTopic = new HashMap<>(); for (KafkaTopicPartition partition : partitionInfos) { Integer count = countPerTopic.get(partition.getTopic()); @@ -548,12 +455,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti } countPerTopic.put(partition.getTopic(), count); } - StringBuilder sb = new StringBuilder(); + StringBuilder sb = new StringBuilder( + "Consumer is going to read the following topics (with number of partitions): "); + for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) { sb.append(e.getKey()).append(" (").append(e.getValue()).append("), "); } - LOG.info("Consumer is going to read the following topics (with number of partitions): {}", sb.toString()); + + logger.info(sb.toString()); } - - } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java new file mode 100644 index 0000000..594aa66 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all fetchers, which implement the connections to Kafka brokers and + * pull records from Kafka partitions. + * + * <p>This fetcher base class implements the logic around emitting records and tracking offsets, + * as well as around the optional timestamp assignment and watermark generation. + * + * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into + * the Flink data streams. + * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version. 
+ */ +public abstract class AbstractFetcher<T, KPH> { + + private static final int NO_TIMESTAMPS_WATERMARKS = 0; + private static final int PERIODIC_WATERMARKS = 1; + private static final int PUNCTUATED_WATERMARKS = 2; + + // ------------------------------------------------------------------------ + + /** The source context to emit records and watermarks to */ + private final SourceContext<T> sourceContext; + + /** The lock that guarantees that record emission and state updates are atomic, + * from the view of taking a checkpoint */ + private final Object checkpointLock; + + /** All partitions (and their state) that this fetcher is subscribed to */ + private final KafkaTopicPartitionState<KPH>[] allPartitions; + + /** The mode describing whether the fetcher also generates timestamps and watermarks */ + private final int timestampWatermarkMode; + + /** Only relevant for punctuated watermarks: The current cross partition watermark */ + private volatile long maxWatermarkSoFar = Long.MIN_VALUE; + + // ------------------------------------------------------------------------ + + protected AbstractFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> assignedPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception + { + this.sourceContext = checkNotNull(sourceContext); + this.checkpointLock = sourceContext.getCheckpointLock(); + + // figure out what we watermark mode we will be using + + if (watermarksPeriodic == null) { + if (watermarksPunctuated == null) { + // simple case, no watermarks involved + timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS; + } else { + timestampWatermarkMode = PUNCTUATED_WATERMARKS; + } + } else { + if (watermarksPunctuated == null) { + timestampWatermarkMode = PERIODIC_WATERMARKS; + } else { + throw new IllegalArgumentException("Cannot have both periodic and 
punctuated watermarks"); + } + } + + // create our partition state according to the timestamp/watermark mode + this.allPartitions = initializePartitions( + assignedPartitions, + timestampWatermarkMode, + watermarksPeriodic, watermarksPunctuated, + runtimeContext.getUserCodeClassLoader()); + + // if we have periodic watermarks, kick off the interval scheduler + if (timestampWatermarkMode == PERIODIC_WATERMARKS) { + KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = + (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions; + + PeriodicWatermarkEmitter periodicEmitter = + new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext); + periodicEmitter.start(); + } + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + /** + * Gets all partitions (with partition state) that this fetcher is subscribed to. + * + * @return All subscribed partitions. + */ + protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() { + return allPartitions; + } + + // ------------------------------------------------------------------------ + // Core fetcher work methods + // ------------------------------------------------------------------------ + + public abstract void runFetchLoop() throws Exception; + + public abstract void cancel(); + + // ------------------------------------------------------------------------ + // Kafka version specifics + // ------------------------------------------------------------------------ + + /** + * Creates the Kafka version specific representation of the given + * topic partition. + * + * @param partition The Flink representation of the Kafka topic partition. + * @return The specific Kafka representation of the Kafka topic partition. 
+ */ + public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition); + + /** + * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for + * older Kafka versions). + * + * @param offsets The offsets to commit to Kafka. + * @throws Exception This method forwards exceptions. + */ + public abstract void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception; + + // ------------------------------------------------------------------------ + // snapshot and restore the state + // ------------------------------------------------------------------------ + + /** + * Takes a snapshot of the partition offsets. + * + * <p>Important: This method mus be called under the checkpoint lock. + * + * @return A map from partition to current offset. + */ + public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() { + // this method assumes that the checkpoint lock is held + assert Thread.holdsLock(checkpointLock); + + HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length); + for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) { + if (partition.isOffsetDefined()) { + state.put(partition.getKafkaTopicPartition(), partition.getOffset()); + } + } + return state; + } + + /** + * Restores the partition offsets. + * + * @param snapshotState The offsets for the partitions + */ + public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) { + for (KafkaTopicPartitionState<?> partition : allPartitions) { + Long offset = snapshotState.get(partition.getKafkaTopicPartition()); + if (offset != null) { + partition.setOffset(offset); + } + } + } + + // ------------------------------------------------------------------------ + // emitting records + // ------------------------------------------------------------------------ + + /** + * + * <p>Implementation Note: This method is kept brief to be JIT inlining friendly. 
+ * That makes the fast path efficient, the extended paths are called as separate methods. + * + * @param record The record to emit + * @param partitionState The state of the Kafka partition from which the record was fetched + * @param offset The offset from which the record was fetched + */ + protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) { + if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { + // fast path logic, in case there are no watermarks + + // emit the record, using the checkpoint lock to guarantee + // atomicity of record emission and offset state update + synchronized (checkpointLock) { + sourceContext.collect(record); + partitionState.setOffset(offset); + } + } + else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { + emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset); + } + else { + emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset); + } + } + + /** + * Record emission, if a timestamp will be attached from an assigner that is + * also a periodic watermark generator. + */ + private void emitRecordWithTimestampAndPeriodicWatermark( + T record, KafkaTopicPartitionState<KPH> partitionState, long offset) + { + @SuppressWarnings("unchecked") + final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState = + (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState; + + // extract timestamp - this accesses/modifies the per-partition state inside the + // watermark generator instance, so we need to lock the access on the + // partition state. 
concurrent access can happen from the periodic emitter
+ final long timestamp;
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (withWatermarksState) {
+ timestamp = withWatermarksState.getTimestampForRecord(record);
+ }
+
+ // emit the record with timestamp, using the usual checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
+ }
+ }
+
+ /**
+ * Record emission, if a timestamp will be attached from an assigner that is
+ * also a punctuated watermark generator.
+ */
+ private void emitRecordWithTimestampAndPunctuatedWatermark(
+ T record, KafkaTopicPartitionState<KPH> partitionState, long offset)
+ {
+ @SuppressWarnings("unchecked")
+ final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+ (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
+
+ // only one thread ever works on accessing timestamps and watermarks
+ // from the punctuated extractor
+ final long timestamp = withWatermarksState.getTimestampForRecord(record);
+ final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp);
+
+ // emit the record with timestamp, using the usual checkpoint lock to guarantee
+ // atomicity of record emission and offset state update
+ synchronized (checkpointLock) {
+ sourceContext.collectWithTimestamp(record, timestamp);
+ partitionState.setOffset(offset);
+ }
+
+ // if we also have a new per-partition watermark, check if that is also a
+ // new cross-partition watermark
+ if (newWatermark != null) {
+ updateMinPunctuatedWatermark(newWatermark);
+ }
+ }
+
+ /**
+ * Checks whether a new per-partition watermark is also a new cross-partition watermark. 
+ */ + private void updateMinPunctuatedWatermark(Watermark nextWatermark) { + if (nextWatermark.getTimestamp() > maxWatermarkSoFar) { + long newMin = Long.MAX_VALUE; + + for (KafkaTopicPartitionState<?> state : allPartitions) { + @SuppressWarnings("unchecked") + final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState = + (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state; + + newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark()); + } + + // double-check locking pattern + if (newMin > maxWatermarkSoFar) { + synchronized (checkpointLock) { + if (newMin > maxWatermarkSoFar) { + maxWatermarkSoFar = newMin; + sourceContext.emitWatermark(new Watermark(newMin)); + } + } + } + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Utility method that takes the topic partitions and creates the topic partition state + * holders. If a watermark generator per partition exists, this will also initialize those. 
+ */ + private KafkaTopicPartitionState<KPH>[] initializePartitions( + List<KafkaTopicPartition> assignedPartitions, + int timestampWatermarkMode, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + ClassLoader userCodeClassLoader) + throws IOException, ClassNotFoundException + { + @SuppressWarnings("unchecked") + KafkaTopicPartitionState<KPH>[] partitions = + (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + // create the kafka version specific partition handle + KPH kafkaHandle = createKafkaPartitionHandle(partition); + + // create the partition state + KafkaTopicPartitionState<KPH> partitionState; + switch (timestampWatermarkMode) { + case NO_TIMESTAMPS_WATERMARKS: + partitionState = new KafkaTopicPartitionState<>(partition, kafkaHandle); + break; + case PERIODIC_WATERMARKS: { + AssignerWithPeriodicWatermarks<T> assignerInstance = + watermarksPeriodic.deserializeValue(userCodeClassLoader); + partitionState = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( + partition, kafkaHandle, assignerInstance); + break; + } + + case PUNCTUATED_WATERMARKS: { + AssignerWithPunctuatedWatermarks<T> assignerInstance = + watermarksPunctuated.deserializeValue(userCodeClassLoader); + partitionState = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>( + partition, kafkaHandle, assignerInstance); + break; + } + default: + // cannot happen, add this as a guard for the future + throw new RuntimeException(); + } + + partitions[pos++] = partitionState; + } + + return partitions; + } + + // ------------------------------------------------------------------------ + + /** + * The periodic watermark emitter. In its given interval, it checks all partitions for + * the current event time watermark, and possibly emits the next watermark. 
+ */ + private static class PeriodicWatermarkEmitter implements Triggerable { + + private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions; + + private final SourceContext<?> emitter; + + private final StreamingRuntimeContext triggerContext; + + private final long interval; + + private long lastWatermarkTimestamp; + + //------------------------------------------------- + + PeriodicWatermarkEmitter( + KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions, + SourceContext<?> emitter, + StreamingRuntimeContext runtimeContext) + { + this.allPartitions = checkNotNull(allPartitions); + this.emitter = checkNotNull(emitter); + this.triggerContext = checkNotNull(runtimeContext); + this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); + this.lastWatermarkTimestamp = Long.MIN_VALUE; + } + + //------------------------------------------------- + + public void start() { + triggerContext.registerTimer(System.currentTimeMillis() + interval, this); + } + + @Override + public void trigger(long timestamp) throws Exception { + // sanity check + assert Thread.holdsLock(emitter.getCheckpointLock()); + + long minAcrossAll = Long.MAX_VALUE; + for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { + + // we access the current watermark for the periodic assigners under the state + // lock, to prevent concurrent modification to any internal variables + final long curr; + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (state) { + curr = state.getCurrentWatermarkTimestamp(); + } + + minAcrossAll = Math.min(minAcrossAll, curr); + } + + // emit next watermark, if there is one + if (minAcrossAll > lastWatermarkTimestamp) { + lastWatermarkTimestamp = minAcrossAll; + emitter.emitWatermark(new Watermark(minAcrossAll)); + } + + // schedule the next watermark + triggerContext.registerTimer(System.currentTimeMillis() + interval, this); + } + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java new file mode 100644 index 0000000..9a0e4e3 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A proxy that holds the first exception reported by a concurrently running thread
+ * and, optionally, interrupts a target thread so that the exception can be rethrown
+ * in that thread via {@link #checkAndThrowException()}.
+ */
+public class ExceptionProxy {
+
+ /** The thread that should be interrupted when an exception occurs */
+ private final Thread toInterrupt;
+
+ /** The exception to throw */
+ private final AtomicReference<Throwable> exception;
+
+ /**
+ * Creates an exception proxy that interrupts the given thread upon a reported exception.
+ *
+ * @param toInterrupt The thread to interrupt upon an exception. May be null.
+ */
+ public ExceptionProxy(@Nullable Thread toInterrupt) {
+ this.toInterrupt = toInterrupt;
+ this.exception = new AtomicReference<>();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets the exception occurred and interrupts the target thread,
+ * if no other exception has occurred so far.
+ *
+ * @param t The exception that occurred
+ */
+ public void reportError(Throwable t) {
+ // set the exception, if it is the first
+ if (exception.compareAndSet(null, t) && toInterrupt != null) {
+ toInterrupt.interrupt();
+ }
+ }
+
+ /**
+ * Checks whether an exception has been reported so far and, if so, rethrows it:
+ * an {@code Exception} or {@code Error} is rethrown as-is, any other
+ * {@code Throwable} is wrapped in an {@code Exception}.
+ *
+ * @throws Exception The reported exception, if one has occurred.
+ */
+ public void checkAndThrowException() throws Exception {
+ Throwable t = exception.get();
+ if (t != null) {
+ if (t instanceof Exception) {
+ throw (Exception) t;
+ }
+ else if (t instanceof Error) {
+ throw (Error) t;
+ }
+ else {
+ throw new Exception(t);
+ }
+ }
+ }
+} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java deleted file mode 100644 index 11a392a..0000000 --- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import java.io.Serializable; - -public class KafkaPartitionState implements Serializable { - - private static final long serialVersionUID = 722083576322742328L; - - private final int partitionID; - private long offset; - - private long maxTimestamp = Long.MIN_VALUE; - private boolean isActive = false; - - public KafkaPartitionState(int id, long offset) { - this.partitionID = id; - this.offset = offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public void setActive(boolean isActive) { - this.isActive = isActive; - } - - public void setMaxTimestamp(long timestamp) { - maxTimestamp = timestamp; - } - - public int getPartition() { - return partitionID; - } - - public boolean isActive() { - return isActive; - } - - public long getMaxTimestamp() { - return maxTimestamp; - } - - public long getOffset() { - return offset; - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java index aea14cf..c68fe28 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java @@ -24,14 +24,20 @@ import java.util.Map; import static java.util.Objects.requireNonNull; - /** - * A serializable representation of a kafka topic and a partition. - * Used as an operator state for the Kafka consumer + * Flink's description of a partition in a Kafka topic. + * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...) + * + * <p>Note: This class must not change in its structure, because it would change the + * serialization format and make previous savepoints unreadable. 
*/ -public class KafkaTopicPartition implements Serializable { +public final class KafkaTopicPartition implements Serializable { + /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK + * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */ private static final long serialVersionUID = 722083576322742325L; + + // ------------------------------------------------------------------------ private final String topic; private final int partition; @@ -43,6 +49,8 @@ public class KafkaTopicPartition implements Serializable { this.cachedHash = 31 * topic.hashCode() + partition; } + // ------------------------------------------------------------------------ + public String getTopic() { return topic; } @@ -51,6 +59,8 @@ public class KafkaTopicPartition implements Serializable { return partition; } + // ------------------------------------------------------------------------ + @Override public String toString() { return "KafkaTopicPartition{" + @@ -64,25 +74,23 @@ public class KafkaTopicPartition implements Serializable { if (this == o) { return true; } - if (!(o instanceof KafkaTopicPartition)) { - return false; + else if (o instanceof KafkaTopicPartition) { + KafkaTopicPartition that = (KafkaTopicPartition) o; + return this.partition == that.partition && this.topic.equals(that.topic); } - - KafkaTopicPartition that = (KafkaTopicPartition) o; - - if (partition != that.partition) { + else { return false; } - return topic.equals(that.topic); } @Override public int hashCode() { return cachedHash; } - - - // ------------------- Utilities ------------------------------------- + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ public static String toString(Map<KafkaTopicPartition, Long> map) { StringBuilder sb = new StringBuilder(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java new file mode 100644 index 0000000..36612a4 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +/** + * The state that the Flink Kafka Consumer holds for each Kafka partition. + * Includes the Kafka descriptor for partitions. 
+ * + * <p>This class describes the most basic state (only the offset), subclasses + * define more elaborate state, containing current watermarks and timestamp + * extractors. + * + * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions. + */ +public class KafkaTopicPartitionState<KPH> { + + /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid), + * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */ + public static final long OFFSET_NOT_SET = -915623761776L; + + // ------------------------------------------------------------------------ + + /** The Flink description of a Kafka partition */ + private final KafkaTopicPartition partition; + + /** The Kafka description of a Kafka partition (varies across different Kafka versions) */ + private final KPH kafkaPartitionHandle; + + /** The offset within the Kafka partition that we already processed */ + private volatile long offset; + + // ------------------------------------------------------------------------ + + public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) { + this.partition = partition; + this.kafkaPartitionHandle = kafkaPartitionHandle; + this.offset = OFFSET_NOT_SET; + } + + // ------------------------------------------------------------------------ + + /** + * Gets Flink's descriptor for the Kafka Partition. + * @return The Flink partition descriptor. + */ + public final KafkaTopicPartition getKafkaTopicPartition() { + return partition; + } + + /** + * Gets Kafka's descriptor for the Kafka Partition. + * @return The Kafka partition descriptor. + */ + public final KPH getKafkaPartitionHandle() { + return kafkaPartitionHandle; + } + + public final String getTopic() { + return partition.getTopic(); + } + + public final int getPartition() { + return partition.getPartition(); + } + + /** + * The current offset in the partition. 
This refers to the offset last element that + * we retrieved and emitted successfully. It is the offset that should be stored in + * a checkpoint. + */ + public final long getOffset() { + return offset; + } + + public final void setOffset(long offset) { + this.offset = offset; + } + + public final boolean isOffsetDefined() { + return offset != OFFSET_NOT_SET; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle + + ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)"); + } +}
