http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml new file mode 100644 index 0000000..58eb043 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/pom.xml @@ -0,0 +1,212 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-base_2.10</artifactId> + <name>flink-connector-kafka-base</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <kafka.version>0.8.2.2</kafka.version> + </properties> + + <dependencies> + + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, + won't depend on flink-table. 
--> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-annotation</artifactId> + </exclusion> + <exclusion> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- test dependencies --> + + <!-- force using the latest zkclient --> + <dependency> + <groupId>com.101tec</groupId> + <artifactId>zkclient</artifactId> + <version>0.7</version> + <type>jar</type> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-jmx</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${minikdc.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.101tec</groupId> + <artifactId>zkclient</artifactId> + <version>0.7</version> + </dependency> + </dependencies> + </dependencyManagement> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <!-- + https://issues.apache.org/jira/browse/DIRSHARED-134 + Required to pull the Mini-KDC transitive dependency + --> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>3.0.1</version> + <inherited>true</inherited> + <extensions>true</extensions> + </plugin> + </plugins> + </build> + +</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java new file mode 100644 index 0000000..aef7116 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Base class of all Flink Kafka Consumer data sources.
 * This implements the common behavior across all Kafka versions.
 *
 * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
 * {@link AbstractFetcher}.
 *
 * @param <T> The type of records produced by this data source
 */
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
		CheckpointListener,
		ResultTypeQueryable<T>,
		CheckpointedFunction {

	private static final long serialVersionUID = -6272159445203409112L;

	protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);

	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

	/** Boolean configuration key to disable metrics tracking **/
	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

	// ------------------------------------------------------------------------
	//  configuration state, set on the client relevant for all subtasks
	// ------------------------------------------------------------------------

	/** The topics given at construction; passed to {@link #getKafkaPartitions(List)} in {@link #open(Configuration)}. */
	private final List<String> topics;

	/** The schema to convert between Kafka's byte messages, and Flink's objects */
	protected final KeyedDeserializationSchema<T> deserializer;

	/** The set of topic partitions that the source will read */
	protected List<KafkaTopicPartition> subscribedPartitions;

	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
	 * to exploit per-partition timestamp characteristics.
	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
	private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;

	/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
	 * to exploit per-partition timestamp characteristics.
	 * The assigner is kept in serialized form, to deserialize it into multiple copies */
	private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

	/**
	 * Operator list state holding (partition, offset) pairs. It is obtained in
	 * {@link #initializeState(FunctionInitializationContext)} and rewritten on every
	 * {@link #snapshotState(FunctionSnapshotContext)}.
	 */
	private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;

	// ------------------------------------------------------------------------
	//  runtime state (used individually by each parallel subtask)
	// ------------------------------------------------------------------------

	/** Data for pending but uncommitted offsets, keyed by checkpoint id in insertion order */
	private final LinkedMap pendingOffsetsToCommit = new LinkedMap();

	/** The fetcher implements the connections to the Kafka brokers */
	private transient volatile AbstractFetcher<T, ?> kafkaFetcher;

	/** The offsets to restore to, if the consumer restores state from a checkpoint */
	private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;

	/** Flag indicating whether the consumer is still running **/
	private volatile boolean running = true;

	// ------------------------------------------------------------------------

	/**
	 * Base constructor.
	 *
	 * @param topics
	 *           The Kafka topics to read from. Must be non-null and contain at least one topic.
	 * @param deserializer
	 *           The deserializer to turn raw byte messages into Java/Scala objects.
	 *
	 * @throws NullPointerException if {@code topics} or {@code deserializer} is null
	 * @throws IllegalArgumentException if {@code topics} is empty
	 */
	public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
		this.topics = checkNotNull(topics);
		checkArgument(topics.size() > 0, "You have to define at least one topic.");
		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
	}

	/**
	 * This method must be called from the subclasses, to set the list of all subscribed partitions
	 * that this consumer will fetch from (across all subtasks).
	 *
	 * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
	 */
	protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
		checkNotNull(allSubscribedPartitions);
		this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
	}

	// ------------------------------------------------------------------------
	//  Configuration
	// ------------------------------------------------------------------------

	/**
	 * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
	 * in the same way as in the Flink runtime, when streams are merged.
	 *
	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
	 * parallel source subtask reads more than one partition.
	 *
	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
	 * partition, allows users to let them exploit the per-partition characteristics.
	 *
	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
	 *
	 * @param assigner The timestamp assigner / watermark generator to use.
	 * @return The consumer object, to allow function chaining.
	 *
	 * @throws IllegalStateException if a periodic assigner has already been set
	 * @throws IllegalArgumentException if the assigner is not serializable
	 */
	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
		checkNotNull(assigner);

		if (this.periodicWatermarkAssigner != null) {
			throw new IllegalStateException("A periodic watermark emitter has already been set.");
		}
		try {
			ClosureCleaner.clean(assigner, true);
			this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
			return this;
		} catch (Exception e) {
			throw new IllegalArgumentException("The given assigner is not serializable", e);
		}
	}

	/**
	 * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks periodically.
	 * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
	 * in the same way as in the Flink runtime, when streams are merged.
	 *
	 * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
	 * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
	 * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
	 * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
	 * parallel source subtask reads more than one partition.
	 *
	 * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
	 * partition, allows users to let them exploit the per-partition characteristics.
	 *
	 * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
	 * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
	 *
	 * @param assigner The timestamp assigner / watermark generator to use.
	 * @return The consumer object, to allow function chaining.
	 *
	 * @throws IllegalStateException if a punctuated assigner has already been set
	 * @throws IllegalArgumentException if the assigner is not serializable
	 */
	public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
		checkNotNull(assigner);

		if (this.punctuatedWatermarkAssigner != null) {
			throw new IllegalStateException("A punctuated watermark emitter has already been set.");
		}
		try {
			ClosureCleaner.clean(assigner, true);
			this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
			return this;
		} catch (Exception e) {
			throw new IllegalArgumentException("The given assigner is not serializable", e);
		}
	}

	// ------------------------------------------------------------------------
	//  Work methods
	// ------------------------------------------------------------------------

	/**
	 * Runs the source. If this subtask has partitions assigned, this creates a fetcher,
	 * applies any restored offsets to it, and runs its fetch loop. Otherwise it emits a
	 * final {@code Long.MAX_VALUE} watermark and blocks until cancelled.
	 *
	 * @throws Exception if the subscribed partitions were never set, or if the fetcher fails
	 */
	@Override
	public void run(SourceContext<T> sourceContext) throws Exception {
		if (subscribedPartitions == null) {
			throw new Exception("The partitions were not set for the consumer");
		}

		// we need only do work, if we actually have partitions assigned
		if (!subscribedPartitions.isEmpty()) {

			// (1) create the fetcher that will communicate with the Kafka brokers
			final AbstractFetcher<T, ?> fetcher = createFetcher(
					sourceContext, subscribedPartitions,
					periodicWatermarkAssigner, punctuatedWatermarkAssigner,
					(StreamingRuntimeContext) getRuntimeContext());

			// (2) set the fetcher to the restored checkpoint offsets
			if (restoreToOffset != null) {
				fetcher.restoreOffsets(restoreToOffset);
			}

			// publish the reference, for snapshot-, commit-, and cancel calls
			// IMPORTANT: We can only do that now, because only now will calls to
			//            the fetchers 'snapshotCurrentState()' method return at least
			//            the restored offsets
			this.kafkaFetcher = fetcher;
			if (!running) {
				return;
			}

			// (3) run the fetcher's main work method
			fetcher.runFetchLoop();
		}
		else {
			// this source never completes, so emit a Long.MAX_VALUE watermark
			// to not block watermark forwarding
			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));

			// wait until this is canceled
			final Object waitLock = new Object();
			while (running) {
				try {
					//noinspection SynchronizationOnLocalVariableOrMethodParameter
					synchronized (waitLock) {
						waitLock.wait();
					}
				}
				catch (InterruptedException e) {
					if (!running) {
						// restore the interrupted state, and fall through the loop
						Thread.currentThread().interrupt();
					}
				}
			}
		}
	}

	/**
	 * Marks the source as no longer running and cancels the fetcher, if one has been published.
	 */
	@Override
	public void cancel() {
		// set ourselves as not running
		running = false;

		// abort the fetcher, if there is one
		if (kafkaFetcher != null) {
			kafkaFetcher.cancel();
		}

		// there will be an interrupt() call to the main thread anyways
	}

	/**
	 * Looks up the partitions of the configured topics via {@link #getKafkaPartitions(List)}.
	 * If that returns a non-null list, this subtask's share of it becomes {@link #subscribedPartitions};
	 * a null result leaves {@link #subscribedPartitions} as previously set.
	 */
	@Override
	public void open(Configuration configuration) {
		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);

		if (kafkaTopicPartitions != null) {
			assignTopicPartitions(kafkaTopicPartitions);
		}
	}

	@Override
	public void close() throws Exception {
		// pretty much the same logic as cancelling
		try {
			cancel();
		} finally {
			super.close();
		}
	}

	// ------------------------------------------------------------------------
	//  Checkpoint and restore
	// ------------------------------------------------------------------------

	/**
	 * Obtains the offsets list state and, when restoring, copies the restored
	 * (partition, offset) pairs into {@link #restoreToOffset}.
	 */
	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {

		OperatorStateStore stateStore = context.getOperatorStateStore();
		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

		if (context.isRestored()) {
			restoreToOffset = new HashMap<>();
			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
				restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
			}

			LOG.info("Setting restore state in the FlinkKafkaConsumer.");
			if (LOG.isDebugEnabled()) {
				LOG.debug("Using the following offsets: {}", restoreToOffset);
			}
		} else {
			LOG.info("No restore state for FlinkKafkaConsumer.");
		}
	}

	/**
	 * Writes the current offsets into the operator state and remembers them as pending
	 * for commit once the checkpoint completes.
	 *
	 * <p>Before the fetcher is published, the restored offsets are used. If there are none,
	 * every subscribed partition is recorded with {@link KafkaTopicPartitionState#OFFSET_NOT_SET}.
	 * At most {@link #MAX_NUM_PENDING_CHECKPOINTS} pending entries are kept; the oldest are dropped.
	 */
	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		if (!running) {
			LOG.debug("snapshotState() called on closed source");
			return;
		}

		offsetsStateForCheckpoint.clear();

		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
		if (fetcher == null) {
			// the fetcher has not yet been initialized, which means we need to return the
			// originally restored offsets or the assigned partitions
			final HashMap<KafkaTopicPartition, Long> restored = this.restoreToOffset;

			if (restored != null) {
				addOffsetsToState(restored);
			} else if (subscribedPartitions != null) {
				for (KafkaTopicPartition subscribedPartition : subscribedPartitions) {
					offsetsStateForCheckpoint.add(
						Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
				}
			}

			// the map cannot be asynchronously updated, because only one checkpoint call can happen
			// on this function at a time: either snapshotState() or notifyCheckpointComplete()
			pendingOffsetsToCommit.put(context.getCheckpointId(), restored);
		} else {
			HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

			// the map cannot be asynchronously updated, because only one checkpoint call can happen
			// on this function at a time: either snapshotState() or notifyCheckpointComplete()
			pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);

			addOffsetsToState(currentOffsets);
		}

		// truncate the map of pending offsets to commit, to prevent infinite growth
		while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
			pendingOffsetsToCommit.remove(0);
		}
	}

	/** Appends every (partition, offset) entry of the given map to {@link #offsetsStateForCheckpoint}. */
	private void addOffsetsToState(Map<KafkaTopicPartition, Long> offsets) throws Exception {
		for (Map.Entry<KafkaTopicPartition, Long> entry : offsets.entrySet()) {
			offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
		}
	}

	/**
	 * Commits the offsets recorded for the completed checkpoint through the fetcher.
	 * Pending entries of older checkpoints are discarded.
	 * Exceptions are rethrown only while the source is still running.
	 */
	@Override
	public void notifyCheckpointComplete(long checkpointId) throws Exception {
		if (!running) {
			LOG.debug("notifyCheckpointComplete() called on closed source");
			return;
		}

		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
		if (fetcher == null) {
			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
			return;
		}

		// only one commit operation must be in progress
		LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint {}", checkpointId);

		try {
			final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
			if (posInMap == -1) {
				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
				return;
			}

			@SuppressWarnings("unchecked")
			HashMap<KafkaTopicPartition, Long> offsets =
				(HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

			// remove older checkpoints in map (they precede posInMap in insertion order)
			for (int i = 0; i < posInMap; i++) {
				pendingOffsetsToCommit.remove(0);
			}

			if (offsets == null || offsets.isEmpty()) {
				LOG.debug("Checkpoint state was empty.");
				return;
			}
			fetcher.commitInternalOffsetsToKafka(offsets);
		}
		catch (Exception e) {
			if (running) {
				throw e;
			}
			// else ignore exception if we are no longer running
		}
	}

	// ------------------------------------------------------------------------
	//  Kafka Consumer specific methods
	// ------------------------------------------------------------------------

	/**
	 * Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the
	 * data, and emits it into the data streams.
	 *
	 * @param sourceContext The source context to emit data to.
	 * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
	 * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
	 * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
	 * @param runtimeContext The task's runtime context.
	 *
	 * @return The instantiated fetcher
	 *
	 * @throws Exception The method should forward exceptions
	 */
	protected abstract AbstractFetcher<T, ?> createFetcher(
			SourceContext<T> sourceContext,
			List<KafkaTopicPartition> thisSubtaskPartitions,
			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
			StreamingRuntimeContext runtimeContext) throws Exception;

	/**
	 * Returns the partitions of the given topics, to be distributed among the subtasks in
	 * {@link #open(Configuration)}. Returning {@code null} leaves {@link #subscribedPartitions}
	 * as previously set via {@link #setSubscribedPartitions(List)}.
	 *
	 * <p>The returned list is not modified by this class.
	 *
	 * @param topics The topics given to the constructor.
	 * @return The partitions of those topics, or {@code null}.
	 */
	protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics);

	// ------------------------------------------------------------------------
	//  ResultTypeQueryable methods
	// ------------------------------------------------------------------------

	@Override
	public TypeInformation<T> getProducedType() {
		return deserializer.getProducedType();
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	/**
	 * Sets {@link #subscribedPartitions} to this subtask's share of the given partitions.
	 *
	 * <p>When restoring, the share is exactly the given partitions that appear in the restored
	 * offsets. Otherwise the partitions are ordered by (topic, partition) and dealt round-robin
	 * across the parallel subtasks.
	 *
	 * @param kafkaTopicPartitions All partitions of the subscribed topics. This list is not modified.
	 */
	private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
		subscribedPartitions = new ArrayList<>();

		if (restoreToOffset != null) {
			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
				if (restoreToOffset.containsKey(kafkaTopicPartition)) {
					subscribedPartitions.add(kafkaTopicPartition);
				}
			}
		} else {
			// sort a private copy: the list comes from the abstract getKafkaPartitions() and may be
			// unmodifiable or shared with the subclass
			final List<KafkaTopicPartition> sortedPartitions = new ArrayList<>(kafkaTopicPartitions);
			Collections.sort(sortedPartitions, new Comparator<KafkaTopicPartition>() {
				@Override
				public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) {
					int topicComparison = o1.getTopic().compareTo(o2.getTopic());
					return topicComparison != 0
						? topicComparison
						: Integer.compare(o1.getPartition(), o2.getPartition());
				}
			});

			final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
			final int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
			for (int i = subtaskIndex; i < sortedPartitions.size(); i += numSubtasks) {
				subscribedPartitions.add(sortedPartitions.get(i));
			}
		}
	}

	/**
	 * Selects which of the given partitions should be handled by a specific consumer,
	 * given a certain number of consumers.
	 *
	 * @param allPartitions The partitions to select from
	 * @param numConsumers The number of consumers
	 * @param consumerIndex The index of the specific consumer
	 *
	 * @return The sublist of partitions to be handled by that consumer.
	 */
	protected static List<KafkaTopicPartition> assignPartitions(
			List<KafkaTopicPartition> allPartitions,
			int numConsumers, int consumerIndex) {
		final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
				allPartitions.size() / numConsumers + 1);

		for (int i = 0; i < allPartitions.size(); i++) {
			if (i % numConsumers == consumerIndex) {
				thisSubtaskPartitions.add(allPartitions.get(i));
			}
		}

		return thisSubtaskPartitions;
	}

	/**
	 * Logs the number of partitions per topic at INFO level.
	 *
	 * @param logger The logger to log to.
	 * @param partitionInfos List of subscribed partitions
	 */
	protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) {
		Map<String, Integer> countPerTopic = new HashMap<>();
		for (KafkaTopicPartition partition : partitionInfos) {
			Integer count = countPerTopic.get(partition.getTopic());
			countPerTopic.put(partition.getTopic(), count == null ? 1 : count + 1);
		}

		StringBuilder sb = new StringBuilder(
				"Consumer is going to read the following topics (with number of partitions): ");

		// separator is written before each entry except the first, so no trailing ", " remains
		String separator = "";
		for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
			sb.append(separator).append(e.getKey()).append(" (").append(e.getValue()).append(')');
			separator = ", ";
		}

		logger.info(sb.toString());
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java new file mode 100644 index 0000000..d413f1c --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; 
+ +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.Properties; +import java.util.Collections; +import java.util.Comparator; + +import static java.util.Objects.requireNonNull; + + +/** + * Flink Sink to produce data into a Kafka topic. + * + * Please note that this producer provides at-least-once reliability guarantees when + * checkpoints are enabled and setFlushOnCheckpoint(true) is set. + * Otherwise, the producer doesn't provide any reliability guarantees. + * + * @param <IN> Type of the messages to write into Kafka. + */ +public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class); + + private static final long serialVersionUID = 1L; + + /** + * Configuration key for disabling the metrics reporting + */ + public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; + + /** + * Array with the partition ids of the given defaultTopicId + * The size of this array is the number of partitions + */ + protected int[] partitions; + + /** + * User defined properties for the Producer + */ + protected final Properties producerConfig; + + /** + * The name of the default topic this producer is writing data to + */ + protected final String defaultTopicId; + + /** + * (Serializable) SerializationSchema for turning objects used with Flink into + * byte[] for Kafka. + */ + protected final KeyedSerializationSchema<IN> schema; + + /** + * User-provided partitioner for assigning an object to a Kafka partition. + */ + protected final KafkaPartitioner<IN> partitioner; + + /** + * Flag indicating whether to accept failures (and log them), or to fail on failures + */ + protected boolean logFailuresOnly; + + /** + * If true, the producer will wait until all outstanding records have been send to the broker. 
+ */ + protected boolean flushOnCheckpoint; + + // -------------------------------- Runtime fields ------------------------------------------ + + /** KafkaProducer instance */ + protected transient KafkaProducer<byte[], byte[]> producer; + + /** The callback than handles error propagation or logging callbacks */ + protected transient Callback callback; + + /** Errors encountered in the async producer are stored here */ + protected transient volatile Exception asyncException; + + /** Lock for accessing the pending records */ + protected final SerializableObject pendingRecordsLock = new SerializableObject(); + + /** Number of unacknowledged records. */ + protected long pendingRecords; + + protected OperatorStateStore stateStore; + + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + * @param defaultTopicId The default topic to write data to + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner + */ + public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + requireNonNull(defaultTopicId, "TopicID not set"); + requireNonNull(serializationSchema, "serializationSchema not set"); + requireNonNull(producerConfig, "producerConfig not set"); + ClosureCleaner.clean(customPartitioner, true); + ClosureCleaner.ensureSerializable(serializationSchema); + + this.defaultTopicId = defaultTopicId; + this.schema = serializationSchema; + this.producerConfig = producerConfig; + + // set the producer configuration properties for kafka record key value serializers. 
+ if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + } + + if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { + this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + } else { + LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + } + + // eagerly ensure that bootstrap servers are set. + if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); + } + + this.partitioner = customPartitioner; + } + + // ---------------------------------- Properties -------------------------- + + /** + * Defines whether the producer should fail on errors, or only log them. + * If this is set to true, then exceptions will be only logged, if set to false, + * exceptions will be eventually thrown and cause the streaming program to + * fail (and enter recovery). + * + * @param logFailuresOnly The flag to indicate logging-only on exceptions. + */ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + /** + * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers + * to be acknowledged by the Kafka producer on a checkpoint. + * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. 
+ * + * @param flush Flag indicating the flushing mode (true = flush on checkpoint) + */ + public void setFlushOnCheckpoint(boolean flush) { + this.flushOnCheckpoint = flush; + } + + /** + * Used for testing only + */ + protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) { + return new KafkaProducer<>(props); + } + + // ----------------------------------- Utilities -------------------------- + + /** + * Initializes the connection to Kafka. + */ + @Override + public void open(Configuration configuration) { + producer = getKafkaProducer(this.producerConfig); + + // the fetched list is immutable, so we're creating a mutable copy in order to sort it + List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId)); + + // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks + Collections.sort(partitionsList, new Comparator<PartitionInfo>() { + @Override + public int compare(PartitionInfo o1, PartitionInfo o2) { + return Integer.compare(o1.partition(), o2.partition()); + } + }); + + partitions = new int[partitionsList.size()]; + for (int i = 0; i < partitions.length; i++) { + partitions[i] = partitionsList.get(i).partition(); + } + + RuntimeContext ctx = getRuntimeContext(); + if (partitioner != null) { + partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions); + } + + LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", + ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId); + + // register Kafka metrics to Flink accumulators + if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { + Map<MetricName, ? extends Metric> metrics = this.producer.metrics(); + + if (metrics == null) { + // MapR's Kafka implementation returns null here. 
+ LOG.info("Producer implementation does not support metrics"); + } else { + final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer"); + for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) { + kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue())); + } + } + } + + if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) { + LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing."); + flushOnCheckpoint = false; + } + + if (logFailuresOnly) { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception e) { + if (e != null) { + LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); + } + acknowledgeMessage(); + } + }; + } + else { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null && asyncException == null) { + asyncException = exception; + } + acknowledgeMessage(); + } + }; + } + } + + /** + * Called when new data arrives to the sink, and forwards it to Kafka. 
+ * + * @param next + * The incoming data + */ + @Override + public void invoke(IN next) throws Exception { + // propagate asynchronous errors + checkErroneous(); + + byte[] serializedKey = schema.serializeKey(next); + byte[] serializedValue = schema.serializeValue(next); + String targetTopic = schema.getTargetTopic(next); + if (targetTopic == null) { + targetTopic = defaultTopicId; + } + + ProducerRecord<byte[], byte[]> record; + if (partitioner == null) { + record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue); + } else { + record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue); + } + if (flushOnCheckpoint) { + synchronized (pendingRecordsLock) { + pendingRecords++; + } + } + producer.send(record, callback); + } + + + @Override + public void close() throws Exception { + if (producer != null) { + producer.close(); + } + + // make sure we propagate pending errors + checkErroneous(); + } + + // ------------------- Logic for handling checkpoint flushing -------------------------- // + + private void acknowledgeMessage() { + if (flushOnCheckpoint) { + synchronized (pendingRecordsLock) { + pendingRecords--; + if (pendingRecords == 0) { + pendingRecordsLock.notifyAll(); + } + } + } + } + + /** + * Flush pending records. + */ + protected abstract void flush(); + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.stateStore = context.getOperatorStateStore(); + } + + @Override + public void snapshotState(FunctionSnapshotContext ctx) throws Exception { + if (flushOnCheckpoint) { + // flushing is activated: We need to wait until pendingRecords is 0 + flush(); + synchronized (pendingRecordsLock) { + if (pendingRecords != 0) { + throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords); + } + // pending records count is 0. 
We can now confirm the checkpoint + } + } + } + + // ----------------------------------- Utilities -------------------------- + + protected void checkErroneous() throws Exception { + Exception e = asyncException; + if (e != null) { + // prevent double throwing + asyncException = null; + throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e); + } + } + + public static Properties getPropertiesFromBrokerList(String brokerList) { + String[] elements = brokerList.split(","); + + // validate the broker addresses + for (String broker: elements) { + NetUtils.getCorrectHostnamePort(broker); + } + + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + return props; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java new file mode 100644 index 0000000..ee98783 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +/** + * Base class for {@link KafkaTableSink} that serializes data in JSON format + */ +public abstract class KafkaJsonTableSink extends KafkaTableSink { + + /** + * Creates KafkaJsonTableSink + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + super(topic, properties, partitioner); + } + + @Override + protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) { + return new JsonRowSerializationSchema(fieldNames); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java new file mode 
100644 index 0000000..f145509 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; + +import java.util.Properties; + +/** + * A version-agnostic Kafka JSON {@link StreamTableSource}. + * + * <p>The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. + * + * <p>The field names are used to parse the JSON file and so are the types. + */ +public abstract class KafkaJsonTableSource extends KafkaTableSource { + + /** + * Creates a generic Kafka JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. 
+ * @param fieldTypes Row field types. + */ + KafkaJsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + Class<?>[] fieldTypes) { + + super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); + } + + /** + * Creates a generic Kafka JSON {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + KafkaJsonTableSource( + String topic, + Properties properties, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes); + } + + /** + * Configures the failure behaviour if a JSON field is missing. + * + * <p>By default, a missing field is ignored and the field is set to null. + * + * @param failOnMissingField Flag indicating whether to fail or not on a missing field. 
+ */ + public void setFailOnMissingField(boolean failOnMissingField) { + JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema(); + deserializationSchema.setFailOnMissingField(failOnMissingField); + } + + private static JsonRowDeserializationSchema createDeserializationSchema( + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + return new JsonRowDeserializationSchema(fieldNames, fieldTypes); + } + + private static JsonRowDeserializationSchema createDeserializationSchema( + String[] fieldNames, + Class<?>[] fieldTypes) { + + return new JsonRowDeserializationSchema(fieldNames, fieldTypes); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java new file mode 100644 index 0000000..714d9cd --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sinks.StreamTableSink; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A version-agnostic Kafka {@link StreamTableSink}. + * + * <p>The version-specific Kafka consumers need to extend this class and + * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}. + */ +public abstract class KafkaTableSink implements StreamTableSink<Row> { + + protected final String topic; + protected final Properties properties; + protected SerializationSchema<Row> serializationSchema; + protected final KafkaPartitioner<Row> partitioner; + protected String[] fieldNames; + protected TypeInformation[] fieldTypes; + + /** + * Creates KafkaTableSink + * + * @param topic Kafka topic to write to. + * @param properties Properties for the Kafka consumer. 
+ * @param partitioner Partitioner to select Kafka partition for each item + */ + public KafkaTableSink( + String topic, + Properties properties, + KafkaPartitioner<Row> partitioner) { + + this.topic = Preconditions.checkNotNull(topic, "topic"); + this.properties = Preconditions.checkNotNull(properties, "properties"); + this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); + } + + /** + * Returns the version-specifid Kafka producer. + * + * @param topic Kafka topic to produce to. + * @param properties Properties for the Kafka producer. + * @param serializationSchema Serialization schema to use to create Kafka records. + * @param partitioner Partitioner to select Kafka partition. + * @return The version-specific Kafka producer + */ + protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, Properties properties, + SerializationSchema<Row> serializationSchema, + KafkaPartitioner<Row> partitioner); + + /** + * Create serialization schema for converting table rows into bytes. + * + * @param fieldNames Field names in table rows. + * @return Instance of serialization schema + */ + protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames); + + /** + * Create a deep copy of this sink. 
+ * + * @return Deep copy of this sink + */ + protected abstract KafkaTableSink createCopy(); + + @Override + public void emitDataStream(DataStream<Row> dataStream) { + FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); + dataStream.addSink(kafkaProducer); + } + + @Override + public TypeInformation<Row> getOutputType() { + return new RowTypeInfo(getFieldTypes()); + } + + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fieldTypes; + } + + @Override + public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + KafkaTableSink copy = createCopy(); + copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); + copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + copy.serializationSchema = createSerializationSchema(fieldNames); + + return copy; + } + + + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java new file mode 100644 index 0000000..fd423d7 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.sources.StreamTableSource; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo; + +/** + * A version-agnostic Kafka {@link StreamTableSource}. + * + * <p>The version-specific Kafka consumers need to extend this class and + * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}. + */ +public abstract class KafkaTableSource implements StreamTableSource<Row> { + + /** The Kafka topic to consume. */ + private final String topic; + + /** Properties for the Kafka consumer. */ + private final Properties properties; + + /** Deserialization schema to use for Kafka records. */ + private final DeserializationSchema<Row> deserializationSchema; + + /** Row field names. 
*/ + private final String[] fieldNames; + + /** Row field types. */ + private final TypeInformation<?>[] fieldTypes; + + /** + * Creates a generic Kafka {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + KafkaTableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + Class<?>[] fieldTypes) { + + this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes)); + } + + /** + * Creates a generic Kafka {@link StreamTableSource}. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @param fieldNames Row field names. + * @param fieldTypes Row field types. + */ + KafkaTableSource( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema, + String[] fieldNames, + TypeInformation<?>[] fieldTypes) { + + this.topic = Preconditions.checkNotNull(topic, "Topic"); + this.properties = Preconditions.checkNotNull(properties, "Properties"); + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema"); + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); + this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); + + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + } + + /** + * NOTE: This method is for internal use only for defining a TableSource. + * Do not use it in Table API programs. 
+ */ + @Override + public DataStream<Row> getDataStream(StreamExecutionEnvironment env) { + // Version-specific Kafka consumer + FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema); + DataStream<Row> kafkaSource = env.addSource(kafkaConsumer); + return kafkaSource; + } + + @Override + public int getNumberOfFields() { + return fieldNames.length; + } + + @Override + public String[] getFieldsNames() { + return fieldNames; + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return fieldTypes; + } + + @Override + public TypeInformation<Row> getReturnType() { + return new RowTypeInfo(fieldTypes); + } + + /** + * Returns the version-specific Kafka consumer. + * + * @param topic Kafka topic to consume. + * @param properties Properties for the Kafka consumer. + * @param deserializationSchema Deserialization schema to use for Kafka records. + * @return The version-specific Kafka consumer + */ + abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer( + String topic, + Properties properties, + DeserializationSchema<Row> deserializationSchema); + + /** + * Returns the deserialization schema. 
+ * + * @return The deserialization schema + */ + protected DeserializationSchema<Row> getDeserializationSchema() { + return deserializationSchema; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java new file mode 100644 index 0000000..cf39606 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all fetchers, which implement the connections to Kafka brokers and + * pull records from Kafka partitions. + * + * <p>This fetcher base class implements the logic around emitting records and tracking offsets, + * as well as around the optional timestamp assignment and watermark generation. + * + * @param <T> The type of elements deserialized from Kafka's byte records, and emitted into + * the Flink data streams. + * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version. 
+ */ +public abstract class AbstractFetcher<T, KPH> { + + protected static final int NO_TIMESTAMPS_WATERMARKS = 0; + protected static final int PERIODIC_WATERMARKS = 1; + protected static final int PUNCTUATED_WATERMARKS = 2; + + // ------------------------------------------------------------------------ + + /** The source context to emit records and watermarks to */ + protected final SourceContext<T> sourceContext; + + /** The lock that guarantees that record emission and state updates are atomic, + * from the view of taking a checkpoint */ + protected final Object checkpointLock; + + /** All partitions (and their state) that this fetcher is subscribed to */ + private final KafkaTopicPartitionState<KPH>[] allPartitions; + + /** The mode describing whether the fetcher also generates timestamps and watermarks */ + protected final int timestampWatermarkMode; + + /** Flag whether to register metrics for the fetcher */ + protected final boolean useMetrics; + + /** Only relevant for punctuated watermarks: The current cross partition watermark */ + private volatile long maxWatermarkSoFar = Long.MIN_VALUE; + + // ------------------------------------------------------------------------ + + protected AbstractFetcher( + SourceContext<T> sourceContext, + List<KafkaTopicPartition> assignedPartitions, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + ProcessingTimeService processingTimeProvider, + long autoWatermarkInterval, + ClassLoader userCodeClassLoader, + boolean useMetrics) throws Exception + { + this.sourceContext = checkNotNull(sourceContext); + this.checkpointLock = sourceContext.getCheckpointLock(); + this.useMetrics = useMetrics; + + // figure out what we watermark mode we will be using + + if (watermarksPeriodic == null) { + if (watermarksPunctuated == null) { + // simple case, no watermarks involved + timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS; + } else { + 
timestampWatermarkMode = PUNCTUATED_WATERMARKS; + } + } else { + if (watermarksPunctuated == null) { + timestampWatermarkMode = PERIODIC_WATERMARKS; + } else { + throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks"); + } + } + + // create our partition state according to the timestamp/watermark mode + this.allPartitions = initializePartitions( + assignedPartitions, + timestampWatermarkMode, + watermarksPeriodic, watermarksPunctuated, + userCodeClassLoader); + + // if we have periodic watermarks, kick off the interval scheduler + if (timestampWatermarkMode == PERIODIC_WATERMARKS) { + KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts = + (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions; + + PeriodicWatermarkEmitter periodicEmitter = + new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval); + periodicEmitter.start(); + } + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + /** + * Gets all partitions (with partition state) that this fetcher is subscribed to. + * + * @return All subscribed partitions. + */ + protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() { + return allPartitions; + } + + // ------------------------------------------------------------------------ + // Core fetcher work methods + // ------------------------------------------------------------------------ + + public abstract void runFetchLoop() throws Exception; + + public abstract void cancel(); + + // ------------------------------------------------------------------------ + // Kafka version specifics + // ------------------------------------------------------------------------ + + /** + * Creates the Kafka version specific representation of the given + * topic partition. 
+ * + * @param partition The Flink representation of the Kafka topic partition. + * @return The specific Kafka representation of the Kafka topic partition. + */ + public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition); + + /** + * Commits the given partition offsets to the Kafka brokers (or to ZooKeeper for + * older Kafka versions). The given offsets are the internal checkpointed offsets, representing + * the last processed record of each partition. Version-specific implementations of this method + * need to hold the contract that the given offsets must be incremented by 1 before + * committing them, so that committed offsets to Kafka represent "the next record to process". + * + * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing). + * @throws Exception This method forwards exceptions. + */ + public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception; + + // ------------------------------------------------------------------------ + // snapshot and restore the state + // ------------------------------------------------------------------------ + + /** + * Takes a snapshot of the partition offsets. + * + * <p>Important: This method mus be called under the checkpoint lock. + * + * @return A map from partition to current offset. + */ + public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() { + // this method assumes that the checkpoint lock is held + assert Thread.holdsLock(checkpointLock); + + HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length); + for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) { + state.put(partition.getKafkaTopicPartition(), partition.getOffset()); + } + return state; + } + + /** + * Restores the partition offsets. 
+ * + * @param snapshotState The offsets for the partitions + */ + public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) { + for (KafkaTopicPartitionState<?> partition : allPartitions) { + Long offset = snapshotState.get(partition.getKafkaTopicPartition()); + if (offset != null) { + partition.setOffset(offset); + } + } + } + + // ------------------------------------------------------------------------ + // emitting records + // ------------------------------------------------------------------------ + + /** + * Emits a record without attaching an existing timestamp to it. + * + * <p>Implementation Note: This method is kept brief to be JIT inlining friendly. + * That makes the fast path efficient, the extended paths are called as separate methods. + * + * @param record The record to emit + * @param partitionState The state of the Kafka partition from which the record was fetched + * @param offset The offset of the record + */ + protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception { + if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { + // fast path logic, in case there are no watermarks + + // emit the record, using the checkpoint lock to guarantee + // atomicity of record emission and offset state update + synchronized (checkpointLock) { + sourceContext.collect(record); + partitionState.setOffset(offset); + } + } + else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { + emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE); + } + else { + emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE); + } + } + + /** + * Emits a record attaching a timestamp to it. + * + * <p>Implementation Note: This method is kept brief to be JIT inlining friendly. + * That makes the fast path efficient, the extended paths are called as separate methods. 
+ * + * @param record The record to emit + * @param partitionState The state of the Kafka partition from which the record was fetched + * @param offset The offset of the record + */ + protected void emitRecordWithTimestamp( + T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception { + + if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) { + // fast path logic, in case there are no watermarks generated in the fetcher + + // emit the record, using the checkpoint lock to guarantee + // atomicity of record emission and offset state update + synchronized (checkpointLock) { + sourceContext.collectWithTimestamp(record, timestamp); + partitionState.setOffset(offset); + } + } + else if (timestampWatermarkMode == PERIODIC_WATERMARKS) { + emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp); + } + else { + emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp); + } + } + + /** + * Record emission, if a timestamp will be attached from an assigner that is + * also a periodic watermark generator. + */ + protected void emitRecordWithTimestampAndPeriodicWatermark( + T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) + { + @SuppressWarnings("unchecked") + final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> withWatermarksState = + (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState; + + // extract timestamp - this accesses/modifies the per-partition state inside the + // watermark generator instance, so we need to lock the access on the + // partition state. 
concurrent access can happen from the periodic emitter + final long timestamp; + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (withWatermarksState) { + timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp); + } + + // emit the record with timestamp, using the usual checkpoint lock to guarantee + // atomicity of record emission and offset state update + synchronized (checkpointLock) { + sourceContext.collectWithTimestamp(record, timestamp); + partitionState.setOffset(offset); + } + } + + /** + * Record emission, if a timestamp will be attached from an assigner that is + * also a punctuated watermark generator. + */ + protected void emitRecordWithTimestampAndPunctuatedWatermark( + T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) + { + @SuppressWarnings("unchecked") + final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState = + (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState; + + // only one thread ever works on accessing timestamps and watermarks + // from the punctuated extractor + final long timestamp = withWatermarksState.getTimestampForRecord(record, kafkaEventTimestamp); + final Watermark newWatermark = withWatermarksState.checkAndGetNewWatermark(record, timestamp); + + // emit the record with timestamp, using the usual checkpoint lock to guarantee + // atomicity of record emission and offset state update + synchronized (checkpointLock) { + sourceContext.collectWithTimestamp(record, timestamp); + partitionState.setOffset(offset); + } + + // if we also have a new per-partition watermark, check if that is also a + // new cross-partition watermark + if (newWatermark != null) { + updateMinPunctuatedWatermark(newWatermark); + } + } + + /** + *Checks whether a new per-partition watermark is also a new cross-partition watermark. 
+ */ + private void updateMinPunctuatedWatermark(Watermark nextWatermark) { + if (nextWatermark.getTimestamp() > maxWatermarkSoFar) { + long newMin = Long.MAX_VALUE; + + for (KafkaTopicPartitionState<?> state : allPartitions) { + @SuppressWarnings("unchecked") + final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState = + (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state; + + newMin = Math.min(newMin, withWatermarksState.getCurrentPartitionWatermark()); + } + + // double-check locking pattern + if (newMin > maxWatermarkSoFar) { + synchronized (checkpointLock) { + if (newMin > maxWatermarkSoFar) { + maxWatermarkSoFar = newMin; + sourceContext.emitWatermark(new Watermark(newMin)); + } + } + } + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Utility method that takes the topic partitions and creates the topic partition state + * holders. If a watermark generator per partition exists, this will also initialize those. 
+ */ + private KafkaTopicPartitionState<KPH>[] initializePartitions( + List<KafkaTopicPartition> assignedPartitions, + int timestampWatermarkMode, + SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, + SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, + ClassLoader userCodeClassLoader) + throws IOException, ClassNotFoundException + { + switch (timestampWatermarkMode) { + + case NO_TIMESTAMPS_WATERMARKS: { + @SuppressWarnings("unchecked") + KafkaTopicPartitionState<KPH>[] partitions = + (KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + // create the kafka version specific partition handle + KPH kafkaHandle = createKafkaPartitionHandle(partition); + partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle); + } + + return partitions; + } + + case PERIODIC_WATERMARKS: { + @SuppressWarnings("unchecked") + KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions = + (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[]) + new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + KPH kafkaHandle = createKafkaPartitionHandle(partition); + + AssignerWithPeriodicWatermarks<T> assignerInstance = + watermarksPeriodic.deserializeValue(userCodeClassLoader); + + partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>( + partition, kafkaHandle, assignerInstance); + } + + return partitions; + } + + case PUNCTUATED_WATERMARKS: { + @SuppressWarnings("unchecked") + KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions = + (KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[]) + new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + 
KPH kafkaHandle = createKafkaPartitionHandle(partition); + + AssignerWithPunctuatedWatermarks<T> assignerInstance = + watermarksPunctuated.deserializeValue(userCodeClassLoader); + + partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>( + partition, kafkaHandle, assignerInstance); + } + + return partitions; + } + default: + // cannot happen, add this as a guard for the future + throw new RuntimeException(); + } + } + + // ------------------------- Metrics ---------------------------------- + + /** + * Add current and committed offsets to metric group + * + * @param metricGroup The metric group to use + */ + protected void addOffsetStateGauge(MetricGroup metricGroup) { + // add current offsets to gage + MetricGroup currentOffsets = metricGroup.addGroup("current-offsets"); + MetricGroup committedOffsets = metricGroup.addGroup("committed-offsets"); + for (KafkaTopicPartitionState<?> ktp: subscribedPartitions()) { + currentOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.CURRENT_OFFSET)); + committedOffsets.gauge(ktp.getTopic() + "-" + ktp.getPartition(), new OffsetGauge(ktp, OffsetGaugeType.COMMITTED_OFFSET)); + } + } + + /** + * Gauge types + */ + private enum OffsetGaugeType { + CURRENT_OFFSET, + COMMITTED_OFFSET + } + + /** + * Gauge for getting the offset of a KafkaTopicPartitionState. 
+ */ + private static class OffsetGauge implements Gauge<Long> { + + private final KafkaTopicPartitionState<?> ktp; + private final OffsetGaugeType gaugeType; + + public OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) { + this.ktp = ktp; + this.gaugeType = gaugeType; + } + + @Override + public Long getValue() { + switch(gaugeType) { + case COMMITTED_OFFSET: + return ktp.getCommittedOffset(); + case CURRENT_OFFSET: + return ktp.getOffset(); + default: + throw new RuntimeException("Unknown gauge type: " + gaugeType); + } + } + } + // ------------------------------------------------------------------------ + + /** + * The periodic watermark emitter. In its given interval, it checks all partitions for + * the current event time watermark, and possibly emits the next watermark. + */ + private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback { + + private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions; + + private final SourceContext<?> emitter; + + private final ProcessingTimeService timerService; + + private final long interval; + + private long lastWatermarkTimestamp; + + //------------------------------------------------- + + PeriodicWatermarkEmitter( + KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions, + SourceContext<?> emitter, + ProcessingTimeService timerService, + long autoWatermarkInterval) + { + this.allPartitions = checkNotNull(allPartitions); + this.emitter = checkNotNull(emitter); + this.timerService = checkNotNull(timerService); + this.interval = autoWatermarkInterval; + this.lastWatermarkTimestamp = Long.MIN_VALUE; + } + + //------------------------------------------------- + + public void start() { + timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); + } + + @Override + public void onProcessingTime(long timestamp) throws Exception { + + long minAcrossAll = Long.MAX_VALUE; + for 
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { + + // we access the current watermark for the periodic assigners under the state + // lock, to prevent concurrent modification to any internal variables + final long curr; + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (state) { + curr = state.getCurrentWatermarkTimestamp(); + } + + minAcrossAll = Math.min(minAcrossAll, curr); + } + + // emit next watermark, if there is one + if (minAcrossAll > lastWatermarkTimestamp) { + lastWatermarkTimestamp = minAcrossAll; + emitter.emitWatermark(new Watermark(minAcrossAll)); + } + + // schedule the next watermark + timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java new file mode 100644 index 0000000..c736493 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import javax.annotation.Nullable; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A proxy that communicates exceptions between threads. Typically used if an exception + * from a spawned thread needs to be recognized by the "parent" (spawner) thread. + * + * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}. + * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}. + * Optionally, the parent can pass itself in the constructor to be interrupted as soon as + * an exception occurs. 
+ * + * <pre> + * {@code + * + * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread()); + * + * Thread subThread = new Thread() { + * + * public void run() { + * try { + * doSomething(); + * } catch (Throwable t) { + * errorProxy.reportError( + * } finally { + * doSomeCleanup(); + * } + * } + * }; + * subThread.start(); + * + * doSomethingElse(); + * errorProxy.checkAndThrowException(); + * + * doSomethingMore(); + * errorProxy.checkAndThrowException(); + * + * try { + * subThread.join(); + * } catch (InterruptedException e) { + * errorProxy.checkAndThrowException(); + * // restore interrupted status, if not caused by an exception + * Thread.currentThread().interrupt(); + * } + * } + * </pre> + */ +public class ExceptionProxy { + + /** The thread that should be interrupted when an exception occurs */ + private final Thread toInterrupt; + + /** The exception to throw */ + private final AtomicReference<Throwable> exception; + + /** + * Creates an exception proxy that interrupts the given thread upon + * report of an exception. The thread to interrupt may be null. + * + * @param toInterrupt The thread to interrupt upon an exception. May be null. + */ + public ExceptionProxy(@Nullable Thread toInterrupt) { + this.toInterrupt = toInterrupt; + this.exception = new AtomicReference<>(); + } + + // ------------------------------------------------------------------------ + + /** + * Sets the exception and interrupts the target thread, + * if no other exception has occurred so far. + * + * <p>The exception is only set (and the interruption is only triggered), + * if no other exception was set before. 
+ * + * @param t The exception that occurred + */ + public void reportError(Throwable t) { + // set the exception, if it is the first (and the exception is non null) + if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) { + toInterrupt.interrupt(); + } + } + + /** + * Checks whether an exception has been set via {@link #reportError(Throwable)}. + * If yes, that exception if re-thrown by this method. + * + * @throws Exception This method re-throws the exception, if set. + */ + public void checkAndThrowException() throws Exception { + Throwable t = exception.get(); + if (t != null) { + if (t instanceof Exception) { + throw (Exception) t; + } + else if (t instanceof Error) { + throw (Error) t; + } + else { + throw new Exception(t); + } + } + } +}
