Repository: flink
Updated Branches:
  refs/heads/master 6b253d9f8 -> 864357bac


[FLINK-3043] [docs] Fix description of Kafka Consumer and Producer.

This also adds pointers in the deprecated classes to their designated replacement classes.

This closes #1380


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/864357ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/864357ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/864357ba

Branch: refs/heads/master
Commit: 864357bacee3531d21a02c951c4b924fb0494eb6
Parents: 2061206
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 18 20:30:05 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 14:45:44 2015 +0100

----------------------------------------------------------------------
 docs/apis/kafka.md                              |  63 ----------
 docs/apis/streaming_guide.md                    | 121 ++++++-------------
 .../connectors/kafka/FlinkKafkaProducer.java    |   5 +-
 .../connectors/kafka/api/KafkaSink.java         |   2 +
 .../api/persistent/PersistentKafkaSource.java   |   5 +
 5 files changed, 47 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/docs/apis/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/kafka.md b/docs/apis/kafka.md
deleted file mode 100644
index 0c0790a..0000000
--- a/docs/apis/kafka.md
+++ /dev/null
@@ -1,63 +0,0 @@
----
-title: "Reading from Kafka"
-is_beta: true
----
-
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<a href="#top"></a>
-
-Interact with [Apache Kafka](https://kafka.apache.org/) streams from Flink's 
APIs.
-
-* This will be replaced by the TOC
-{:toc}
-
-
-Kafka Connector
------------
-
-### Background
-
-Flink provides special Kafka Connectors for reading and writing data to Kafka 
topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to 
provide different 
-processing guarantees (most importantly exactly-once guarantees).
-
-For exactly-once processing Flink can not rely on the auto-commit capabilities 
of the Kafka consumers.
-The Kafka consumer might commit offsets to Kafka which have not been processed 
successfully.
-
-Flink provides different connector implementations for different use-cases and 
environments.
-
-
-
-
-### How to read data from Kafka
-
-#### Choose appropriate package and class
-
-Please pick a package (maven artifact id) and class name for your use-case and 
environment. For most users, the `flink-connector-kafka-083` package and the 
`FlinkKafkaConsumer082` class are appropriate.
-
-| Package                     | Supported Since | Class | Kafka Version | 
Allows exactly once processing | Notes |
-| -------------               |-------------| -----| ------ | ------ |
-| flink-connector-kafka       | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | 
**No**, does not participate in checkpointing at all. | Uses the old, high 
level KafkaConsumer API, autocommits to ZK by Kafka |
-| flink-connector-kafka       | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 
0.8.2 | **No**, does not guarantee exactly-once processing, element order or 
strict partition assignment | Uses the old, high level KafkaConsumer API, 
offsets are committed into ZK manually |
-| flink-connector-kafka-083   | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1  
| **yes** | Uses the 
[SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example)
 API of Kafka internally. Offsets are committed to ZK manually |
-| flink-connector-kafka-083   | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2  
| **yes** | Uses the 
[SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example)
 API of Kafka internally. Offsets are committed to ZK manually |
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 1c97dd9..fb6d86a 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -3367,74 +3367,43 @@ with connectors.
 
 This connector provides access to event streams served by [Apache 
Kafka](https://kafka.apache.org/).
 
-Flink provides special Kafka Connectors for reading and writing data to Kafka 
topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanisms to 
provide different
-processing guarantees (most importantly exactly-once guarantees).
-
-For exactly-once processing Flink can not rely on the auto-commit capabilities 
of the Kafka consumers.
-The Kafka consumer might commit offsets to Kafka which have not been processed 
successfully.
+Flink provides special Kafka Connectors for reading and writing data from/to 
Kafka topics.
+The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to 
provide
+exactly-once processing semantics. To achieve that, Flink does not purely rely 
on Kafka's consumer group
+offset tracking, but tracks and checkpoints these offsets internally as well.
 
 Please pick a package (maven artifact id) and class name for your use-case and 
environment.
-For most users, the `flink-connector-kafka-083` package and the 
`FlinkKafkaConsumer082` class are appropriate.
+For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) 
is appropriate.
 
 
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th class="text-left">Package</th>
+      <th class="text-left">Maven Dependency</th>
       <th class="text-left">Supported since</th>
       <th class="text-left">Class name</th>
-      <th class="text-left">Kafka version</th>      
-      <th class="text-left">Checkpointing behavior</th>
-      <th class="text-left">Notes</th>            
+      <th class="text-left">Kafka version</th>
+      <th class="text-left">Notes</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td>flink-connector-kafka</td>
-        <td>0.9, 0.10</td>
-        <td>KafkaSource</td>
-       <td>0.8.1, 0.8.2</td>   
-       <td>Does not participate in checkpointing (no consistency 
guarantees)</td>
-       <td>Uses the old, high level KafkaConsumer API, autocommits to ZK via 
Kafka</td>        
-    </tr>
-    <tr>
-        <td>flink-connector-kafka</td>
-        <td>0.9, 0.10</td>
-        <td>PersistentKafkaSource</td>
-       <td>0.8.1, 0.8.2</td>   
-       <td>Does not guarantee exactly-once processing, element order, or 
strict partition assignment</td>
-       <td>Uses the old, high level KafkaConsumer API, offsets are committed 
into ZK manually</td>     
-    </tr>
-    <tr>
-        <td>flink-connector-kafka-083</td>
         <td>0.9.1, 0.10</td>
         <td>FlinkKafkaConsumer081</td>
-       <td>0.8.1</td>  
-       <td>Guarantees exactly-once processing</td>
-       <td>Uses the <a href = "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK manually</td>
+        <td>0.8.1</td> 
+        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
     </tr>
     <tr>
-        <td>flink-connector-kafka-083</td>
+        <td>flink-connector-kafka</td>
         <td>0.9.1, 0.10</td>
         <td>FlinkKafkaConsumer082</td>
-       <td>0.8.2</td>  
-       <td>Guarantee exactly-once processing</td>
-       <td>Uses the <a href = "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK manually</td>
-    </tr>    
+        <td>0.8.2</td> 
+        <td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
+    </tr>
   </tbody>
 </table>
 
-
-<!--
-| Package                     | Supported Since | Class | Kafka Version | 
Allows exactly once processing | Notes |
-| -------------               |-------------| -----| ------ | ------ |
-| flink-connector-kafka       | 0.9, 0.10 | `KafkaSource` | 0.8.1, 0.8.2 | 
**No**, does not participate in checkpointing at all. | Uses the old, high 
level KafkaConsumer API, autocommits to ZK by Kafka |
-| flink-connector-kafka       | 0.9, 0.10 | `PersistentKafkaSource` | 0.8.1, 
0.8.2 | **No**, does not guarantee exactly-once processing, element order or 
strict partition assignment | Uses the old, high level KafkaConsumer API, 
offsets are committed into ZK manually |
-| flink-connector-kafka-083   | 0.9.1 0.10 | `FlinkKafkaConsumer081` | 0.8.1  
| **yes** | Uses the 
[SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example)
 API of Kafka internally. Offsets are committed to ZK manually |
-| flink-connector-kafka-083   | 0.9.1 0.10 | `FlinkKafkaConsumer082` | 0.8.2  
| **yes** | Uses the 
[SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example)
 API of Kafka internally. Offsets are committed to ZK manually |
--->
-
 Then, import the connector in your maven project:
 
 {% highlight xml %}
@@ -3448,15 +3417,14 @@ Then, import the connector in your maven project:
 Note that the streaming connectors are currently not part of the binary 
distribution. See how to link with them for cluster execution 
[here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
 #### Installing Apache Kafka
+
 * Follow the instructions from [Kafka's 
quickstart](https://kafka.apache.org/documentation.html#quickstart) to download 
the code and launch a server (launching a Zookeeper and a Kafka server is 
required every time before starting the application).
 * On 32 bit computers 
[this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in)
 problem may occur.
 * If the Kafka and Zookeeper servers are running on a remote machine, then the 
`advertised.host.name` setting in the `config/server.properties` file must be 
set to the machine's IP address.
 
 #### Kafka Consumer
 
-The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to 
one topic.
-
-The following parameters have to be provided for the 
`FlinkKafkaConsumer082(...)` constructor:
+The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic. Its constructor takes the following parameters:
 
 1. The topic name
 2. A DeserializationSchema
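
For illustration (not part of this commit), here is a minimal sketch of how the consumer described above is typically wired up. The `Properties` argument and the property keys are assumptions based on the surrounding documentation; the topic name, addresses, and group id are placeholders.

{% highlight java %}
// Sketch only: assumes a FlinkKafkaConsumer082(topic, schema, properties) constructor
// and an existing StreamExecutionEnvironment `env`; all values are placeholders.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
properties.setProperty("zookeeper.connect", "localhost:2181"); // placeholder ZooKeeper quorum
properties.setProperty("group.id", "test");                    // placeholder consumer group

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), properties));
{% endhighlight %}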
@@ -3495,12 +3463,12 @@ stream = env
 
 #### Kafka Consumers and Fault Tolerance
 
-As Kafka persists all the data, a fault tolerant Kafka consumer can be 
provided.
+With Flink's checkpointing enabled, the Flink Kafka Consumer will consume 
records from a topic and periodically checkpoint all
+its Kafka offsets, together with the state of other operations, in a 
consistent manner. In case of a job failure, Flink will restore
+the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were
+stored in the checkpoint.
 
-The FlinkKafkaConsumer082 can read a topic, and if the job fails for some 
reason, the source will
-continue on reading from where it left off after a restart.
-For example if there are 3 partitions in the topic with offsets 31, 122, 110 
read at the time of job
-failure, then at the time of restart it will continue on reading from those 
offsets, no matter whether these partitions have new messages.
+The checkpoint interval therefore defines how far back the program may have to go, at most, in case of a failure.
 
 To use fault tolerant Kafka Consumers, checkpointing of the topology needs to 
be enabled at the execution environment:
 
@@ -3508,7 +3476,13 @@ To use fault tolerant Kafka Consumers, checkpointing of 
the topology needs to be
 <div data-lang="java" markdown="1">
 {% highlight java %}
 final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000);
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
 {% endhighlight %}
 </div>
 </div>
@@ -3518,52 +3492,29 @@ So if the topology fails due to loss of a TaskManager, 
there must still be enoug
 Flink on YARN supports automatic restart of lost YARN containers.
 
 
-#### Kafka Sink
-
-A class providing an interface for sending data to Kafka.
+#### Kafka Producer
 
-The following arguments have to be provided for the `KafkaSink(…)` 
constructor in order:
-
-1. Broker address (in hostname:port format, can be a comma separated list)
-2. The topic name
-3. Serialization schema
+The `FlinkKafkaProducer` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
+records to partitions.
 
 Example:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new KafkaSink<String>("localhost:9092", "test", new 
SimpleStringSchema()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-stream.addSink(new KafkaSink[String]("localhost:9092", "test", new 
SimpleStringSchema))
-{% endhighlight %}
-</div>
-</div>
-
-The user can also define custom Kafka producer configuration for the KafkaSink 
with the constructor:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public KafkaSink(String zookeeperAddress, String topicId, Properties 
producerConfig,
-      SerializationSchema<IN, byte[]> serializationSchema)
+stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", 
new SimpleStringSchema()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-public KafkaSink(String zookeeperAddress, String topicId, Properties 
producerConfig,
-      SerializationSchema serializationSchema)
+stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", 
new SimpleStringSchema()))
 {% endhighlight %}
 </div>
 </div>
 
-If this constructor is used, the user needs to make sure to set the broker(s) 
with the "metadata.broker.list" property.
-Also the serializer configuration should be left default, and the 
serialization should be set via SerializationSchema.
-
-The Apache Kafka official documentation can be found 
[here](https://kafka.apache.org/documentation.html).
+You can also pass a custom Kafka producer configuration to the `FlinkKafkaProducer` constructor. Please refer to
+the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
+Kafka producers.
 
 [Back to top](#top)
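
The paragraph above mentions passing a custom Kafka producer configuration. As a hedged illustration, assuming `FlinkKafkaProducer` offers an overload that accepts a `Properties` producer config (this diff only shows the broker-list helper, not the full constructor list), it could look roughly like this:

{% highlight java %}
// Sketch only: the (topic, schema, config) overload is an assumption; verify it against
// the actual constructors of FlinkKafkaProducer. Broker addresses and settings are placeholders.
Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
producerConfig.setProperty("acks", "all");                                    // example Kafka producer setting

stream.addSink(new FlinkKafkaProducer<String>("my-topic", new SimpleStringSchema(), producerConfig));
{% endhighlight %}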
 

http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 715f5ee..5e08464 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -278,9 +278,12 @@ public class FlinkKafkaProducer<IN> extends 
RichSinkFunction<IN>  {
        
        public static Properties getPropertiesFromBrokerList(String brokerList) 
{
                String[] elements = brokerList.split(",");
-               for(String broker: elements) {
+               
+               // validate the broker addresses
+               for (String broker: elements) {
                        NetUtils.getCorrectHostnamePort(broker);
                }
+               
                Properties props = new Properties();
                props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerList);
                return props;
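
For illustration, a hedged usage sketch of the helper touched above; broker addresses are placeholders.

{% highlight java %}
// getPropertiesFromBrokerList is public static; each address in the comma-separated list
// is run through NetUtils.getCorrectHostnamePort(...), so a malformed entry should fail
// fast here instead of later inside the Kafka client. Addresses below are placeholders.
Properties props = FlinkKafkaProducer.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
// props now carries ProducerConfig.BOOTSTRAP_SERVERS_CONFIG set to the full broker list
{% endhighlight %}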

http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index f856926..e832f20 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.streaming.util.serialization.SerializationSchema;
  *
  * The KafkaSink has been relocated to 
org.apache.flink.streaming.connectors.kafka.KafkaSink.
  * This class will be removed in future releases of Flink.
+ * 
+ * @deprecated Please use the {@link 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer} instead.
  */
 @Deprecated
 public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
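
A hedged migration sketch for this deprecated sink, using the producer constructor shown in the updated streaming guide above; broker and topic names are placeholders.

{% highlight java %}
// Before (deprecated):
// stream.addSink(new KafkaSink<String>("localhost:9092", "my-topic", new SimpleStringSchema()));

// After, per the updated docs in this commit:
stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
{% endhighlight %}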

http://git-wip-us.apache.org/repos/asf/flink/blob/864357ba/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 869c44f..2efeb20 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -30,6 +30,11 @@ import 
org.apache.flink.streaming.util.serialization.DeserializationSchema;
  * Please use FlinkKafkaConsumer081 and FlinkKafkaConsumer082.
  *
  * @param <T> The type of elements produced by this consumer.
+ * 
+ * @deprecated Due to Kafka protocol and architecture (offset handling) 
changes, please use the
+ *             Kafka version specific consumers, like
+ *             {@link 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081}, 
+ *             {@link 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082}, etc.
  */
 @Deprecated
 public class PersistentKafkaSource<T> extends FlinkKafkaConsumer<T> {
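
Analogously, a hedged sketch of switching this deprecated source to a version-specific consumer; constructor arguments follow the consumer sketch earlier in this mail, and all names are placeholders.

{% highlight java %}
// Sketch only: `env` is a StreamExecutionEnvironment and `kafkaProperties` holds the
// connection settings (see the consumer sketch above); the topic name is a placeholder.
env.addSource(new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), kafkaProperties));
{% endhighlight %}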
