[FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c93103d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c93103d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c93103d Branch: refs/heads/master Commit: 3c93103d1476cb05ec0c018bfa6876e4ecad38e8 Parents: 0ac1549 Author: Stephan Ewen <[email protected]> Authored: Wed Apr 6 23:21:17 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Apr 13 01:10:54 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer08.java | 358 ++------ .../connectors/kafka/internals/Fetcher.java | 72 -- .../kafka/internals/Kafka08Fetcher.java | 446 +++++++++ .../kafka/internals/KillerWatchDog.java | 62 ++ .../kafka/internals/LegacyFetcher.java | 896 ------------------- .../kafka/internals/OffsetHandler.java | 55 -- .../kafka/internals/PartitionInfoFetcher.java | 66 ++ .../kafka/internals/PartitionerWrapper.java | 49 - .../internals/PeriodicOffsetCommitter.java | 85 ++ .../kafka/internals/SimpleConsumerThread.java | 504 +++++++++++ .../kafka/internals/ZookeeperOffsetHandler.java | 58 +- .../connectors/kafka/Kafka08ITCase.java | 176 ++-- .../connectors/kafka/KafkaConsumer08Test.java | 90 ++ .../connectors/kafka/KafkaConsumerTest.java | 156 ---- .../kafka/KafkaShortRetention08ITCase.java | 3 +- .../internals/ZookeeperOffsetHandlerTest.java | 56 -- .../src/test/resources/log4j-test.properties | 5 +- .../connectors/kafka/FlinkKafkaConsumer09.java | 398 ++------ .../kafka/internal/Kafka09Fetcher.java | 311 +++++++ .../connectors/kafka/Kafka09ITCase.java | 21 +- .../connectors/kafka/KafkaProducerTest.java | 8 +- .../kafka/KafkaShortRetention09ITCase.java | 1 + .../src/test/resources/log4j-test.properties | 5 +- .../kafka/FlinkKafkaConsumerBase.java | 668 ++++++-------- .../kafka/internals/AbstractFetcher.java | 439 +++++++++ .../kafka/internals/ExceptionProxy.java | 73 ++ 
.../kafka/internals/KafkaPartitionState.java | 65 -- .../kafka/internals/KafkaTopicPartition.java | 36 +- .../internals/KafkaTopicPartitionState.java | 105 +++ ...picPartitionStateWithPeriodicWatermarks.java | 71 ++ ...cPartitionStateWithPunctuatedWatermarks.java | 84 ++ .../kafka/partitioner/KafkaPartitioner.java | 2 +- .../connectors/kafka/util/KafkaUtils.java | 13 +- .../kafka/FlinkKafkaConsumerBaseTest.java | 222 +++++ .../KafkaConsumerPartitionAssignmentTest.java | 96 +- .../connectors/kafka/KafkaConsumerTestBase.java | 175 ++-- .../connectors/kafka/KafkaProducerTestBase.java | 8 +- .../kafka/KafkaShortRetentionTestBase.java | 45 +- .../internals/KafkaTopicPartitionTest.java | 57 ++ .../testutils/JobManagerCommunicationUtils.java | 49 +- .../kafka/testutils/MockRuntimeContext.java | 26 +- .../AssignerWithPeriodicWatermarks.java | 3 + .../AssignerWithPunctuatedWatermarks.java | 3 + 43 files changed, 3407 insertions(+), 2714 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index 4748781..48cc461 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import 
kafka.api.OffsetRequest; import kafka.cluster.Broker; import kafka.common.ErrorMapping; import kafka.javaapi.PartitionMetadata; @@ -24,40 +25,32 @@ import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.internals.Fetcher; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher; -import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; -import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; import org.apache.flink.streaming.util.serialization.DeserializationSchema; - import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.SerializedValue; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.URL; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import 
java.util.Map; import java.util.Properties; import java.util.Random; -import static java.util.Objects.requireNonNull; import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig; import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getLongFromConfig; - +import static org.apache.flink.util.Preconditions.checkNotNull; /** * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from @@ -102,12 +95,8 @@ import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getLon * reach the Kafka brokers or ZooKeeper.</p> */ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { - - // ------------------------------------------------------------------------ - + private static final long serialVersionUID = -6272159445203409112L; - - private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer08.class); /** Configuration key for the number of retries for getting the partition info */ public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry"; @@ -115,30 +104,16 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { /** Default number of retries for getting the partition info. 
One retry means going through the full list of brokers */ public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3; - - // ------ Configuration of the Consumer ------- + // ------------------------------------------------------------------------ - /** Initial list of topics and partitions to consume */ - private final List<KafkaTopicPartition> partitionInfos; - /** The properties to parametrize the Kafka consumer and ZooKeeper client */ - private final Properties props; + private final Properties kafkaProperties; + /** The behavior when encountering an invalid offset (see {@link OffsetRequest}) */ + private final long invalidOffsetBehavior; - // ------ Runtime State ------- - - /** The fetcher used to pull data from the Kafka brokers */ - private transient Fetcher fetcher; - - /** The committer that persists the committed offsets */ - private transient OffsetHandler offsetHandler; - - /** The partitions actually handled by this consumer at runtime */ - private transient List<KafkaTopicPartition> subscribedPartitions; - - /** The latest offsets that have been committed to Kafka or ZooKeeper. These are never - * newer then the last offsets (Flink's internal view is fresher) */ - private transient HashMap<KafkaTopicPartition, Long> committedOffsets; + /** The interval in which to automatically commit (-1 if deactivated) */ + private final long autoCommitInterval; // ------------------------------------------------------------------------ @@ -202,287 +177,49 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { * The properties that are used to configure both the fetcher and the offset handler. 
*/ public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { - super(deserializer, props); + super(deserializer); - requireNonNull(topics, "topics"); - this.props = requireNonNull(props, "props"); + checkNotNull(topics, "topics"); + this.kafkaProperties = checkNotNull(props, "props"); // validate the zookeeper properties validateZooKeeperConfig(props); + this.invalidOffsetBehavior = getInvalidOffsetBehavior(props); + this.autoCommitInterval = getLongFromConfig(props, "auto.commit.interval.ms", 60000); + // Connect to a broker to get the partitions for all topics - this.partitionInfos = KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, props)); + List<KafkaTopicPartition> partitionInfos = + KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, props)); if (partitionInfos.size() == 0) { - throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics.toString() + "." + - "Please check previous log entries"); - } - - if (LOG.isInfoEnabled()) { - logPartitionInfo(partitionInfos); + throw new RuntimeException( + "Unable to retrieve any partitions for the requested topics " + topics + + ". 
Please check previous log entries"); } - } - - // ------------------------------------------------------------------------ - // Source life cycle - // ------------------------------------------------------------------------ - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks(); - final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask(); - - // pick which partitions we work on - subscribedPartitions = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex); - if (LOG.isInfoEnabled()) { - LOG.info("Kafka consumer {} will read partitions {} out of partitions {}", - thisConsumerIndex, KafkaTopicPartition.toString(subscribedPartitions), this.partitionInfos.size()); - } - - // we leave the fetcher as null, if we have no partitions - if (subscribedPartitions.isEmpty()) { - LOG.info("Kafka consumer {} has no partitions (empty source)", thisConsumerIndex); - this.fetcher = null; // fetcher remains null - return; - } - - // offset handling - offsetHandler = new ZookeeperOffsetHandler(props); - - committedOffsets = new HashMap<>(); - - // initially load the map with "offset not set", last max read timestamp set to Long.MIN_VALUE - // and mark the partition as in-active, until we receive the first element - Map<KafkaTopicPartition, KafkaPartitionState> subscribedPartitionsWithOffsets = - new HashMap<>(subscribedPartitions.size()); - for(KafkaTopicPartition ktp: subscribedPartitions) { - subscribedPartitionsWithOffsets.put(ktp, - new KafkaPartitionState(ktp.getPartition(), FlinkKafkaConsumerBase.OFFSET_NOT_SET)); - } - - // seek to last known pos, from restore request - if (restoreToOffset != null) { - if (LOG.isInfoEnabled()) { - LOG.info("Consumer {} is restored from previous checkpoint: {}", - thisConsumerIndex, KafkaTopicPartition.toString(restoreToOffset)); - } - // initialize offsets with restored state - 
this.partitionState = restoreInfoFromCheckpoint(); - subscribedPartitionsWithOffsets.putAll(partitionState); - restoreToOffset = null; - } - else { - // start with empty partition state - partitionState = new HashMap<>(); - - // no restore request: overwrite offsets. - for(Map.Entry<KafkaTopicPartition, Long> offsetInfo: offsetHandler.getOffsets(subscribedPartitions).entrySet()) { - KafkaTopicPartition key = offsetInfo.getKey(); - subscribedPartitionsWithOffsets.put(key, - new KafkaPartitionState(key.getPartition(), offsetInfo.getValue())); - } - } - if(subscribedPartitionsWithOffsets.size() != subscribedPartitions.size()) { - throw new IllegalStateException("The subscribed partitions map has more entries than the subscribed partitions " + - "list: " + subscribedPartitionsWithOffsets.size() + "," + subscribedPartitions.size()); - } - - // create fetcher - fetcher = new LegacyFetcher<T>(this, subscribedPartitionsWithOffsets, props, - getRuntimeContext().getTaskName(), getRuntimeContext().getUserCodeClassLoader()); - } - - @Override - public void run(SourceContext<T> sourceContext) throws Exception { - if (fetcher != null) { - StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext(); - - // if we have a non-checkpointed source, start a thread which periodically commits - // the current offset into ZK. - - PeriodicOffsetCommitter<T> offsetCommitter = null; - if (!streamingRuntimeContext.isCheckpointingEnabled()) { - // we use Kafka's own configuration parameter key for this. - // Note that the default configuration value in Kafka is 60 * 1000, so we use the same here. 
- long commitInterval = getLongFromConfig(props, "auto.commit.interval.ms", 60000); - offsetCommitter = new PeriodicOffsetCommitter<>(commitInterval, this); - offsetCommitter.setDaemon(true); - offsetCommitter.start(); - LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval); - } - - try { - fetcher.run(sourceContext, deserializer, partitionState); - } finally { - if (offsetCommitter != null) { - offsetCommitter.close(); - try { - offsetCommitter.join(); - } catch(InterruptedException ie) { - // ignore interrupt - } - } - } - } - else { - // this source never completes, so emit a Long.MAX_VALUE watermark - // to not block watermark forwarding - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - - final Object waitLock = new Object(); - while (running) { - // wait until we are canceled - try { - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (waitLock) { - waitLock.wait(); - } - } - catch (InterruptedException e) { - // do nothing, check our "running" status - } - } - } - - // close the context after the work was done. 
this can actually only - // happen when the fetcher decides to stop fetching - sourceContext.close(); - } - - @Override - public void cancel() { - // set ourselves as not running - running = false; - - // close the fetcher to interrupt any work - Fetcher fetcher = this.fetcher; - this.fetcher = null; - if (fetcher != null) { - try { - fetcher.close(); - } - catch (IOException e) { - LOG.warn("Error while closing Kafka connector data fetcher", e); - } + logPartitionInfo(LOG, partitionInfos); } - - OffsetHandler offsetHandler = this.offsetHandler; - this.offsetHandler = null; - if (offsetHandler != null) { - try { - offsetHandler.close(); - } - catch (IOException e) { - LOG.warn("Error while closing Kafka connector offset handler", e); - } - } - } - @Override - public void close() throws Exception { - cancel(); - super.close(); + setSubscribedPartitions(partitionInfos); } - // ------------------------------------------------------------------------ - // Checkpoint and restore - // ------------------------------------------------------------------------ - - /** - * Utility method to commit offsets. - * - * @param toCommit the offsets to commit - * @throws Exception - */ @Override - protected void commitOffsets(HashMap<KafkaTopicPartition, Long> toCommit) throws Exception { - Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(); - for (KafkaTopicPartition tp : this.subscribedPartitions) { - Long offset = toCommit.get(tp); - if(offset == null) { - // There was no data ever consumed from this topic, that's why there is no entry - // for this topicPartition in the map. 
- continue; - } - Long lastCommitted = this.committedOffsets.get(tp); - if (lastCommitted == null) { - lastCommitted = OFFSET_NOT_SET; - } - if (offset != OFFSET_NOT_SET) { - if (offset > lastCommitted) { - offsetsToCommit.put(tp, offset); - this.committedOffsets.put(tp, offset); - LOG.debug("Committing offset {} for partition {}", offset, tp); - } else { - LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, tp); - } - } - } - - if (LOG.isDebugEnabled() && offsetsToCommit.size() > 0) { - LOG.debug("Committing offsets {} to Zookeeper", KafkaTopicPartition.toString(offsetsToCommit)); - } - - this.offsetHandler.commit(offsetsToCommit); + protected AbstractFetcher<T, ?> createFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> thisSubtaskPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext) throws Exception { + + return new Kafka08Fetcher<>(sourceContext, thisSubtaskPartitions, + watermarksPeriodic, watermarksPunctuated, + runtimeContext, deserializer, kafkaProperties, + invalidOffsetBehavior, autoCommitInterval); } // ------------------------------------------------------------------------ - // Miscellaneous utilities - // ------------------------------------------------------------------------ - - /** - * Thread to periodically commit the current read offset into Zookeeper. 
- */ - private static class PeriodicOffsetCommitter<T> extends Thread { - private final long commitInterval; - private final FlinkKafkaConsumer08<T> consumer; - private volatile boolean running = true; - - PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer08<T> consumer) { - this.commitInterval = commitInterval; - this.consumer = consumer; - } - - @Override - public void run() { - try { - - while (running) { - try { - Thread.sleep(commitInterval); - // ------------ commit current offsets ---------------- - - // create copy a deep copy of the current offsets - @SuppressWarnings("unchecked") - HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(consumer.partitionState.size()); - for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: consumer.partitionState.entrySet()) { - currentOffsets.put(entry.getKey(), entry.getValue().getOffset()); - } - consumer.commitOffsets(currentOffsets); - } catch (InterruptedException e) { - if (running) { - // throw unexpected interruption - throw e; - } - } - } - } catch (Throwable t) { - LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t); - consumer.fetcher.stopWithError(t); - } - } - - public void close() { - this.running = false; - this.interrupt(); - } - } - - - // ------------------------------------------------------------------------ // Kafka / ZooKeeper communication utilities // ------------------------------------------------------------------------ @@ -492,11 +229,11 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { * @param topics The name of the topics. * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
*/ - public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(final List<String> topics, final Properties properties) { + public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) { String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); final int numRetries = getIntFromConfig(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES); - requireNonNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set"); + checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); String[] seedBrokers = seedBrokersConfString.split(","); List<KafkaTopicPartitionLeader> partitions = new ArrayList<>(); @@ -605,4 +342,17 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer"); } } + + private static long getInvalidOffsetBehavior(Properties config) { + final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); + if (val.equals("none")) { + throw new IllegalArgumentException("Cannot use '" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + + "' value 'none'. 
Possible values: 'latest', 'largest', or 'earliest'."); + } + else if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9 + return OffsetRequest.LatestTime(); + } else { + return OffsetRequest.EarliestTime(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java deleted file mode 100644 index f86687e..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; - -import java.io.IOException; -import java.util.HashMap; - -/** - * A fetcher pulls data from Kafka, from a fix set of partitions. - * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset. - */ -public interface Fetcher { - - /** - * Closes the fetcher. This will stop any operation in the - * {@link #run(SourceFunction.SourceContext, KeyedDeserializationSchema, HashMap)} method and eventually - * close underlying connections and release all resources. - */ - void close() throws IOException; - - /** - * Starts fetch data from Kafka and emitting it into the stream. - * - * <p>To provide exactly once guarantees, the fetcher needs emit a record and update the last - * consumed offset in one atomic operation. This is done in the - * {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#processElement(SourceFunction.SourceContext, KafkaTopicPartition, Object, long)} - * which is called from within the {@link SourceFunction.SourceContext#getCheckpointLock()}, as shown below:</p> - * <pre>{@code - * - * while (running) { - * T next = ... - * long offset = ... - * int partition = ... - * synchronized (sourceContext.getCheckpointLock()) { - * processElement(sourceContext, partition, next, offset) - * } - * } - * }</pre> - * - * @param <T> The type of elements produced by the fetcher and emitted to the source context. - * @param sourceContext The source context to emit elements to. - * @param valueDeserializer The deserializer to decode the raw values with. 
- * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state) - */ - <T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer, - HashMap<KafkaTopicPartition, KafkaPartitionState> lastOffsets) throws Exception; - - /** - * Exit run loop with given error and release all resources. - * - * @param t Error cause - */ - void stopWithError(Throwable t); -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java new file mode 100644 index 0000000..91fdc71 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import kafka.common.TopicAndPartition; +import org.apache.kafka.common.Node; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.SerializedValue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level consumer API. + * The fetcher also handles the explicit communication with ZooKeeper to fetch initial offsets + * and to write offsets to ZooKeeper. + * + * @param <T> The type of elements produced by the fetcher. 
+ */ +public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { + + static final KafkaTopicPartitionState<TopicAndPartition> MARKER = + new KafkaTopicPartitionState<>(new KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1)); + + private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class); + + // ------------------------------------------------------------------------ + + /** The schema to convert between Kafka's byte messages, and Flink's objects */ + private final KeyedDeserializationSchema<T> deserializer; + + /** The properties that configure the Kafka connection */ + private final Properties kafkaConfig; + + /** The task name, to give more readable names to the spawned threads */ + private final String taskName; + + /** The class loader for dynamically loaded classes */ + private final ClassLoader userCodeClassLoader; + + /** The queue of partitions that are currently not assigned to a broker connection */ + private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue; + + /** The behavior to use in case that an offset is not valid (any more) for a partition */ + private final long invalidOffsetBehavior; + + /** The interval in which to automatically commit (-1 if deactivated) */ + private final long autoCommitInterval; + + /** The handler that reads/writes offsets from/to ZooKeeper */ + private volatile ZookeeperOffsetHandler zookeeperOffsetHandler; + + /** Flag to track the main work loop as alive */ + private volatile boolean running = true; + + + public Kafka08Fetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> assignedPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + StreamingRuntimeContext runtimeContext, + KeyedDeserializationSchema<T> deserializer, + Properties kafkaProperties, + long invalidOffsetBehavior, + long 
autoCommitInterval) throws Exception + { + super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext); + + this.deserializer = checkNotNull(deserializer); + this.kafkaConfig = checkNotNull(kafkaProperties); + this.taskName = runtimeContext.getTaskNameWithSubtasks(); + this.userCodeClassLoader = runtimeContext.getUserCodeClassLoader(); + this.invalidOffsetBehavior = invalidOffsetBehavior; + this.autoCommitInterval = autoCommitInterval; + this.unassignedPartitionsQueue = new ClosableBlockingQueue<>(); + + // initially, all these partitions are not assigned to a specific broker connection + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { + unassignedPartitionsQueue.add(partition); + } + } + + // ------------------------------------------------------------------------ + // Main Work Loop + // ------------------------------------------------------------------------ + + @Override + public void runFetchLoop() throws Exception { + // the map from broker to the thread that is connected to that broker + final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>(); + + // this holds possible the exceptions from the concurrent broker connection threads + final ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread()); + + // the offset handler handles the communication with ZooKeeper, to commit externally visible offsets + final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig); + this.zookeeperOffsetHandler = zookeeperOffsetHandler; + + PeriodicOffsetCommitter periodicCommitter = null; + try { + // read offsets from ZooKeeper for partitions that did not restore offsets + { + List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>(); + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { + if (!partition.isOffsetDefined()) { + partitionsWithNoOffset.add(partition.getKafkaTopicPartition()); 
+ } + } + + Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset); + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) { + Long offset = zkOffsets.get(partition.getKafkaTopicPartition()); + if (offset != null) { + partition.setOffset(offset); + } + } + } + + // start the periodic offset committer thread, if necessary + if (autoCommitInterval > 0) { + LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval); + + periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler, + subscribedPartitions(), errorHandler, autoCommitInterval); + periodicCommitter.setName("Periodic Kafka partition offset committer"); + periodicCommitter.setDaemon(true); + periodicCommitter.start(); + } + + // Main loop polling elements from the unassignedPartitions queue to the threads + while (running) { + // re-throw any exception from the concurrent fetcher threads + errorHandler.checkAndThrowException(); + + // wait for max 5 seconds trying to get partitions to assign + // if threads shut down, this poll returns earlier, because the threads inject the + // special marker into the queue + List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = + unassignedPartitionsQueue.getBatchBlocking(5000); + partitionsToAssign.remove(MARKER); + + if (!partitionsToAssign.isEmpty()) { + LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size()); + Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = + findLeaderForPartitions(partitionsToAssign, kafkaConfig); + + // assign the partitions to the leaders (maybe start the threads) + for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader : + partitionsWithLeaders.entrySet()) + { + final Node leader = partitionsWithLeader.getKey(); + final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = 
partitionsWithLeader.getValue(); + SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader); + + if (!running) { + break; + } + + if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) { + // start new thread + brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler); + brokerToThread.put(leader, brokerThread); + } + else { + // put elements into queue of thread + ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue = + brokerThread.getNewPartitionsQueue(); + + for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) { + if (!newPartitionsQueue.addIfOpen(fp)) { + // we were unable to add the partition to the broker's queue + // the broker has closed in the meantime (the thread will shut down) + // create a new thread for connecting to this broker + List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>(); + seedPartitions.add(fp); + brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler); + brokerToThread.put(leader, brokerThread); + newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions + } + } + } + } + } + else { + // there were no partitions to assign. Check if any broker threads shut down. + // we get into this section of the code, if either the poll timed out, or the + // blocking poll was woken up by the marker element + Iterator<SimpleConsumerThread<T>> bttIterator = brokerToThread.values().iterator(); + while (bttIterator.hasNext()) { + SimpleConsumerThread<T> thread = bttIterator.next(); + if (!thread.getNewPartitionsQueue().isOpen()) { + LOG.info("Removing stopped consumer thread {}", thread.getName()); + bttIterator.remove(); + } + } + } + + if (brokerToThread.size() == 0 && unassignedPartitionsQueue.isEmpty()) { + if (unassignedPartitionsQueue.close()) { + LOG.info("All consumer threads are finished, there are no more unassigned partitions. 
Stopping fetcher"); + break; + } + // we end up here if somebody added something to the queue in the meantime --> continue to poll queue again + } + } + } + catch (InterruptedException e) { + // this may be thrown because an exception on one of the concurrent fetcher threads + // woke this thread up. make sure we throw the root exception instead in that case + errorHandler.checkAndThrowException(); + + // no other root exception, throw the interrupted exception + throw e; + } + finally { + this.running = false; + this.zookeeperOffsetHandler = null; + + // if we run a periodic committer thread, shut that down + if (periodicCommitter != null) { + periodicCommitter.shutdown(); + } + + // make sure that in any case (completion, abort, error), all spawned threads are stopped + try { + int runningThreads; + do { + // check whether threads are alive and cancel them + runningThreads = 0; + Iterator<SimpleConsumerThread<T>> threads = brokerToThread.values().iterator(); + while (threads.hasNext()) { + SimpleConsumerThread<?> t = threads.next(); + if (t.isAlive()) { + t.cancel(); + runningThreads++; + } else { + threads.remove(); + } + } + + // wait for the threads to finish, before issuing a cancel call again + if (runningThreads > 0) { + for (SimpleConsumerThread<?> t : brokerToThread.values()) { + t.join(500 / runningThreads + 1); + } + } + } + while (runningThreads > 0); + } + catch (Throwable t) { + // we catch all here to preserve the original exception + LOG.error("Exception while shutting down consumer threads", t); + } + + try { + zookeeperOffsetHandler.close(); + } + catch (Throwable t) { + // we catch all here to preserve the original exception + LOG.error("Exception while shutting down ZookeeperOffsetHandler", t); + } + } + } + + @Override + public void cancel() { + // signal the main thread to exit + this.running = false; + + // make sure the main thread wakes up soon + this.unassignedPartitionsQueue.addIfOpen(MARKER); + } + + // 
------------------------------------------------------------------------ + // Kafka 0.8 specific class instantiation + // ------------------------------------------------------------------------ + + @Override + public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) { + return new TopicAndPartition(partition.getTopic(), partition.getPartition()); + } + + // ------------------------------------------------------------------------ + // Offset handling + // ------------------------------------------------------------------------ + + @Override + public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception { + ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler; + if (zkHandler != null) { + zkHandler.writeOffsets(offsets); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private SimpleConsumerThread<T> createAndStartSimpleConsumerThread( + List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions, + Node leader, + ExceptionProxy errorHandler) throws IOException, ClassNotFoundException + { + // each thread needs its own copy of the deserializer, because the deserializer is + // not necessarily thread safe + final KeyedDeserializationSchema<T> clonedDeserializer = + InstantiationUtil.clone(deserializer, userCodeClassLoader); + + // seed thread with list of fetch partitions (otherwise it would shut down immediately again + SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>( + this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue, + clonedDeserializer, invalidOffsetBehavior); + + brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", + taskName, leader.id(), leader.host(), leader.port())); + brokerThread.setDaemon(true); + brokerThread.start(); + + LOG.info("Starting thread {}", 
brokerThread.getName()); + return brokerThread; + } + + /** + * Returns a list of unique topics for the given partitions + * + * @param partitions The partitions + * @return A list of unique topics + */ + private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) { + HashSet<String> uniqueTopics = new HashSet<>(); + for (KafkaTopicPartitionState<TopicAndPartition> fp: partitions) { + uniqueTopics.add(fp.getTopic()); + } + return new ArrayList<>(uniqueTopics); + } + + /** + * Find leaders for the partitions + * + * From a high level, the method does the following: + * - Get a list of FetchPartitions (usually only a few partitions) + * - Get the list of topics from the FetchPartitions list and request the partitions for the topics. (Kafka doesn't support getting leaders for a set of partitions) + * - Build a Map<Leader, List<FetchPartition>> where only the requested partitions are contained. + * + * @param partitionsToAssign fetch partitions list + * @return leader to partitions map + */ + private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions( + List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign, + Properties kafkaProperties) throws Exception + { + if (partitionsToAssign.isEmpty()) { + throw new IllegalArgumentException("Leader request for empty partitions list"); + } + + LOG.info("Refreshing leader information for partitions {}", partitionsToAssign); + + // this request is based on the topic names + PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties); + infoFetcher.start(); + + // NOTE: The kafka client apparently locks itself up sometimes + // when it is interrupted, so we run it only in a separate thread. + // since it sometimes refuses to shut down, we resort to the admittedly harsh + // means of killing the thread after a timeout. 
+ KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000); + watchDog.start(); + + // this list contains ALL partitions of the requested topics + List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions(); + + // copy list to track unassigned partitions + List<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions = new ArrayList<>(partitionsToAssign); + + // final mapping from leader -> list(fetchPartition) + Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>(); + + for(KafkaTopicPartitionLeader partitionLeader: topicPartitionWithLeaderList) { + if (unassignedPartitions.size() == 0) { + // we are done: all partitions are assigned + break; + } + + Iterator<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsIterator = unassignedPartitions.iterator(); + while (unassignedPartitionsIterator.hasNext()) { + KafkaTopicPartitionState<TopicAndPartition> unassignedPartition = unassignedPartitionsIterator.next(); + + if (unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition())) { + // we found the leader for one of the fetch partitions + Node leader = partitionLeader.getLeader(); + + List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader); + if (partitionsOfLeader == null) { + partitionsOfLeader = new ArrayList<>(); + leaderToPartitions.put(leader, partitionsOfLeader); + } + partitionsOfLeader.add(unassignedPartition); + unassignedPartitionsIterator.remove(); // partition has been assigned + break; + } + } + } + + if (unassignedPartitions.size() > 0) { + throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions); + } + + LOG.debug("Partitions with assigned leaders {}", leaderToPartitions); + + return leaderToPartitions; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java new file mode 100644 index 0000000..4d61e53 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +/** + * A watch dog thread that forcibly kills another thread, if that thread does not + * finish in time. + * + * <p>This uses the discouraged {@link Thread#stop()} method. While this is not + * advisable, this watch dog is only for extreme cases of threads that simply + * do not terminate otherwise. 
+ */ +class KillerWatchDog extends Thread { + + private final Thread toKill; + private final long timeout; + + KillerWatchDog(Thread toKill, long timeout) { + super("KillerWatchDog"); + setDaemon(true); + + this.toKill = toKill; + this.timeout = timeout; + } + + @SuppressWarnings("deprecation") + @Override + public void run() { + final long deadline = System.currentTimeMillis() + timeout; + long now; + + while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) { + try { + toKill.join(deadline - now); + } + catch (InterruptedException e) { + // ignore here, our job is important! + } + } + + // this is harsh, but this watchdog is a last resort + if (toKill.isAlive()) { + toKill.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java deleted file mode 100644 index c4dd55c..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java +++ /dev/null @@ -1,896 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import kafka.api.FetchRequestBuilder; -import kafka.api.OffsetRequest; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.ErrorMapping; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.MessageAndOffset; - -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.StringUtils; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.Node; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -import static java.util.Objects.requireNonNull; -import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig; - -/** - * This fetcher uses Kafka's low-level 
API to pull data from a specific - * set of topics and partitions. - * - * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p> - */ -public class LegacyFetcher<T> implements Fetcher { - - private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class); - - private static final FetchPartition MARKER = new FetchPartition("n/a", -1, -1); - - /** The properties that configure the Kafka connection */ - private final Properties config; - - /** The task name, to give more readable names to the spawned threads */ - private final String taskName; - - /** The first error that occurred in a connection thread */ - private final AtomicReference<Throwable> error; - - /** The classloader for dynamically loaded classes */ - private final ClassLoader userCodeClassloader; - - /** Reference the the thread that executed the run() method. */ - private volatile Thread mainThread; - - /** Flag to shut the fetcher down */ - private volatile boolean running = true; - - /** - * Queue of partitions which need to find a (new) leader. Elements are added to the queue - * from the consuming threads. The LegacyFetcher thread is finding new leaders and assigns the partitions - * to the respective threads. - */ - private final ClosableBlockingQueue<FetchPartition> unassignedPartitions = new ClosableBlockingQueue<>(); - - /** The {@link FlinkKafkaConsumer08} to whom this Fetcher belongs. */ - private final FlinkKafkaConsumer08<T> flinkKafkaConsumer; - - /** - * Create a LegacyFetcher instance. - * - * @param initialPartitionsToRead Map of partitions to read. The offset passed is the last-fetched-offset (not the next-offset-to-fetch). 
- * @param props kafka properties - * @param taskName name of the parent task - * @param userCodeClassloader classloader for loading user code - */ - public LegacyFetcher( - FlinkKafkaConsumer08<T> owner, - Map<KafkaTopicPartition, KafkaPartitionState> initialPartitionsToRead, - Properties props, - String taskName, ClassLoader userCodeClassloader) { - - this.flinkKafkaConsumer = requireNonNull(owner); - this.config = requireNonNull(props, "The config properties cannot be null"); - this.userCodeClassloader = requireNonNull(userCodeClassloader); - if (initialPartitionsToRead.size() == 0) { - throw new IllegalArgumentException("List of initial partitions is empty"); - } - - for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> partitionToRead: initialPartitionsToRead.entrySet()) { - KafkaTopicPartition ktp = partitionToRead.getKey(); - // we increment the offset by one so that we fetch the next message in the partition. - long offset = partitionToRead.getValue().getOffset(); - if (offset >= 0 && offset != FlinkKafkaConsumerBase.OFFSET_NOT_SET) { - offset += 1L; - } - unassignedPartitions.add(new FetchPartition(ktp.getTopic(), ktp.getPartition(), offset)); - } - this.taskName = taskName; - this.error = new AtomicReference<>(); - } - - // ------------------------------------------------------------------------ - // Fetcher methods - // ------------------------------------------------------------------------ - - - @Override - public void close() { - // flag needs to be check by the run() method that creates the spawned threads - this.running = false; - - // all other cleanup is made by the run method itself - } - - @Override - public <T> void run(SourceFunction.SourceContext<T> sourceContext, - KeyedDeserializationSchema<T> deserializer, - HashMap<KafkaTopicPartition, KafkaPartitionState> partitionState) throws Exception { - - // NOTE: This method needs to always release all resources it acquires - - this.mainThread = Thread.currentThread(); - - // keep presumably 
dead threads in the list until we are sure the thread is not alive anymore. - final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>(); - try { - // Main loop polling elements from the unassignedPartitions queue to the threads - while (running && error.get() == null) { - try { - // wait for 5 seconds trying to get partitions to assign - List<FetchPartition> partitionsToAssign = unassignedPartitions.getBatchBlocking(5000); - partitionsToAssign.remove(MARKER); - - if(!partitionsToAssign.isEmpty()) { - LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size()); - Map<Node, List<FetchPartition>> partitionsWithLeaders = findLeaderForPartitions(partitionsToAssign); - - // assign the partitions to the leaders (maybe start the threads) - for (Map.Entry<Node, List<FetchPartition>> partitionsWithLeader : partitionsWithLeaders.entrySet()) { - final Node leader = partitionsWithLeader.getKey(); - final List<FetchPartition> partitions = partitionsWithLeader.getValue(); - SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader); - - if (!running) { - break; - } - - if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) { - // start new thread - brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, partitions, leader); - brokerToThread.put(leader, brokerThread); - } else { - // put elements into queue of thread - ClosableBlockingQueue<FetchPartition> newPartitionsQueue = brokerThread.getNewPartitionsQueue(); - for (FetchPartition fp : partitions) { - if (!newPartitionsQueue.addIfOpen(fp)) { - // we were unable to add the partition to the broker's queue - // the broker has closed in the meantime (the thread will shut down) - // create a new thread for connecting to this broker - List<FetchPartition> seedPartitions = new ArrayList<>(); - seedPartitions.add(fp); - brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, seedPartitions, leader); - 
brokerToThread.put(leader, brokerThread); - newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions - } - } - } - } - } else { - // there were no partitions to assign. Check if any broker threads shut down. - Iterator<SimpleConsumerThread<T>> bttIterator = brokerToThread.values().iterator(); - while (bttIterator.hasNext()) { - SimpleConsumerThread<T> thread = bttIterator.next(); - if(!thread.getNewPartitionsQueue().isOpen()) { - LOG.info("Removing stopped consumer thread {}", thread.getName()); - bttIterator.remove(); - } - } - } - - if (brokerToThread.size() == 0 && unassignedPartitions.isEmpty()) { - if (unassignedPartitions.close()) { - LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher"); - break; - } - // we end up here if somebody added something to the queue in the meantime --> continue to poll queue again - } - } catch (InterruptedException e) { - // ignore. we should notice what happened in the next loop check - } - } - - // make sure any asynchronous error is noticed - Throwable error = this.error.get(); - if (error != null) { - throw new Exception(error.getMessage(), error); - } - } finally { - // make sure that in any case (completion, abort, error), all spawned threads are stopped - int runningThreads; - do { - runningThreads = 0; - for (SimpleConsumerThread<?> t : brokerToThread.values()) { - if (t.isAlive()) { - t.cancel(); - runningThreads++; - } - } - if(runningThreads > 0) { - Thread.sleep(500); - } - } while(runningThreads > 0); - } - } - - private <T> SimpleConsumerThread<T> createAndStartSimpleConsumerThread(SourceFunction.SourceContext<T> sourceContext, - KeyedDeserializationSchema<T> deserializer, - List<FetchPartition> seedPartitions, Node leader) throws IOException, ClassNotFoundException { - final KeyedDeserializationSchema<T> clonedDeserializer = - InstantiationUtil.clone(deserializer, userCodeClassloader); - - // seed thread with list of 
fetch partitions (otherwise it would shut down immediately again - SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(this, config, - leader, seedPartitions, unassignedPartitions, sourceContext, clonedDeserializer); - - brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", - taskName, leader.id(), leader.host(), leader.port())); - brokerThread.setDaemon(true); - brokerThread.start(); - LOG.info("Starting thread {}", brokerThread.getName()); - return brokerThread; - } - - /** - * Find leaders for the partitions - * - * From a high level, the method does the following: - * - Get a list of FetchPartitions (usually only a few partitions) - * - Get the list of topics from the FetchPartitions list and request the partitions for the topics. (Kafka doesn't support getting leaders for a set of partitions) - * - Build a Map<Leader, List<FetchPartition>> where only the requested partitions are contained. - * - * @param partitionsToAssign fetch partitions list - * @return leader to partitions map - */ - private Map<Node, List<FetchPartition>> findLeaderForPartitions(List<FetchPartition> partitionsToAssign) throws Exception { - if(partitionsToAssign.size() == 0) { - throw new IllegalArgumentException("Leader request for empty partitions list"); - } - - LOG.info("Refreshing leader information for partitions {}", partitionsToAssign); - // NOTE: The kafka client apparently locks itself in an infinite loop sometimes - // when it is interrupted, so we run it only in a separate thread. - // since it sometimes refuses to shut down, we resort to the admittedly harsh - // means of killing the thread after a timeout. 
- PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), config); // this request is based on the topic names - infoFetcher.start(); - - KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000); - watchDog.start(); - - // this list contains ALL partitions of the requested topics - List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions(); - // copy list to track unassigned partitions - List<FetchPartition> unassignedPartitions = new ArrayList<>(partitionsToAssign); - - // final mapping from leader -> list(fetchPartition) - Map<Node, List<FetchPartition>> leaderToPartitions = new HashMap<>(); - - for(KafkaTopicPartitionLeader partitionLeader: topicPartitionWithLeaderList) { - if (unassignedPartitions.size() == 0) { - // we are done: all partitions are assigned - break; - } - Iterator<FetchPartition> unassignedPartitionsIterator = unassignedPartitions.iterator(); - while (unassignedPartitionsIterator.hasNext()) { - FetchPartition unassignedPartition = unassignedPartitionsIterator.next(); - if (unassignedPartition.topic.equals(partitionLeader.getTopicPartition().getTopic()) - && unassignedPartition.partition == partitionLeader.getTopicPartition().getPartition()) { - - // we found the leader for one of the fetch partitions - Node leader = partitionLeader.getLeader(); - List<FetchPartition> partitionsOfLeader = leaderToPartitions.get(leader); - if (partitionsOfLeader == null) { - partitionsOfLeader = new ArrayList<>(); - leaderToPartitions.put(leader, partitionsOfLeader); - } - partitionsOfLeader.add(unassignedPartition); - unassignedPartitionsIterator.remove(); // partition has been assigned - break; - } - } - } - - if(unassignedPartitions.size() > 0) { - throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions); - } - - LOG.debug("Partitions with assigned leaders {}", leaderToPartitions); - - return leaderToPartitions; - } - - /** - * Reports an error from 
a fetch thread. This will cause the main thread to see this error, - * abort, and cancel all other fetch threads. - * - * @param error The error to report. - */ - @Override - public void stopWithError(Throwable error) { - if (this.error.compareAndSet(null, error)) { - // we are the first to report an error - if (mainThread != null) { - mainThread.interrupt(); - } - } - } - - // ------------------------------------------------------------------------ - - /** - * Representation of a partition to fetch. - */ - private static class FetchPartition { - - final String topic; - - /** ID of the partition within the topic (0 indexed, as given by Kafka) */ - final int partition; - - /** Offset pointing at the next element to read from that partition. */ - long nextOffsetToRead; - - FetchPartition(String topic, int partition, long nextOffsetToRead) { - this.topic = topic; - this.partition = partition; - this.nextOffsetToRead = nextOffsetToRead; - } - - @Override - public String toString() { - return "FetchPartition {topic=" + topic +", partition=" + partition + ", offset=" + nextOffsetToRead + '}'; - } - } - - // ------------------------------------------------------------------------ - // Per broker fetcher - // ------------------------------------------------------------------------ - - /** - * Each broker needs its separate connection. This thread implements the connection to - * one broker. The connection can fetch multiple partitions from the broker. - * - * @param <T> The data type fetched. 
- */ - private static class SimpleConsumerThread<T> extends Thread { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class); - - private final SourceFunction.SourceContext<T> sourceContext; - private final KeyedDeserializationSchema<T> deserializer; - - private final List<FetchPartition> partitions; - - private final Node broker; - - private final Properties config; - - private final LegacyFetcher owner; - - private final ClosableBlockingQueue<FetchPartition> unassignedPartitions; - - private volatile boolean running = true; - - /** Queue containing new fetch partitions for the consumer thread */ - private final ClosableBlockingQueue<FetchPartition> newPartitionsQueue = new ClosableBlockingQueue<>(); - - // ----------------- Simple Consumer ---------------------- - private SimpleConsumer consumer; - - private final int soTimeout; - private final int minBytes; - private final int maxWait; - private final int fetchSize; - private final int bufferSize; - private final int reconnectLimit; - - - // exceptions are thrown locally - public SimpleConsumerThread(LegacyFetcher owner, - Properties config, - Node broker, - List<FetchPartition> seedPartitions, - ClosableBlockingQueue<FetchPartition> unassignedPartitions, - SourceFunction.SourceContext<T> sourceContext, - KeyedDeserializationSchema<T> deserializer) { - this.owner = owner; - this.config = config; - this.broker = broker; - this.partitions = seedPartitions; - this.sourceContext = requireNonNull(sourceContext); - this.deserializer = requireNonNull(deserializer); - this.unassignedPartitions = requireNonNull(unassignedPartitions); - - this.soTimeout = getIntFromConfig(config, "socket.timeout.ms", 30000); - this.minBytes = getIntFromConfig(config, "fetch.min.bytes", 1); - this.maxWait = getIntFromConfig(config, "fetch.wait.max.ms", 100); - this.fetchSize = getIntFromConfig(config, "fetch.message.max.bytes", 1048576); - this.bufferSize = getIntFromConfig(config, 
"socket.receive.buffer.bytes", 65536); - this.reconnectLimit = getIntFromConfig(config, "flink.simple-consumer-reconnectLimit", 3); - } - - @Override - public void run() { - LOG.info("Starting to fetch from {}", this.partitions); - - // set up the config values - final String clientId = "flink-kafka-consumer-legacy-" + broker.id(); - int reconnects = 0; - // these are the actual configuration values of Kafka + their original default values. - - try { - // create the Kafka consumer that we actually use for fetching - consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); - - // make sure that all partitions have some offsets to start with - // those partitions that do not have an offset from a checkpoint need to get - // their start offset from ZooKeeper - getMissingOffsetsFromKafka(partitions); - - // Now, the actual work starts :-) - int offsetOutOfRangeCount = 0; - while (running) { - - // ----------------------------------- partitions list maintenance ---------------------------- - - // check queue for new partitions to read from: - List<FetchPartition> newPartitions = newPartitionsQueue.pollBatch(); - if(newPartitions != null) { - // only this thread is taking elements from the queue, so the next call will never block - - // check if the new partitions need an offset lookup - getMissingOffsetsFromKafka(newPartitions); - // add the new partitions (and check they are not already in there) - for(FetchPartition newPartition: newPartitions) { - if(partitions.contains(newPartition)) { - throw new IllegalStateException("Adding partition " + newPartition + " to subscribed partitions even though it is already subscribed"); - } - partitions.add(newPartition); - } - LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName()); - if(LOG.isDebugEnabled()) { - LOG.debug("Partitions list: {}", newPartitions); - } - } - - if(partitions.size() == 0) { - if(newPartitionsQueue.close()) { - // close succeeded. 
Closing thread - LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", getName()); - running = false; - - // add the wake-up marker into the queue to make the main thread - // immediately wake up and termination faster - unassignedPartitions.add(MARKER); - - break; - } else { - // close failed: LegacyFetcher main thread added new partitions into the queue. - continue; // go to top of loop again and get the new partitions - } - } - - // ----------------------------------- request / response with kafka ---------------------------- - - FetchRequestBuilder frb = new FetchRequestBuilder(); - frb.clientId(clientId); - frb.maxWait(maxWait); - frb.minBytes(minBytes); - - for (FetchPartition fp : partitions) { - frb.addFetch(fp.topic, fp.partition, fp.nextOffsetToRead, fetchSize); - } - kafka.api.FetchRequest fetchRequest = frb.build(); - LOG.debug("Issuing fetch request {}", fetchRequest); - - FetchResponse fetchResponse; - try { - fetchResponse = consumer.fetch(fetchRequest); - } catch(Throwable cce) { - //noinspection ConstantConditions - if(cce instanceof ClosedChannelException) { - LOG.warn("Fetch failed because of ClosedChannelException."); - LOG.debug("Full exception", cce); - // we don't know if the broker is overloaded or unavailable. - // retry a few times, then return ALL partitions for new leader lookup - if(++reconnects >= reconnectLimit) { - LOG.warn("Unable to reach broker after {} retries. 
Returning all current partitions", reconnectLimit); - for (FetchPartition fp: this.partitions) { - unassignedPartitions.add(fp); - } - this.partitions.clear(); - continue; // jump to top of loop: will close thread or subscribe to new partitions - } - try { - consumer.close(); - } catch(Throwable t) { - LOG.warn("Error while closing consumer connection", t); - } - // delay & retry - Thread.sleep(500); - consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); - continue; // retry - } else { - throw cce; - } - } - reconnects = 0; - - // ---------------------------------------- error handling ---------------------------- - - if(fetchResponse == null) { - throw new RuntimeException("Fetch failed"); - } - if (fetchResponse.hasError()) { - String exception = ""; - List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>(); - // iterate over partitions to get individual error codes - Iterator<FetchPartition> partitionsIterator = partitions.iterator(); - boolean partitionsRemoved = false; - while(partitionsIterator.hasNext()) { - final FetchPartition fp = partitionsIterator.next(); - short code = fetchResponse.errorCode(fp.topic, fp.partition); - - if (code == ErrorMapping.OffsetOutOfRangeCode()) { - // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper) - // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset' - partitionsToGetOffsetsFor.add(fp); - } else if(code == ErrorMapping.NotLeaderForPartitionCode() || - code == ErrorMapping.LeaderNotAvailableCode() || - code == ErrorMapping.BrokerNotAvailableCode() || - code == ErrorMapping.UnknownCode()) { - // the broker we are connected to is not the leader for the partition. - LOG.warn("{} is not the leader of {}. 
Reassigning leader for partition", broker, fp); - LOG.debug("Error code = {}", code); - - unassignedPartitions.add(fp); - - partitionsIterator.remove(); // unsubscribe the partition ourselves - partitionsRemoved = true; - } else if (code != ErrorMapping.NoError()) { - exception += "\nException for " + fp.topic +":"+ fp.partition + ": " + - StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); - } - } - if (partitionsToGetOffsetsFor.size() > 0) { - // safeguard against an infinite loop. - if (offsetOutOfRangeCount++ > 3) { - throw new RuntimeException("Found invalid offsets more than three times in partitions "+partitionsToGetOffsetsFor.toString()+" " + - "Exceptions: "+exception); - } - // get valid offsets for these partitions and try again. - LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor); - getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config)); - LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor); - continue; // jump back to create a new fetch request. The offset has not been touched. - } else if(partitionsRemoved) { - continue; // create new fetch request - } else { - // partitions failed on an error - throw new IOException("Error while fetching from broker '" + broker +"':" + exception); - } - } else { - // successful fetch, reset offsetOutOfRangeCount. 
- offsetOutOfRangeCount = 0; - } - - // ----------------------------------- process fetch response ---------------------------- - - int messagesInFetch = 0; - int deletedMessages = 0; - Iterator<FetchPartition> partitionsIterator = partitions.iterator(); - partitionsLoop: while (partitionsIterator.hasNext()) { - final FetchPartition fp = partitionsIterator.next(); - final ByteBufferMessageSet messageSet = fetchResponse.messageSet(fp.topic, fp.partition); - final KafkaTopicPartition topicPartition = new KafkaTopicPartition(fp.topic, fp.partition); - - for (MessageAndOffset msg : messageSet) { - if (running) { - messagesInFetch++; - if (msg.offset() < fp.nextOffsetToRead) { - // we have seen this message already - LOG.info("Skipping message with offset " + msg.offset() - + " because we have seen messages until " + fp.nextOffsetToRead - + " from partition " + fp.partition + " already"); - continue; - } - - final long offset = msg.offset(); - - ByteBuffer payload = msg.message().payload(); - - // If the message value is null, this represents a delete command for the message key. - // Log this and pass it on to the client who might want to also receive delete messages. - byte[] valueBytes; - if (payload == null) { - deletedMessages++; - valueBytes = null; - } else { - valueBytes = new byte[payload.remaining()]; - payload.get(valueBytes); - } - - // put key into byte array - byte[] keyBytes = null; - int keySize = msg.message().keySize(); - - if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization - ByteBuffer keyPayload = msg.message().key(); - keyBytes = new byte[keySize]; - keyPayload.get(keyBytes); - } - - final T value = deserializer.deserialize(keyBytes, valueBytes, fp.topic, fp.partition, offset); - if(deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. 
- partitionsIterator.remove(); - continue partitionsLoop; - } - synchronized (sourceContext.getCheckpointLock()) { - owner.flinkKafkaConsumer.processElement(sourceContext, topicPartition, value, offset); - } - - // advance offset for the next request - fp.nextOffsetToRead = offset + 1; - } - else { - // no longer running - return; - } - } - } - LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages); - } // end of fetch loop - - if (!newPartitionsQueue.close()) { - throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue."); - } - } - catch (Throwable t) { - // report to the main thread - owner.stopWithError(t); - } - finally { - // end of run loop. close connection to consumer - if (consumer != null) { - // closing the consumer should not fail the program - try { - consumer.close(); - } - catch (Throwable t) { - LOG.error("Error while closing the Kafka simple consumer", t); - } - } - } - } - - private void getMissingOffsetsFromKafka(List<FetchPartition> partitions) { - List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>(); - - for (FetchPartition fp : partitions) { - if (fp.nextOffsetToRead == FlinkKafkaConsumerBase.OFFSET_NOT_SET) { - // retrieve the offset from the consumer - partitionsToGetOffsetsFor.add(fp); - } - } - if (partitionsToGetOffsetsFor.size() > 0) { - getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config)); - LOG.info("No prior offsets found for some partitions. Fetched the following start offsets {}", partitionsToGetOffsetsFor); - - // setting the fetched offset also in the offset state. - // we subtract -1 from the offset - synchronized (sourceContext.getCheckpointLock()) { - for(FetchPartition fp: partitionsToGetOffsetsFor) { - owner.flinkKafkaConsumer.updateOffsetForPartition(new KafkaTopicPartition(fp.topic, fp.partition), fp.nextOffsetToRead - 1L); - } - } - } - } - - /** - * Cancels this fetch thread. 
The thread will release all resources and terminate. - */ - public void cancel() { - this.running = false; - - // interrupt whatever the consumer is doing - if (consumer != null) { - consumer.close(); - } - - this.interrupt(); - } - - public ClosableBlockingQueue<FetchPartition> getNewPartitionsQueue() { - return newPartitionsQueue; - } - - /** - * Request latest offsets for a set of partitions, via a Kafka consumer. - * - * This method retries three times if the response has an error - * - * @param consumer The consumer connected to lead broker - * @param partitions The list of partitions we need offsets for - * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest) - */ - private static void getLastOffset(SimpleConsumer consumer, List<FetchPartition> partitions, long whichTime) { - - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); - for (FetchPartition fp: partitions) { - TopicAndPartition topicAndPartition = new TopicAndPartition(fp.topic, fp.partition); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); - } - - int retries = 0; - OffsetResponse response; - while(true) { - kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); - response = consumer.getOffsetsBefore(request); - - if (response.hasError()) { - String exception = ""; - for (FetchPartition fp : partitions) { - short code; - if ((code = response.errorCode(fp.topic, fp.partition)) != ErrorMapping.NoError()) { - exception += "\nException for topic=" + fp.topic + " partition=" + fp.partition + ": " + StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); - } - } - if(++retries >= 3) { - throw new RuntimeException("Unable to get last offset for partitions " + partitions + ". 
" + exception); - } else { - LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception); - } - - } else { - break; // leave retry loop - } - } - - for (FetchPartition fp: partitions) { - // the resulting offset is the next offset we are going to read - // for not-yet-consumed partitions, it is 0. - fp.nextOffsetToRead = response.offsets(fp.topic, fp.partition)[0]; - } - } - - private static long getInvalidOffsetBehavior(Properties config) { - long timeType; - String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); - if(val.equals("none")) { - throw new RuntimeException("Unable to find previous offset in consumer group. " + - "Set 'auto.offset.reset' to 'latest' or 'earliest' to automatically get the offset from Kafka"); - } - if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9 - timeType = OffsetRequest.LatestTime(); - } else { - timeType = OffsetRequest.EarliestTime(); - } - return timeType; - } - } - - - private static class PartitionInfoFetcher extends Thread { - - private final List<String> topics; - private final Properties properties; - - private volatile List<KafkaTopicPartitionLeader> result; - private volatile Throwable error; - - - PartitionInfoFetcher(List<String> topics, Properties properties) { - this.topics = topics; - this.properties = properties; - } - - @Override - public void run() { - try { - result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties); - } - catch (Throwable t) { - this.error = t; - } - } - - public List<KafkaTopicPartitionLeader> getPartitions() throws Exception { - try { - this.join(); - } - catch (InterruptedException e) { - throw new Exception("Partition fetching was cancelled before completion"); - } - - if (error != null) { - throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error); - } - if (result != null) { - return result; - } - throw new Exception("Partition fetching failed"); - 
} - } - - private static class KillerWatchDog extends Thread { - - private final Thread toKill; - private final long timeout; - - private KillerWatchDog(Thread toKill, long timeout) { - super("KillerWatchDog"); - setDaemon(true); - - this.toKill = toKill; - this.timeout = timeout; - } - - @SuppressWarnings("deprecation") - @Override - public void run() { - final long deadline = System.currentTimeMillis() + timeout; - long now; - - while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) { - try { - toKill.join(deadline - now); - } - catch (InterruptedException e) { - // ignore here, our job is important! - } - } - - // this is harsh, but this watchdog is a last resort - if (toKill.isAlive()) { - toKill.stop(); - } - } - } - - /** - * Returns a unique list of topics from the topic partition list - * - * @param partitionsList A lost of FetchPartitions's - * @return A unique list of topics from the input map - */ - public static List<String> getTopics(List<FetchPartition> partitionsList) { - HashSet<String> uniqueTopics = new HashSet<>(); - for (FetchPartition fp: partitionsList) { - uniqueTopics.add(fp.topic); - } - return new ArrayList<>(uniqueTopics); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java deleted file mode 100644 index ecc7609..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * The offset handler is responsible for locating the initial partition offsets - * where the source should start reading, as well as committing offsets from completed - * checkpoints. - */ -public interface OffsetHandler { - - /** - * Commits the given offset for the partitions. May commit the offsets to the Kafka broker, - * or to ZooKeeper, based on its configured behavior. - * - * @param offsetsToCommit The offset to commit, per partition. - */ - void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception; - - /** - * Positions the given fetcher to the initial read offsets where the stream consumption - * will start from. - * - * @param partitions The partitions for which to seeks the fetcher to the beginning. - */ - Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception; - - /** - * Closes the offset handler, releasing all resources. - * - * @throws IOException Thrown, if the closing fails. 
- */ - void close() throws IOException; -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java new file mode 100644 index 0000000..d8d927d --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; + +import java.util.List; +import java.util.Properties; + +class PartitionInfoFetcher extends Thread { + + private final List<String> topics; + private final Properties properties; + + private volatile List<KafkaTopicPartitionLeader> result; + private volatile Throwable error; + + + PartitionInfoFetcher(List<String> topics, Properties properties) { + this.topics = topics; + this.properties = properties; + } + + @Override + public void run() { + try { + result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties); + } + catch (Throwable t) { + this.error = t; + } + } + + public List<KafkaTopicPartitionLeader> getPartitions() throws Exception { + try { + this.join(); + } + catch (InterruptedException e) { + throw new Exception("Partition fetching was cancelled before completion"); + } + + if (error != null) { + throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error); + } + if (result != null) { + return result; + } + throw new Exception("Partition fetching failed"); + } +} \ No newline at end of file
