This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f7fa02f  [FLINK-17076][docs] Revamp Kafka Connector Documentation
f7fa02f is described below

commit f7fa02fdb450af9d11c92bd90751fa9161add47a
Author: Seth Wiesman <[email protected]>
AuthorDate: Tue May 19 15:08:13 2020 -0500

    [FLINK-17076][docs] Revamp Kafka Connector Documentation
    
    This closes #12257
---
 docs/dev/connectors/kafka.md | 488 +++++++++++++++----------------------------
 1 file changed, 164 insertions(+), 324 deletions(-)

diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index d084808..6f21ff7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -23,125 +23,59 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Flink provides an [Apache Kafka](https://kafka.apache.org) connector for 
reading data from and writing data to Kafka topics with exactly-once guaruntees.
+
 * This will be replaced by the TOC
 {:toc}
 
-This connector provides access to event streams served by [Apache 
Kafka](https://kafka.apache.org/).
-
-Flink provides special Kafka Connectors for reading and writing data from/to 
Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to 
provide
-exactly-once processing semantics. To achieve that, Flink does not purely rely 
on Kafka's consumer group
-offset tracking, but tracks and checkpoints these offsets internally as well.
-
-Please pick a package (maven artifact id) and class name for your use-case and 
environment.
-For most users, the `FlinkKafkaConsumer010` (part of `flink-connector-kafka`) 
is appropriate.
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">Maven Dependency</th>
-      <th class="text-left">Supported since</th>
-      <th class="text-left">Consumer and <br>
-      Producer Class name</th>
-      <th class="text-left">Kafka version</th>
-      <th class="text-left">Notes</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</td>
-        <td>1.2.0</td>
-        <td>FlinkKafkaConsumer010<br>
-        FlinkKafkaProducer010</td>
-        <td>0.10.x</td>
-        <td>This connector supports <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message";>Kafka
 messages with timestamps</a> both for producing and consuming.</td>
-    </tr>
-    <tr>
-        <td>flink-connector-kafka-0.11{{ site.scala_version_suffix }}</td>
-        <td>1.4.0</td>
-        <td>FlinkKafkaConsumer011<br>
-        FlinkKafkaProducer011</td>
-        <td>0.11.x</td>
-        <td>Since 0.11.x Kafka does not support scala 2.10. This connector 
supports <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging";>Kafka
 transactional messaging</a> to provide exactly once semantic for the 
producer.</td>
-    </tr>
-    <tr>
-        <td>flink-connector-kafka{{ site.scala_version_suffix }}</td>
-        <td>1.7.0</td>
-        <td>FlinkKafkaConsumer<br>
-        FlinkKafkaProducer</td>
-        <td>>= 1.0.0</td>
-        <td>
-        This universal Kafka connector attempts to track the latest version of 
the Kafka client.
-        The version of the client it uses may change between Flink releases. 
Starting with Flink 1.9 release, it uses the Kafka 2.2.0 client.
-        Modern Kafka clients are backwards compatible with broker versions 
0.10.0 or later.
-        However for Kafka 0.11.x and 0.10.x versions, we recommend using 
dedicated
-        flink-connector-kafka-0.11{{ site.scala_version_suffix }} and 
flink-connector-kafka-0.10{{ site.scala_version_suffix }} respectively.
-        </td>
-    </tr>
-  </tbody>
-</table>
-
-Then, import the connector in your maven project:
+## Dependency
+
+Apache Flink ships with multiple Kafka connectors: universal, 0.10, and 0.11.
+This universal Kafka connector attempts to track the latest version of the 
Kafka client.
+The version of the client it uses may change between Flink releases.
+Modern Kafka clients are backwards compatible with broker versions 0.10.0 or 
later.
+For most users the universal Kafka connector is the most appropriate.
+However, for Kafka versions 0.11.x and 0.10.x, we recommend using the 
dedicated ``0.11`` and ``0.10`` connectors, respectively.
+For details on Kafka compatibility, please refer to the official [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
 
+<div class="codetabs" markdown="1">
+<div data-lang="universal" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-kafka{{ site.scala_version_suffix 
}}</artifactId>
+       <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %} 
+</div>
+<div data-lang="011" markdown="1">
+{% highlight xml %}
+<dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-kafka-011{{ site.scala_version_suffix 
}}</artifactId>
+       <version>{{ site.version }}</version>
 </dependency>
 {% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary 
distribution.
-See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/projectsetup/dependencies.html).
-
-## Installing Apache Kafka
-
-* Follow the instructions from [Kafka's 
quickstart](https://kafka.apache.org/documentation.html#quickstart) to download 
the code and launch a server (launching a Zookeeper and a Kafka server is 
required every time before starting the application).
-* If the Kafka and Zookeeper servers are running on a remote machine, then the 
`advertised.host.name` setting in the `config/server.properties` file must be 
set to the machine's IP address.
-
-## Kafka 1.0.0+ Connector
-
-Starting with Flink 1.7, there is a new universal Kafka connector that does 
not track a specific Kafka major version.
-Rather, it tracks the latest version of Kafka at the time of the Flink release.
-
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka 
connector.
-If you use an older version of Kafka (0.11 or 0.10), you should use the 
connector corresponding to the broker version.
-
-### Compatibility
-
-The universal Kafka connector is compatible with older and newer Kafka brokers 
through the compatibility guarantees of the Kafka client API and broker.
-It is compatible with broker versions 0.11.0 or newer, depending on the 
features used.
-For details on Kafka compatibility, please refer to the [Kafka 
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
-
-### Migrating Kafka Connector from 0.11 to universal
-
-In order to perform the migration, see the [upgrading jobs and Flink versions 
guide]({{ site.baseurl }}/ops/upgrading.html)
-and:
-* Use Flink 1.9 or newer for the whole process.
-* Do not upgrade the Flink and operators at the same time.
-* Make sure that Kafka Consumer and/or Kafka Producer used in your job have 
assigned unique identifiers (`uid`):
-* Use stop with savepoint feature to take the savepoint (for example by using 
`stop --withSavepoint`)[CLI command]({{ site.baseurl }}/ops/cli.html).
-
-### Usage
-
-To use the universal Kafka connector add a dependency to it:
-
+</div>
+<div data-lang="010" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-kafka-010{{ site.scala_version_suffix 
}}</artifactId>
+       <version>{{ site.version }}</version>
 </dependency>
 {% endhighlight %}
+<span class="label label-danger">Attention</span> The ``0.10`` sink does not 
support exactly-once writes to Kafka.
+</div>
+</div>
 
-Then instantiate the new source (`FlinkKafkaConsumer`) and sink 
(`FlinkKafkaProducer`).
-The API is backward compatible with the Kafka 0.11 connector,
-except of dropping specific Kafka version from the module and class names.
+Flink's streaming connectors are not currently part of the binary distribution.
+See how to link with them for cluster execution [here]({{ 
site.baseurl}}/dev/projectsetup/dependencies.html).
 
 ## Kafka Consumer
 
-Flink's Kafka consumer is called `FlinkKafkaConsumer010` (or 011 for Kafka 
0.11.0.x versions, etc.
-or just `FlinkKafkaConsumer` for Kafka >= 1.0.0 versions). It provides access 
to one or more Kafka topics.
+Flink's Kafka consumer - `FlinkKafkaConsumer` (or `FlinkKafkaConsumer011` for 
Kafka 0.11.x,
+or `FlinkKafkaConsumer010` for Kafka 0.10.x) - provides access to read from 
one or more Kafka topics.
 
 The constructor accepts the following arguments:
 
@@ -152,8 +86,6 @@ The constructor accepts the following arguments:
   - "bootstrap.servers" (comma separated list of Kafka brokers)
   - "group.id" the id of the consumer group
 
-Example:
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -161,7 +93,7 @@ Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("group.id", "test");
 DataStream<String> stream = env
-       .addSource(new FlinkKafkaConsumer010<>("topic", new 
SimpleStringSchema(), properties));
+       .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), 
properties));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -170,26 +102,17 @@ val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
 stream = env
-    .addSource(new FlinkKafkaConsumer010[String]("topic", new 
SimpleStringSchema(), properties))
-    .print()
+    .addSource(new FlinkKafkaConsumer[String]("topic", new 
SimpleStringSchema(), properties))
 {% endhighlight %}
 </div>
 </div>
 
 ### The `DeserializationSchema`
 
-The Flink Kafka Consumer needs to know how to turn the binary data in Kafka 
into Java/Scala objects. The
-`DeserializationSchema` allows users to specify such a schema. The `T 
deserialize(byte[] message)`
-method gets called for each Kafka message, passing the value from Kafka.
-
-It is usually helpful to start from the `AbstractDeserializationSchema`, which 
takes care of describing the
-produced Java/Scala type to Flink's type system. Users that implement a 
vanilla `DeserializationSchema` need
-to implement the `getProducedType(...)` method themselves.
+The Flink Kafka Consumer needs to know how to turn the binary data in Kafka 
into Java/Scala objects.
+The `KafkaDeserializationSchema` allows users to specify such a schema. The `T 
deserialize(ConsumerRecord<byte[], byte[]> record)` method gets called for each 
Kafka message, passing the value from Kafka.
 
-For accessing the key, value and metadata of the Kafka message, the 
`KafkaDeserializationSchema` has
-the following deserialize method `T deserialize(ConsumerRecord<byte[], byte[]> 
record)`.
-
-For convenience, Flink provides the following schemas:
+For convenience, Flink provides the following schemas out of the box:
 
 1. `TypeInformationSerializationSchema` (and 
`TypeInformationKeyValueSerializationSchema`) which creates
     a schema based on a Flink's `TypeInformation`. This is useful if the data 
is both written and read by Flink.
@@ -216,46 +139,37 @@ For convenience, Flink provides the following schemas:
 <div data-lang="AvroDeserializationSchema" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-avro</artifactId>
-  <version>{{site.version }}</version>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-avro</artifactId>
+    <version>{{site.version }}</version>
 </dependency>
 {% endhighlight %}
 </div>
 <div data-lang="ConfluentRegistryAvroDeserializationSchema" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-avro-confluent-registry</artifactId>
-  <version>{{site.version }}</version>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-avro-confluent-registry</artifactId>
+    <version>{{site.version }}</version>
 </dependency>
 {% endhighlight %}
 </div>
 </div>
 
-When encountering a corrupted message that cannot be deserialized for any 
reason, there
-are two options - either throwing an exception from the `deserialize(...)` 
method
-which will cause the job to fail and be restarted, or returning `null` to allow
-the Flink Kafka consumer to silently skip the corrupted message. Note that
-due to the consumer's fault tolerance (see below sections for more details),
-failing the job on the corrupted message will let the consumer attempt
-to deserialize the message again. Therefore, if deserialization still fails, 
the
-consumer will fall into a non-stop restart and fail loop on that corrupted
-message.
+When encountering a corrupted message that cannot be deserialized for any 
reason the deserialization schema should return null which will result in the 
record being skipped.
+Due to the consumer's fault tolerance (see below sections for more details), 
failing the job on the corrupted message will let the consumer attempt to 
deserialize the message again.
+Therefore, if deserialization still fails, the consumer will fall into a 
non-stop restart and fail loop on that corrupted message.
 
 ### Kafka Consumers Start Position Configuration
 
-The Flink Kafka Consumer allows configuring how the start position for Kafka
-partitions are determined.
-
-Example:
+The Flink Kafka Consumer allows configuring how the start positions for Kafka 
partitions are determined.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(...);
+FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
 myConsumer.setStartFromEarliest();     // start from the earliest record 
possible
 myConsumer.setStartFromLatest();       // start from the latest record
 myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp 
(milliseconds)
@@ -269,7 +183,7 @@ DataStream<String> stream = env.addSource(myConsumer);
 {% highlight scala %}
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 
-val myConsumer = new FlinkKafkaConsumer010[String](...)
+val myConsumer = new FlinkKafkaConsumer[String](...)
 myConsumer.setStartFromEarliest()      // start from the earliest record 
possible
 myConsumer.setStartFromLatest()        // start from the latest record
 myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp 
(milliseconds)
@@ -339,34 +253,14 @@ fault tolerance for the consumer).
 ### Kafka Consumers and Fault Tolerance
 
 With Flink's checkpointing enabled, the Flink Kafka Consumer will consume 
records from a topic and periodically checkpoint all
-its Kafka offsets, together with the state of other operations, in a 
consistent manner. In case of a job failure, Flink will restore
+its Kafka offsets, together with the state of other operations. In case of a 
job failure, Flink will restore
 the streaming program to the state of the latest checkpoint and re-consume the 
records from Kafka, starting from the offsets that were
 stored in the checkpoint.
 
 The interval of drawing checkpoints therefore defines how much the program may 
have to go back at most, in case of a failure.
+To use fault tolerant Kafka Consumers, checkpointing of the topology needs to 
be enabled in the [job]({{ site.baseurl 
}}/ops/config.html#execution-checkpointing-interval).
 
-To use fault tolerant Kafka Consumers, checkpointing of the topology needs to 
be enabled at the execution environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-</div>
-
-Also note that Flink can only restart the topology if enough processing slots 
are available to restart the topology.
-So if the topology fails due to loss of a TaskManager, there must still be 
enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
-
-If checkpointing is not enabled, the Kafka consumer will periodically commit 
the offsets to Zookeeper.
+If checkpointing is disabled, the Kafka consumer will periodically commit the 
offsets to Zookeeper.
 
 ### Kafka Consumers Topic and Partition Discovery
 
@@ -380,15 +274,9 @@ By default, partition discovery is disabled. To enable it, 
set a non-negative va
 for `flink.partition-discovery.interval-millis` in the provided properties 
config,
 representing the discovery interval in milliseconds. 
 
-<span class="label label-danger">Limitation</span> When the consumer is 
restored from a savepoint from Flink versions
-prior to Flink 1.3.x, partition discovery cannot be enabled on the restore 
run. If enabled, the restore would fail
-with an exception. In this case, in order to use partition discovery, please 
first take a savepoint in Flink 1.3.x and
-then restore again from that.
-
 #### Topic discovery
 
-At a higher-level, the Flink Kafka Consumer is also capable of discovering 
topics, based on pattern matching on the
-topic names using regular expressions. See the below for an example:
+The Kafka Consumer is also capable of discovering topics by matching topic 
names using regular expressions.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -399,7 +287,7 @@ Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("group.id", "test");
 
-FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
+FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
     java.util.regex.Pattern.compile("test-topic-[0-9]"),
     new SimpleStringSchema(),
     properties);
@@ -416,7 +304,7 @@ val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
 
-val myConsumer = new FlinkKafkaConsumer010[String](
+val myConsumer = new FlinkKafkaConsumer[String](
   java.util.regex.Pattern.compile("test-topic-[0-9]"),
   new SimpleStringSchema,
   properties)
@@ -445,7 +333,7 @@ tolerance guarantees. The committed offsets are only a 
means to expose
 the consumer's progress for monitoring purposes.
 
 The way to configure offset commit behaviour is different, depending on
-whether or not checkpointing is enabled for the job.
+whether checkpointing is enabled for the job.
 
  - *Checkpointing disabled:* if checkpointing is disabled, the Flink Kafka
  Consumer relies on the automatic periodic offset committing capability
@@ -465,15 +353,14 @@ whether or not checkpointing is enabled for the job.
 
 ### Kafka Consumers and Timestamp Extraction/Watermark Emission
 
-In many scenarios, the timestamp of a record is embedded (explicitly or 
implicitly) in the record itself.
-In addition, the user may want to emit watermarks either periodically, or in 
an irregular fashion, e.g. based on
+In many scenarios, the timestamp of a record is embedded in the record itself, 
or the metadata of the `ConsumerRecord`.
+In addition, users may want to emit watermarks either periodically, or 
irregularly, e.g. based on
 special records in the Kafka stream that contain the current event-time 
watermark. For these cases, the Flink Kafka
-Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an 
`AssignerWithPunctuatedWatermarks`.
+Consumer allows the specification of a [watermark strategy]({% link 
dev/event_time.md %}).
 
-You can specify your custom timestamp extractor/watermark emitter as described
-[here]({{ site.baseurl }}/dev/event_timestamps_watermarks.html), or use one 
from the
-[predefined ones]({{ site.baseurl }}/dev/event_timestamp_extractors.html). 
After doing so, you
-can pass it to your consumer in the following way:
+You can specify your custom strategy as described
+[here]({% link dev/event_timestamps_watermarks.md %}), or use one from the
+[predefined ones]({% link dev/event_timestamp_extractors.md %}). 
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -482,13 +369,14 @@ Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("group.id", "test");
 
-FlinkKafkaConsumer010<String> myConsumer =
-    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties);
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+FlinkKafkaConsumer<String> myConsumer =
+    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(
+    WatermarkStrategies.
+        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .build());
 
-DataStream<String> stream = env
-       .addSource(myConsumer)
-       .print();
+DataStream<String> stream = env.addSource(myConsumer);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -497,51 +385,51 @@ val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
 
-val myConsumer = new FlinkKafkaConsumer010[String]("topic", new 
SimpleStringSchema(), properties)
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
-stream = env
-    .addSource(myConsumer)
-    .print()
+val myConsumer =
+    new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(
+    WatermarkStrategies.
+        .forBoundedOutOfOrderness[String](Duration.ofSeconds(20))
+        .build())
+
+val stream = env.addSource(myConsumer)
 {% endhighlight %}
 </div>
 </div>
 
-Internally, an instance of the assigner is executed per Kafka partition.
-When such an assigner is specified, for each record read from Kafka, the
-`extractTimestamp(T element, long previousElementTimestamp)` is called to 
assign a timestamp to the record and
-the `Watermark getCurrentWatermark()` (for periodic) or the
-`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` 
(for punctuated) is called to determine
-if a new watermark should be emitted and with which timestamp.
 
 **Note**: If a watermark assigner depends on records read from Kafka to 
advance its watermarks
 (which is commonly the case), all topics and partitions need to have a 
continuous stream of records.
 Otherwise, the watermarks of the whole application cannot advance and all 
time-based operations,
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
-A Flink improvement is planned to prevent this from happening 
-(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should 
consider idle partitions](
-https://issues.apache.org/jira/browse/FLINK-5479)).
-In the meanwhile, a possible workaround is to send *heartbeat messages* to all 
consumed partitions that advance the watermarks of idle partitions.
-
+Consider setting appropriate [idelness timeouts]({{ site.baseurl 
}}/dev/event_timestamps_watermarks.html#dealing-with-idle-sources) to mitigate 
this issue.
+ 
 ## Kafka Producer
 
-Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 
0.10.0.x versions, etc. or just `FlinkKafkaProducer` for Kafka >= 1.0.0 
versions).
-It allows writing a stream of records to one or more Kafka topics.
+Flink’s Kafka Producer - `FlinkKafkaProducer` (or `FlinkKafkaProducer010` for 
Kafka 0.10.x versions or `FlinkKafkaProducer011` for Kafka 0.11.x versions) -
+ allows writing a stream of records to one or more Kafka topics.
+
+The constructor accepts the following arguments:
 
-Example:
+1. A default output topic where events should be written
+2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
+3. Properties for the Kafka client. The following properties are required:
+    * "bootstrap.servers" (comma separated list of Kafka brokers)
+4. A fault-tolerance semantic
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<String> stream = ...;
+DataStream<String> stream = ...
 
-FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
-        "localhost:9092",            // broker list
-        "my-topic",                  // target topic
-        new SimpleStringSchema());   // serialization schema
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
 
-// versions 0.10+ allow attaching the records' event timestamp when writing 
them to Kafka;
-// this method is not available for earlier Kafka versions
-myProducer.setWriteTimestampToKafka(true);
+FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
+        "my-topic",                  // target topic
+        new SimpleStringSchema(),    // serialization schema
+        properties,                  // producer config
+        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
 
 stream.addSink(myProducer);
 {% endhighlight %}
@@ -550,91 +438,36 @@ stream.addSink(myProducer);
 {% highlight scala %}
 val stream: DataStream[String] = ...
 
-val myProducer = new FlinkKafkaProducer011[String](
-        "localhost:9092",         // broker list
-        "my-topic",               // target topic
-        new SimpleStringSchema)   // serialization schema
+Properties properties = new Properties
+properties.setProperty("bootstrap.servers", "localhost:9092")
 
-// versions 0.10+ allow attaching the records' event timestamp when writing 
them to Kafka;
-// this method is not available for earlier Kafka versions
-myProducer.setWriteTimestampToKafka(true)
+val myProducer = new FlinkKafkaProducer[String](
+        "my-topic",                  // target topic
+        new SimpleStringSchema(),    // serialization schema
+        properties,                  // producer config
+        FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance
 
 stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 </div>
 
-The above examples demonstrate the basic usage of creating a Flink Kafka 
Producer
-to write streams to a single Kafka target topic. For more advanced usages, 
there
-are other constructor variants that allow providing the following:
-
- * *Providing custom properties*:
- The producer allows providing a custom properties configuration for the 
internal `KafkaProducer`.
- Please refer to the [Apache Kafka 
documentation](https://kafka.apache.org/documentation.html) for
- details on how to configure Kafka Producers.
- * *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to 
the
- constructor. This partitioner will be called for each record in the stream
- to determine which exact partition of the target topic the record should be 
sent to.
- Please see [Kafka Producer Partitioning 
Scheme](#kafka-producer-partitioning-scheme) for more details.
- * *Advanced serialization schema*: Similar to the consumer,
- the producer also allows using an advanced serialization schema called 
`KeyedSerializationSchema`,
- which allows serializing the key and value separately. It also allows to 
override the target topic,
- so that one producer instance can send data to multiple topics.
- 
-### Kafka Producer Partitioning Scheme
- 
-By default, if a custom partitioner is not specified for the Flink Kafka 
Producer, the producer will use
-a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask 
to a single Kafka partition
-(i.e., all records received by a sink subtask will end up in the same Kafka 
partition).
-
-A custom partitioner can be implemented by extending the 
`FlinkKafkaPartitioner` class. All
-Kafka versions' constructors allow providing a custom partitioner when 
instantiating the producer.
-Note that the partitioner implementation must be serializable, as they will be 
transferred across Flink nodes.
-Also, keep in mind that any state in the partitioner will be lost on job 
failures since the partitioner
-is not part of the producer's checkpointed state.
-
-It is also possible to completely avoid using and kind of partitioner, and 
simply let Kafka partition
-the written records by their attached key (as determined for each record using 
the provided serialization schema).
-To do this, provide a `null` custom partitioner when instantiating the 
producer. It is important
-to provide `null` as the custom partitioner; as explained above, if a custom 
partitioner is not specified
-the `FlinkFixedPartitioner` is used instead.
+## The `SerializationSchema`
 
-### Kafka Producers and Fault Tolerance
+The Flink Kafka Producer needs to know how to turn Java/Scala objects into 
binary data.
+The `KafkaSerializationSchema` allows users to specify such a schema.
+The `ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long 
timestamp)` method gets called for each record, generating a `ProducerRecord` 
that is written to Kafka.
 
-#### Kafka 0.10
+The gives users fine-grained control over how data is written out to Kafka. 
+Through the producer record you can:
+* Set header values
+* Define keys for each record
+* Specify custom partitioning of data
 
-With Flink's checkpointing enabled, the `FlinkKafkaProducer010`
-can provide at-least-once delivery guarantees.
-
-Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` 
appropriately.
-
- * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
- Enabling this will let the producer only log failures
- instead of catching and rethrowing them. This essentially accounts the record
- to have succeeded, even if it was never written to the target Kafka topic. 
This
- must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
- With this enabled, Flink's checkpoints will wait for any
- on-the-fly records at the time of the checkpoint to be acknowledged by Kafka 
before
- succeeding the checkpoint. This ensures that all records before the 
checkpoint have
- been written to Kafka. This must be enabled for at-least-once.
- 
-In conclusion, the Kafka producer by default has at-least-once guarantees for 
versions
-0.10, with `setLogFailureOnly` set to `false` and `setFlushOnCheckpoint` set
-to `true`.
-
-**Note**: By default, the number of retries is set to "0". This means that 
when `setLogFailuresOnly` is set to `false`,
-the producer fails immediately on errors, including leader changes. The value 
is set to "0" by default to avoid
-duplicate messages in the target topic that are caused by retries. For most 
production environments with frequent broker changes,
-we recommend setting the number of retries to a higher value.
-
-**Note**: There is currently no transactional producer for Kafka, so Flink can 
not guarantee exactly-once delivery
-into a Kafka topic.
-
-#### Kafka 0.11 and newer
+### Kafka Producers and Fault Tolerance
 
+<div class="codetabs" markdown="1">
+<div data-lang="Universal and 011" markdown="1">
 With Flink's checkpointing enabled, the `FlinkKafkaProducer011` 
(`FlinkKafkaProducer` for Kafka >= 1.0.0 versions) can provide
 exactly-once delivery guarantees.
 
@@ -643,9 +476,8 @@ chosen by passing appropriate `semantic` parameter to the 
`FlinkKafkaProducer011
 
  * `Semantic.NONE`: Flink will not guarantee anything. Produced records can be 
lost or they can
  be duplicated.
- * `Semantic.AT_LEAST_ONCE` (default setting): similar to 
`setFlushOnCheckpoint(true)` in
- `FlinkKafkaProducer010`. This guarantees that no records will be lost 
(although they can be duplicated).
- * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once 
semantic. Whenever you write
+ * `Semantic.AT_LEAST_ONCE` (default setting): This guarantees that no records 
will be lost (although they can be duplicated).
+ * `Semantic.EXACTLY_ONCE`: Kafka transactions will be used to provide 
exactly-once semantic. Whenever you write
  to Kafka using transactions, do not forget about setting desired 
`isolation.level` (`read_committed`
  or `read_uncommitted` - the latter one is the default value) for any 
application consuming records
  from Kafka.
@@ -695,54 +527,53 @@ event of failure of Flink application before first 
checkpoint, after restarting
 is no information in the system about previous pool sizes. Thus it is unsafe 
to scale down Flink
 application before first checkpoint completes, by factor larger than 
`FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR`.
 
-## Using Kafka timestamps and Flink event time in Kafka 0.10
-
-Since Apache Kafka 0.10+, Kafka's messages can carry
-[timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message),
 indicating
-the time the event has occurred (see ["event time" in Apache 
Flink](../event_time.html)) or the time when the message
-has been written to the Kafka broker.
-
-The `FlinkKafkaConsumer010` will emit records with the timestamp attached, if 
the time characteristic in Flink is 
-set to `TimeCharacteristic.EventTime` 
(`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
-
-The Kafka consumer does not emit watermarks. To emit watermarks, the same 
mechanisms as described above in 
-"Kafka Consumers and Timestamp Extraction/Watermark Emission"  using the 
`assignTimestampsAndWatermarks` method are applicable.
-
-There is no need to define a timestamp extractor when using the timestamps 
from Kafka. The `previousElementTimestamp` argument of 
-the `extractTimestamp()` method contains the timestamp carried by the Kafka 
message.
-
-A timestamp extractor for a Kafka consumer would look like this:
-{% highlight java %}
-public long extractTimestamp(Long element, long previousElementTimestamp) {
-    return previousElementTimestamp;
-}
-{% endhighlight %}
-
+</div>
+<div data-lang="010" markdown="1">
+With Flink's checkpointing enabled, the `FlinkKafkaProducer010`
+can provide at-least-once delivery guarantees.
 
+Besides enabling Flink's checkpointing, you should also configure the setter
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` 
appropriately.
 
-The `FlinkKafkaProducer010` only emits the record timestamp, if 
`setWriteTimestampToKafka(true)` is set.
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
+ Enabling this will let the producer only log failures
+ instead of catching and rethrowing them. This essentially accounts the record
+ to have succeeded, even if it was never written to the target Kafka topic. 
This
+ must be disabled for at-least-once.
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
+ With this enabled, Flink's checkpoints will wait for any
+ on-the-fly records at the time of the checkpoint to be acknowledged by Kafka 
before
+ succeeding the checkpoint. This ensures that all records before the 
checkpoint have
+ been written to Kafka. This must be enabled for at-least-once.
+ 
+In conclusion, the Kafka producer by default has at-least-once guarantees for 
versions
+0.10, with `setLogFailureOnly` set to `false` and `setFlushOnCheckpoint` set
+to `true`.
 
-{% highlight java %}
-FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, 
new SimpleStringSchema(), standardProps);
-config.setWriteTimestampToKafka(true);
-{% endhighlight %}
+**Note**: By default, the number of retries is set to "0". This means that 
when `setLogFailuresOnly` is set to `false`,
+the producer fails immediately on errors, including leader changes. The value 
is set to "0" by default to avoid
+duplicate messages in the target topic that are caused by retries. For most 
production environments with frequent broker changes,
+we recommend setting the number of retries to a higher value.
 
+**Note**: There is currently no transactional producer for Kafka, so Flink can 
not guarantee exactly-once delivery
+into a Kafka topic.
 
+</div>
+</div>
 
-## Kafka Connector metrics
+## Kafka Connector Metrics
 
 Flink's Kafka connectors provide some metrics through Flink's [metrics 
system]({{ site.baseurl }}/monitoring/metrics.html) to analyze
 the behavior of the connector.
-The producers export Kafka's internal metrics through Flink's metric system 
for all supported versions. The consumers export 
-all metrics starting from Kafka version 0.10. The Kafka documentation lists 
all exported metrics 
-in its 
[documentation](http://kafka.apache.org/documentation/#selector_monitoring).
+The producers export Kafka's internal metrics through Flink's metric system 
for all supported versions.
+The Kafka documentation lists all exported metrics in its 
[documentation](http://kafka.apache.org/documentation/#selector_monitoring).
 
 In addition to these metrics, all consumers expose the `current-offsets` and 
`committed-offsets` for each topic partition.
 The `current-offsets` refers to the current offset in the partition. This 
refers to the offset of the last element that
 we retrieved and emitted successfully. The `committed-offsets` is the last 
committed offset.
 
-The Kafka Consumers in Flink commit the offsets back to the Kafka brokers 
(Kafka 0.10+). If checkpointing
-is disabled, offsets are committed periodically.
+The Kafka Consumers in Flink commit the offsets back to the Kafka brokers.
+If checkpointing is disabled, offsets are committed periodically.
 With checkpointing, the commit happens once all operators in the streaming 
topology have confirmed that they've created a checkpoint of their state. 
 This provides users with at-least-once semantics for the offsets committed to 
Zookeeper or the broker. For offsets checkpointed to Flink, the system 
 provides exactly once guarantees.
@@ -776,6 +607,15 @@ A mismatch in service name between client and server 
configuration will cause th
 For more information on Flink configuration for Kerberos security, please see 
[here]({{ site.baseurl}}/ops/config.html).
 You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further 
details on how Flink internally setups Kerberos-based security.
 
+## Migrating Kafka Connector from 0.11 to universal
+
+In order to perform the migration, see the [upgrading jobs and Flink versions 
guide]({{ site.baseurl }}/ops/upgrading.html)
+and:
+* Use Flink 1.9 or newer for the whole process.
+* Do not upgrade Flink and user operators at the same time.
+* Make sure that Kafka Consumer and/or Kafka Producer used in your job have 
assigned unique identifiers (`uid`):
+* Use stop with savepoint feature to take the savepoint (for example by using 
`stop --withSavepoint`)[CLI command]({{ site.baseurl }}/ops/cli.html).
+
 ## Troubleshooting
 
 <div class="alert alert-warning">

Reply via email to