Repository: flink
Updated Branches:
  refs/heads/release-0.10 e977d8274 -> 9defe0cb2
[FLINK-3043] [docs] Fix description of Kafka Consumer and Producer.

This also adds pointers from the deprecated classes forward to their designated replacements.

This closes #1380

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9defe0cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9defe0cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9defe0cb

Branch: refs/heads/release-0.10
Commit: 9defe0cb289bfd8c3cfc37e8895e337147e82d29
Parents: 7441799
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 18 20:30:05 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 14:49:00 2015 +0100

----------------------------------------------------------------------
 docs/apis/kafka.md                              |  63 ----------
 docs/apis/streaming_guide.md                    | 121 ++++++-------------
 .../connectors/kafka/FlinkKafkaProducer.java    |   5 +-
 .../connectors/kafka/api/KafkaSink.java         |   2 +
 .../api/persistent/PersistentKafkaSource.java   |   5 +
 5 files changed, 47 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9defe0cb/docs/apis/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/kafka.md b/docs/apis/kafka.md
deleted file mode 100644
index 0c0790a..0000000
--- a/docs/apis/kafka.md
+++ /dev/null
@@ -1,63 +0,0 @@
----
-title: "Reading from Kafka"
-is_beta: true
----
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<a href="#top"></a>
-
-Interact with [Apache Kafka](https://kafka.apache.org/) streams from Flink's APIs.
-
-* This will be replaced by the TOC
-{:toc}
-
-
-Kafka Connector
------------
-
-### Background
-
-Flink provides special Kafka Connectors for reading and writing data to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to provide different
-processing guarantees (most importantly exactly-once guarantees).
-
-For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers.
-The Kafka consumer might commit offsets to Kafka which have not been processed successfully.
-
-Flink provides different connector implementations for different use-cases and environments.
-
-
-### How to read data from Kafka
-
-#### Choose appropriate package and class
-
-Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the `flink-connector-kafka-083` package and the `FlinkKafkaConsumer082` class are appropriate.
-
-| Package | Supported Since | Class | Kafka Version | Allows exactly once processing | Notes |
-| ------------- |-------------| -----| ------ | ------ |
-| flink-connector-kafka | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | **No**, does not participate in checkpointing at all. | Uses the old, high level KafkaConsumer API, autocommits to ZK by Kafka |
-| flink-connector-kafka | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 0.8.2 | **No**, does not guarantee exactly-once processing, element order or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
-| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-


http://git-wip-us.apache.org/repos/asf/flink/blob/9defe0cb/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index a5b8b5b..3b3fd4f 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -3367,74 +3367,43 @@ with connectors.
 
 This connector provides access to event streams served by [Apache Kafka](https://kafka.apache.org/).
 
-Flink provides special Kafka Connectors for reading and writing data to Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to provide different
-processing guarantees (most importantly exactly-once guarantees).
-
-For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers.
-The Kafka consumer might commit offsets to Kafka which have not been processed successfully.
+Flink provides special Kafka Connectors for reading and writing data from/to Kafka topics.
+The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to provide
+exactly-once processing semantics. To achieve that, Flink does not purely rely on Kafka's consumer group
+offset tracking, but tracks and checkpoints these offsets internally as well.
 
 Please pick a package (maven artifact id) and class name for your use-case and environment.
-For most users, the `flink-connector-kafka-083` package and the `FlinkKafkaConsumer082` class are appropriate.
+For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) is appropriate.
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th class="text-left">Package</th>
+      <th class="text-left">Maven Dependency</th>
       <th class="text-left">Supported since</th>
       <th class="text-left">Class name</th>
-      <th class="text-left">Kafka version</th>
-      <th class="text-left">Checkpointing behavior</th>
-      <th class="text-left">Notes</th>
+      <th class="text-left">Kafka version</th>
+      <th class="text-left">Notes</th>
     </tr>
   </thead>
   <tbody>
     <tr>
       <td>flink-connector-kafka</td>
-      <td>0.9, 0.10</td>
-      <td>KafkaSource</td>
-      <td>0.8.1, 0.8.2</td>
-      <td>Does not participate in checkpointing (no consistency guarantees)</td>
-      <td>Uses the old, high level KafkaConsumer API, autocommits to ZK via Kafka</td>
-    </tr>
-    <tr>
-      <td>flink-connector-kafka</td>
-      <td>0.9, 0.10</td>
-      <td>PersistentKafkaSource</td>
-      <td>0.8.1, 0.8.2</td>
-      <td>Does not guarantee exactly-once processing, element order, or strict partition assignment</td>
-      <td>Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually</td>
-    </tr>
-    <tr>
-      <td>flink-connector-kafka-083</td>
       <td>0.9.1, 0.10</td>
       <td>FlinkKafkaConsumer081</td>
-      <td>0.8.1</td>
-      <td>Guarantees exactly-once processing</td>
-      <td>Uses the <a href = "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK manually</td>
+      <td>0.8.1</td>
+      <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
     </tr>
     <tr>
-      <td>flink-connector-kafka-083</td>
+      <td>flink-connector-kafka</td>
       <td>0.9.1, 0.10</td>
       <td>FlinkKafkaConsumer082</td>
-      <td>0.8.2</td>
-      <td>Guarantee exactly-once processing</td>
-      <td>Uses the <a href = "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK manually</td>
-    </tr>
+      <td>0.8.2</td>
+      <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
+    </tr>
   </tbody>
 </table>
-
-<!--
-| Package | Supported Since | Class | Kafka Version | Allows exactly once processing | Notes |
-| ------------- |-------------| -----| ------ | ------ |
-| flink-connector-kafka | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | **No**, does not participate in checkpointing at all. | Uses the old, high level KafkaConsumer API, autocommits to ZK by Kafka |
-| flink-connector-kafka | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 0.8.2 | **No**, does not guarantee exactly-once processing, element order or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
-| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
-| flink-connector-kafka-083 | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2 | **yes** | Uses the [SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API of Kafka internally. Offsets are committed to ZK manually |
--->
-
 Then, import the connector in your maven project:
 
 {% highlight xml %}
@@ -3448,15 +3417,14 @@ Then, import the connector in your maven project:
 Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
 #### Installing Apache Kafka
+
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
 * On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
 #### Kafka Consumer
 
-The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic.
-
-The following parameters have to be provided for the `FlinkKafkaConsumer082(...)` constructor:
+The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic. It takes the following constructor parameters:
 
 1. The topic name
 2. A DeserializationSchema
@@ -3495,12 +3463,12 @@ stream = env
 
 #### Kafka Consumers and Fault Tolerance
 
-As Kafka persists all the data, a fault tolerant Kafka consumer can be provided.
+With Flink's checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all
+its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore
+the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
+stored in the checkpoint.
 
-The FlinkKafkaConsumer082 can read a topic, and if the job fails for some reason, the source will
-continue on reading from where it left off after a restart.
-For example if there are 3 partitions in the topic with offsets 31, 122, 110 read at the time of job
-failure, then at the time of restart it will continue on reading from those offsets, no matter whether these partitions have new messages.
+The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
 
 To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
 
@@ -3508,7 +3476,13 @@ To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be
 <div data-lang="java" markdown="1">
 {% highlight java %}
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000);
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
 {% endhighlight %}
 </div>
 </div>
@@ -3518,52 +3492,29 @@ So if the topology fails due to loss of a TaskManager, there must still be enoug
 
 Flink on YARN supports automatic restart of lost YARN containers.
 
-#### Kafka Sink
-
-A class providing an interface for sending data to Kafka.
+#### Kafka Producer
 
-The following arguments have to be provided for the `KafkaSink(…)` constructor in order:
-
-1. Broker address (in hostname:port format, can be a comma separated list)
-2. The topic name
-3. Serialization schema
+The `FlinkKafkaProducer` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
+records to partitions. Example:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
+stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
+stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
 {% endhighlight %}
 </div>
 </div>
 
-The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
-		SerializationSchema<IN, byte[]> serializationSchema)
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
-		SerializationSchema serializationSchema)
-{% endhighlight %}
-</div>
-</div>
 
-If this constructor is used, the user needs to make sure to set the broker(s) with the "metadata.broker.list" property.
-Also the serializer configuration should be left default, and the serialization should be set via SerializationSchema.
-
-The Apache Kafka official documentation can be found [here](https://kafka.apache.org/documentation.html).
+You can also pass a custom Kafka producer configuration to the `FlinkKafkaProducer` constructor. Please refer to
+the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
+Kafka Producers.
 
 [Back to top](#top)
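For reference, the consumer and producer described in the documentation changes above can be combined into one pipeline. The following is a minimal sketch, not part of this commit, assuming a broker at localhost:9092 and ZooKeeper at localhost:2181; the topic names and group id are placeholders:

{% highlight java %}
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaRoundTrip {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000); // required for the exactly-once guarantees described above

		// connection properties for the 0.8.x consumer (addresses are placeholders)
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181");
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "my-group");

		// read strings from one topic ...
		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer082<String>("in-topic", new SimpleStringSchema(), props));

		// ... and write them back out to another
		stream.addSink(new FlinkKafkaProducer<String>(
				"localhost:9092", "out-topic", new SimpleStringSchema()));

		env.execute("Kafka round trip");
	}
}
{% endhighlight %}

The checkpoint interval of 5000 msecs matches the documentation example; it bounds how far the job may have to go back on failure.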
http://git-wip-us.apache.org/repos/asf/flink/blob/9defe0cb/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 715f5ee..5e08464 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -278,9 +278,12 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
 
 	public static Properties getPropertiesFromBrokerList(String brokerList) {
 		String[] elements = brokerList.split(",");
-		for(String broker: elements) {
+
+		// validate the broker addresses
+		for (String broker: elements) {
 			NetUtils.getCorrectHostnamePort(broker);
 		}
+
 		Properties props = new Properties();
 		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
 		return props;
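The validation added above makes `getPropertiesFromBrokerList` fail fast on malformed broker addresses instead of failing later inside the Kafka client. A hedged usage sketch follows; the exact exception thrown by `NetUtils.getCorrectHostnamePort` is an assumption, not spelled out in this diff:

{% highlight java %}
// builds a producer config from a comma-separated broker list
Properties props = FlinkKafkaProducer.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
// props now carries bootstrap.servers = broker1:9092,broker2:9092

try {
	// a missing port is caught during validation, before any producer is created
	FlinkKafkaProducer.getPropertiesFromBrokerList("broker1");
} catch (IllegalArgumentException e) {
	// assumed failure mode of NetUtils.getCorrectHostnamePort for malformed host:port pairs
}
{% endhighlight %}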
http://git-wip-us.apache.org/repos/asf/flink/blob/9defe0cb/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index f856926..e832f20 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
  *
  * The KafkaSink has been relocated to org.apache.flink.streaming.connectors.kafka.KafkaSink.
  * This class will be removed in future releases of Flink.
+ *
+ * @deprecated Please use the {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} instead.
  */
 @Deprecated
 public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {


http://git-wip-us.apache.org/repos/asf/flink/blob/9defe0cb/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 869c44f..2efeb20 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -30,6 +30,11 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
  * Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
  *
  * @param <T> The type of elements produced by this consumer.
+ *
+ * @deprecated Due to Kafka protocol and architecture (offset handling) changes, please use the
+ *             Kafka version specific consumers, like
+ *             {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081},
+ *             {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082}, etc.
  */
 @Deprecated
 public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
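Taken together, the two deprecation pointers above map old API calls onto the new classes roughly as follows; a sketch under the assumption of string-typed records, with addresses, topic names, and the consumer `props` as placeholders:

{% highlight java %}
// before (deprecated):
//   stream.addSink(new KafkaSink<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
// after:
stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));

// before (deprecated): new PersistentKafkaSource<String>(...)
// after: the Kafka-version-specific consumer, e.g. FlinkKafkaConsumer082 for Kafka 0.8.2
DataStream<String> in = env.addSource(
		new FlinkKafkaConsumer082<String>("my-topic", new SimpleStringSchema(), props));
{% endhighlight %}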