This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f7fa02f [FLINK-17076][docs] Revamp Kafka Connector Documentation
f7fa02f is described below
commit f7fa02fdb450af9d11c92bd90751fa9161add47a
Author: Seth Wiesman <[email protected]>
AuthorDate: Tue May 19 15:08:13 2020 -0500
[FLINK-17076][docs] Revamp Kafka Connector Documentation
This closes #12257
---
docs/dev/connectors/kafka.md | 488 +++++++++++++++----------------------------
1 file changed, 164 insertions(+), 324 deletions(-)
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index d084808..6f21ff7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -23,125 +23,59 @@ specific language governing permissions and limitations
under the License.
-->
+Flink provides an [Apache Kafka](https://kafka.apache.org) connector for
reading data from and writing data to Kafka topics with exactly-once guarantees.
+
* This will be replaced by the TOC
{:toc}
-This connector provides access to event streams served by [Apache
Kafka](https://kafka.apache.org/).
-
-Flink provides special Kafka Connectors for reading and writing data from/to
Kafka topics.
-The Flink Kafka Consumer integrates with Flink's checkpointing mechanism to
provide
-exactly-once processing semantics. To achieve that, Flink does not purely rely
on Kafka's consumer group
-offset tracking, but tracks and checkpoints these offsets internally as well.
-
-Please pick a package (maven artifact id) and class name for your use-case and
environment.
-For most users, the `FlinkKafkaConsumer010` (part of `flink-connector-kafka`)
is appropriate.
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left">Maven Dependency</th>
- <th class="text-left">Supported since</th>
- <th class="text-left">Consumer and <br>
- Producer Class name</th>
- <th class="text-left">Kafka version</th>
- <th class="text-left">Notes</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</td>
- <td>1.2.0</td>
- <td>FlinkKafkaConsumer010<br>
- FlinkKafkaProducer010</td>
- <td>0.10.x</td>
- <td>This connector supports <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka
messages with timestamps</a> both for producing and consuming.</td>
- </tr>
- <tr>
- <td>flink-connector-kafka-0.11{{ site.scala_version_suffix }}</td>
- <td>1.4.0</td>
- <td>FlinkKafkaConsumer011<br>
- FlinkKafkaProducer011</td>
- <td>0.11.x</td>
- <td>Since 0.11.x Kafka does not support scala 2.10. This connector
supports <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging">Kafka
transactional messaging</a> to provide exactly once semantic for the
producer.</td>
- </tr>
- <tr>
- <td>flink-connector-kafka{{ site.scala_version_suffix }}</td>
- <td>1.7.0</td>
- <td>FlinkKafkaConsumer<br>
- FlinkKafkaProducer</td>
- <td>>= 1.0.0</td>
- <td>
- This universal Kafka connector attempts to track the latest version of
the Kafka client.
- The version of the client it uses may change between Flink releases.
Starting with Flink 1.9 release, it uses the Kafka 2.2.0 client.
- Modern Kafka clients are backwards compatible with broker versions
0.10.0 or later.
- However for Kafka 0.11.x and 0.10.x versions, we recommend using
dedicated
- flink-connector-kafka-0.11{{ site.scala_version_suffix }} and
flink-connector-kafka-0.10{{ site.scala_version_suffix }} respectively.
- </td>
- </tr>
- </tbody>
-</table>
-
-Then, import the connector in your maven project:
+## Dependency
+
+Apache Flink ships with multiple Kafka connectors: universal, 0.10, and 0.11.
+The universal Kafka connector attempts to track the latest version of the
Kafka client.
+The version of the client it uses may change between Flink releases.
+Modern Kafka clients are backwards compatible with broker versions 0.10.0 or
later.
+For most users the universal Kafka connector is the most appropriate.
+However, for Kafka versions 0.11.x and 0.10.x, we recommend using the
dedicated ``0.11`` and ``0.10`` connectors, respectively.
+For details on Kafka compatibility, please refer to the official [Kafka
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
+<div class="codetabs" markdown="1">
+<div data-lang="universal" markdown="1">
{% highlight xml %}
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
- <version>{{ site.version }}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka{{ site.scala_version_suffix
}}</artifactId>
+ <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+<div data-lang="011" markdown="1">
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-011{{ site.scala_version_suffix
}}</artifactId>
+ <version>{{ site.version }}</version>
</dependency>
{% endhighlight %}
-
-Note that the streaming connectors are currently not part of the binary
distribution.
-See how to link with them for cluster execution [here]({{
site.baseurl}}/dev/projectsetup/dependencies.html).
-
-## Installing Apache Kafka
-
-* Follow the instructions from [Kafka's
quickstart](https://kafka.apache.org/documentation.html#quickstart) to download
the code and launch a server (launching a Zookeeper and a Kafka server is
required every time before starting the application).
-* If the Kafka and Zookeeper servers are running on a remote machine, then the
`advertised.host.name` setting in the `config/server.properties` file must be
set to the machine's IP address.
-
-## Kafka 1.0.0+ Connector
-
-Starting with Flink 1.7, there is a new universal Kafka connector that does
not track a specific Kafka major version.
-Rather, it tracks the latest version of Kafka at the time of the Flink release.
-
-If your Kafka broker version is 1.0.0 or newer, you should use this Kafka
connector.
-If you use an older version of Kafka (0.11 or 0.10), you should use the
connector corresponding to the broker version.
-
-### Compatibility
-
-The universal Kafka connector is compatible with older and newer Kafka brokers
through the compatibility guarantees of the Kafka client API and broker.
-It is compatible with broker versions 0.11.0 or newer, depending on the
features used.
-For details on Kafka compatibility, please refer to the [Kafka
documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
-
-### Migrating Kafka Connector from 0.11 to universal
-
-In order to perform the migration, see the [upgrading jobs and Flink versions
guide]({{ site.baseurl }}/ops/upgrading.html)
-and:
-* Use Flink 1.9 or newer for the whole process.
-* Do not upgrade the Flink and operators at the same time.
-* Make sure that Kafka Consumer and/or Kafka Producer used in your job have
assigned unique identifiers (`uid`):
-* Use stop with savepoint feature to take the savepoint (for example by using
`stop --withSavepoint`)[CLI command]({{ site.baseurl }}/ops/cli.html).
-
-### Usage
-
-To use the universal Kafka connector add a dependency to it:
-
+</div>
+<div data-lang="010" markdown="1">
{% highlight xml %}
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
- <version>{{ site.version }}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-010{{ site.scala_version_suffix
}}</artifactId>
+ <version>{{ site.version }}</version>
</dependency>
{% endhighlight %}
+<span class="label label-danger">Attention</span> The ``0.10`` sink does not
support exactly-once writes to Kafka.
+</div>
+</div>
-Then instantiate the new source (`FlinkKafkaConsumer`) and sink
(`FlinkKafkaProducer`).
-The API is backward compatible with the Kafka 0.11 connector,
-except of dropping specific Kafka version from the module and class names.
+Flink's streaming connectors are not currently part of the binary distribution.
+See how to link with them for cluster execution [here]({{
site.baseurl}}/dev/projectsetup/dependencies.html).
## Kafka Consumer
-Flink's Kafka consumer is called `FlinkKafkaConsumer010` (or 011 for Kafka
0.11.0.x versions, etc.
-or just `FlinkKafkaConsumer` for Kafka >= 1.0.0 versions). It provides access
to one or more Kafka topics.
+Flink's Kafka consumer, `FlinkKafkaConsumer` (or `FlinkKafkaConsumer011` for Kafka 0.11.x,
+or `FlinkKafkaConsumer010` for Kafka 0.10.x), provides access to one or more Kafka topics.
The constructor accepts the following arguments:
@@ -152,8 +86,6 @@ The constructor accepts the following arguments:
- "bootstrap.servers" (comma separated list of Kafka brokers)
- "group.id" the id of the consumer group
-Example:
-
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
@@ -161,7 +93,7 @@ Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
- .addSource(new FlinkKafkaConsumer010<>("topic", new
SimpleStringSchema(), properties));
+ .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(),
properties));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
@@ -170,26 +102,17 @@ val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
stream = env
- .addSource(new FlinkKafkaConsumer010[String]("topic", new
SimpleStringSchema(), properties))
- .print()
+ .addSource(new FlinkKafkaConsumer[String]("topic", new
SimpleStringSchema(), properties))
{% endhighlight %}
</div>
</div>
### The `DeserializationSchema`
-The Flink Kafka Consumer needs to know how to turn the binary data in Kafka
into Java/Scala objects. The
-`DeserializationSchema` allows users to specify such a schema. The `T
deserialize(byte[] message)`
-method gets called for each Kafka message, passing the value from Kafka.
-
-It is usually helpful to start from the `AbstractDeserializationSchema`, which
takes care of describing the
-produced Java/Scala type to Flink's type system. Users that implement a
vanilla `DeserializationSchema` need
-to implement the `getProducedType(...)` method themselves.
+The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects.
+The `KafkaDeserializationSchema` allows users to specify such a schema. The `T deserialize(ConsumerRecord<byte[], byte[]> record)` method gets called for each Kafka message, passing the full Kafka record (key, value, and metadata).
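+
+As a rough sketch, a custom `KafkaDeserializationSchema` that extracts the record value as a string could look as follows (the class name is only illustrative; corrupted records are skipped by returning `null`, as described below):
+
+{% highlight java %}
+import java.nio.charset.StandardCharsets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class StringValueDeserializationSchema implements KafkaDeserializationSchema<String> {
+
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        return false; // the Kafka stream is unbounded
+    }
+
+    @Override
+    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
+        if (record.value() == null) {
+            return null; // returning null skips the record
+        }
+        return new String(record.value(), StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return Types.STRING;
+    }
+}
+{% endhighlight %}
+
+Such a schema can then be passed to the `FlinkKafkaConsumer` constructor in place of `SimpleStringSchema` in the examples above.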
-For accessing the key, value and metadata of the Kafka message, the
`KafkaDeserializationSchema` has
-the following deserialize method `T deserialize(ConsumerRecord<byte[], byte[]>
record)`.
-
-For convenience, Flink provides the following schemas:
+For convenience, Flink provides the following schemas out of the box:
1. `TypeInformationSerializationSchema` (and
`TypeInformationKeyValueSerializationSchema`) which creates
a schema based on a Flink's `TypeInformation`. This is useful if the data
is both written and read by Flink.
@@ -216,46 +139,37 @@ For convenience, Flink provides the following schemas:
<div data-lang="AvroDeserializationSchema" markdown="1">
{% highlight xml %}
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>{{site.version }}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>{{site.version }}</version>
</dependency>
{% endhighlight %}
</div>
<div data-lang="ConfluentRegistryAvroDeserializationSchema" markdown="1">
{% highlight xml %}
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro-confluent-registry</artifactId>
- <version>{{site.version }}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro-confluent-registry</artifactId>
+ <version>{{site.version }}</version>
</dependency>
{% endhighlight %}
</div>
</div>
-When encountering a corrupted message that cannot be deserialized for any
reason, there
-are two options - either throwing an exception from the `deserialize(...)`
method
-which will cause the job to fail and be restarted, or returning `null` to allow
-the Flink Kafka consumer to silently skip the corrupted message. Note that
-due to the consumer's fault tolerance (see below sections for more details),
-failing the job on the corrupted message will let the consumer attempt
-to deserialize the message again. Therefore, if deserialization still fails,
the
-consumer will fall into a non-stop restart and fail loop on that corrupted
-message.
+When encountering a corrupted message that cannot be deserialized for any reason, the deserialization schema should return `null`, which will result in the record being skipped.
+If the job is instead failed on the corrupted message (for example, by throwing an exception from `deserialize(...)`), the consumer's fault tolerance (see below sections for more details) will cause it to attempt to deserialize the message again.
+Therefore, if deserialization still fails, the consumer will fall into a non-stop restart and fail loop on that corrupted message.
### Kafka Consumers Start Position Configuration
-The Flink Kafka Consumer allows configuring how the start position for Kafka
-partitions are determined.
-
-Example:
+The Flink Kafka Consumer allows configuring how the start positions for Kafka
partitions are determined.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(...);
+FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record
possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp
(milliseconds)
@@ -269,7 +183,7 @@ DataStream<String> stream = env.addSource(myConsumer);
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment()
-val myConsumer = new FlinkKafkaConsumer010[String](...)
+val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record
possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp
(milliseconds)
@@ -339,34 +253,14 @@ fault tolerance for the consumer).
### Kafka Consumers and Fault Tolerance
With Flink's checkpointing enabled, the Flink Kafka Consumer will consume
records from a topic and periodically checkpoint all
-its Kafka offsets, together with the state of other operations, in a
consistent manner. In case of a job failure, Flink will restore
+its Kafka offsets, together with the state of other operations. In case of a
job failure, Flink will restore
the streaming program to the state of the latest checkpoint and re-consume the
records from Kafka, starting from the offsets that were
stored in the checkpoint.
The interval of drawing checkpoints therefore defines how much the program may
have to go back at most, in case of a failure.
+To use fault-tolerant Kafka Consumers, checkpointing of the topology needs to be enabled in the [job configuration]({{ site.baseurl }}/ops/config.html#execution-checkpointing-interval).
-To use fault tolerant Kafka Consumers, checkpointing of the topology needs to
be enabled at the execution environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-{% endhighlight %}
-</div>
-</div>
-
-Also note that Flink can only restart the topology if enough processing slots
are available to restart the topology.
-So if the topology fails due to loss of a TaskManager, there must still be
enough slots available afterwards.
-Flink on YARN supports automatic restart of lost YARN containers.
-
-If checkpointing is not enabled, the Kafka consumer will periodically commit
the offsets to Zookeeper.
+If checkpointing is disabled, the Kafka consumer will periodically commit the
offsets to Zookeeper.
### Kafka Consumers Topic and Partition Discovery
@@ -380,15 +274,9 @@ By default, partition discovery is disabled. To enable it,
set a non-negative va
for `flink.partition-discovery.interval-millis` in the provided properties
config,
representing the discovery interval in milliseconds.
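+
+For example, the discovery interval could be configured as follows (the 10-second value is only illustrative):
+
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+properties.setProperty("group.id", "test");
+// check for newly created partitions every 10 seconds
+properties.setProperty("flink.partition-discovery.interval-millis", "10000");
+{% endhighlight %}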
-<span class="label label-danger">Limitation</span> When the consumer is
restored from a savepoint from Flink versions
-prior to Flink 1.3.x, partition discovery cannot be enabled on the restore
run. If enabled, the restore would fail
-with an exception. In this case, in order to use partition discovery, please
first take a savepoint in Flink 1.3.x and
-then restore again from that.
-
#### Topic discovery
-At a higher-level, the Flink Kafka Consumer is also capable of discovering
topics, based on pattern matching on the
-topic names using regular expressions. See the below for an example:
+The Kafka Consumer is also capable of discovering topics by matching topic
names using regular expressions.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -399,7 +287,7 @@ Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
-FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
+FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
@@ -416,7 +304,7 @@ val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
-val myConsumer = new FlinkKafkaConsumer010[String](
+val myConsumer = new FlinkKafkaConsumer[String](
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema,
properties)
@@ -445,7 +333,7 @@ tolerance guarantees. The committed offsets are only a
means to expose
the consumer's progress for monitoring purposes.
The way to configure offset commit behaviour is different, depending on
-whether or not checkpointing is enabled for the job.
+whether checkpointing is enabled for the job.
- *Checkpointing disabled:* if checkpointing is disabled, the Flink Kafka
Consumer relies on the automatic periodic offset committing capability
@@ -465,15 +353,14 @@ whether or not checkpointing is enabled for the job.
### Kafka Consumers and Timestamp Extraction/Watermark Emission
-In many scenarios, the timestamp of a record is embedded (explicitly or
implicitly) in the record itself.
-In addition, the user may want to emit watermarks either periodically, or in
an irregular fashion, e.g. based on
+In many scenarios, the timestamp of a record is embedded in the record itself, or in the metadata of the `ConsumerRecord`.
+In addition, users may want to emit watermarks either periodically, or
irregularly, e.g. based on
special records in the Kafka stream that contain the current event-time
watermark. For these cases, the Flink Kafka
-Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an
`AssignerWithPunctuatedWatermarks`.
+Consumer allows the specification of a [watermark strategy]({% link
dev/event_time.md %}).
-You can specify your custom timestamp extractor/watermark emitter as described
-[here]({{ site.baseurl }}/dev/event_timestamps_watermarks.html), or use one
from the
-[predefined ones]({{ site.baseurl }}/dev/event_timestamp_extractors.html).
After doing so, you
-can pass it to your consumer in the following way:
+You can specify your custom strategy as described
+[here]({% link dev/event_timestamps_watermarks.md %}), or use one from the
+[predefined ones]({% link dev/event_timestamp_extractors.md %}).
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -482,13 +369,14 @@ Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
-FlinkKafkaConsumer010<String> myConsumer =
- new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties);
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+FlinkKafkaConsumer<String> myConsumer =
+ new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(
+ WatermarkStrategies
+ .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+ .build());
-DataStream<String> stream = env
- .addSource(myConsumer)
- .print();
+DataStream<String> stream = env.addSource(myConsumer);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
@@ -497,51 +385,51 @@ val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
-val myConsumer = new FlinkKafkaConsumer010[String]("topic", new
SimpleStringSchema(), properties)
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
-stream = env
- .addSource(myConsumer)
- .print()
+val myConsumer =
+ new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)
+myConsumer.assignTimestampsAndWatermarks(
+ WatermarkStrategies
+ .forBoundedOutOfOrderness[String](Duration.ofSeconds(20))
+ .build())
+
+val stream = env.addSource(myConsumer)
{% endhighlight %}
</div>
</div>
-Internally, an instance of the assigner is executed per Kafka partition.
-When such an assigner is specified, for each record read from Kafka, the
-`extractTimestamp(T element, long previousElementTimestamp)` is called to
assign a timestamp to the record and
-the `Watermark getCurrentWatermark()` (for periodic) or the
-`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)`
(for punctuated) is called to determine
-if a new watermark should be emitted and with which timestamp.
**Note**: If a watermark assigner depends on records read from Kafka to
advance its watermarks
(which is commonly the case), all topics and partitions need to have a
continuous stream of records.
Otherwise, the watermarks of the whole application cannot advance and all
time-based operations,
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
-A Flink improvement is planned to prevent this from happening
-(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should
consider idle partitions](
-https://issues.apache.org/jira/browse/FLINK-5479)).
-In the meanwhile, a possible workaround is to send *heartbeat messages* to all
consumed partitions that advance the watermarks of idle partitions.
-
+Consider setting appropriate [idleness timeouts]({{ site.baseurl }}/dev/event_timestamps_watermarks.html#dealing-with-idle-sources) to mitigate this issue.
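+A possible configuration, based on the watermark strategy from the example above, is sketched below (note: the `withIdleness(...)` call on the builder is an assumption here; see the linked section for the authoritative API):
+
+{% highlight java %}
+myConsumer.assignTimestampsAndWatermarks(
+    WatermarkStrategies
+        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        // assumption: mark a partition as idle after 1 minute without records,
+        // so that it does not hold back the application's watermark
+        .withIdleness(Duration.ofMinutes(1))
+        .build());
+{% endhighlight %}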
+
## Kafka Producer
-Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka
0.10.0.x versions, etc. or just `FlinkKafkaProducer` for Kafka >= 1.0.0
versions).
-It allows writing a stream of records to one or more Kafka topics.
+Flink’s Kafka Producer, `FlinkKafkaProducer` (or `FlinkKafkaProducer010` for Kafka 0.10.x versions
+or `FlinkKafkaProducer011` for Kafka 0.11.x versions), allows writing a stream of records to one or more Kafka topics.
+
+The constructor accepts the following arguments:
-Example:
+1. A default output topic where events should be written
+2. A `SerializationSchema` / `KafkaSerializationSchema` for serializing data into Kafka
+3. Properties for the Kafka client. The following properties are required:
+ * "bootstrap.servers" (comma separated list of Kafka brokers)
+4. A fault-tolerance semantic
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-DataStream<String> stream = ...;
+DataStream<String> stream = ...
-FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
- "localhost:9092", // broker list
- "my-topic", // target topic
- new SimpleStringSchema()); // serialization schema
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
-// versions 0.10+ allow attaching the records' event timestamp when writing
them to Kafka;
-// this method is not available for earlier Kafka versions
-myProducer.setWriteTimestampToKafka(true);
+FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
+ "my-topic", // target topic
+ new SimpleStringSchema(), // serialization schema
+ properties, // producer config
+ FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addSink(myProducer);
{% endhighlight %}
@@ -550,91 +438,36 @@ stream.addSink(myProducer);
{% highlight scala %}
val stream: DataStream[String] = ...
-val myProducer = new FlinkKafkaProducer011[String](
- "localhost:9092", // broker list
- "my-topic", // target topic
- new SimpleStringSchema) // serialization schema
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
-// versions 0.10+ allow attaching the records' event timestamp when writing
them to Kafka;
-// this method is not available for earlier Kafka versions
-myProducer.setWriteTimestampToKafka(true)
+val myProducer = new FlinkKafkaProducer[String](
+ "my-topic", // target topic
+ new SimpleStringSchema(), // serialization schema
+ properties, // producer config
+ FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance
stream.addSink(myProducer)
{% endhighlight %}
</div>
</div>
-The above examples demonstrate the basic usage of creating a Flink Kafka
Producer
-to write streams to a single Kafka target topic. For more advanced usages,
there
-are other constructor variants that allow providing the following:
-
- * *Providing custom properties*:
- The producer allows providing a custom properties configuration for the
internal `KafkaProducer`.
- Please refer to the [Apache Kafka
documentation](https://kafka.apache.org/documentation.html) for
- details on how to configure Kafka Producers.
- * *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to
the
- constructor. This partitioner will be called for each record in the stream
- to determine which exact partition of the target topic the record should be
sent to.
- Please see [Kafka Producer Partitioning
Scheme](#kafka-producer-partitioning-scheme) for more details.
- * *Advanced serialization schema*: Similar to the consumer,
- the producer also allows using an advanced serialization schema called
`KeyedSerializationSchema`,
- which allows serializing the key and value separately. It also allows to
override the target topic,
- so that one producer instance can send data to multiple topics.
-
-### Kafka Producer Partitioning Scheme
-
-By default, if a custom partitioner is not specified for the Flink Kafka
Producer, the producer will use
-a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask
to a single Kafka partition
-(i.e., all records received by a sink subtask will end up in the same Kafka
partition).
-
-A custom partitioner can be implemented by extending the
`FlinkKafkaPartitioner` class. All
-Kafka versions' constructors allow providing a custom partitioner when
instantiating the producer.
-Note that the partitioner implementation must be serializable, as they will be
transferred across Flink nodes.
-Also, keep in mind that any state in the partitioner will be lost on job
failures since the partitioner
-is not part of the producer's checkpointed state.
-
-It is also possible to completely avoid using and kind of partitioner, and
simply let Kafka partition
-the written records by their attached key (as determined for each record using
the provided serialization schema).
-To do this, provide a `null` custom partitioner when instantiating the
producer. It is important
-to provide `null` as the custom partitioner; as explained above, if a custom
partitioner is not specified
-the `FlinkFixedPartitioner` is used instead.
+### The `SerializationSchema`
-### Kafka Producers and Fault Tolerance
+The Flink Kafka Producer needs to know how to turn Java/Scala objects into
binary data.
+The `KafkaSerializationSchema` allows users to specify such a schema.
+The `ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long
timestamp)` method gets called for each record, generating a `ProducerRecord`
that is written to Kafka.
-#### Kafka 0.10
+This gives users fine-grained control over how data is written out to Kafka (a sketch of a custom schema follows the list below).
+Through the producer record you can:
+* Set header values
+* Define keys for each record
+* Specify custom partitioning of data
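+
+A rough sketch of such a schema, which writes each string element as both the key and the value of the produced record, is shown below (the class name and constructor are only illustrative):
+
+{% highlight java %}
+import java.nio.charset.StandardCharsets;
+
+import javax.annotation.Nullable;
+
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class StringKeyValueSerializationSchema implements KafkaSerializationSchema<String> {
+
+    private final String topic;
+
+    public StringKeyValueSerializationSchema(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
+        byte[] payload = element.getBytes(StandardCharsets.UTF_8);
+        // topic, partition (null lets the partitioner decide), timestamp, key, value
+        return new ProducerRecord<>(topic, null, timestamp, payload, payload);
+    }
+}
+{% endhighlight %}
+
+Such a schema can then be passed to the `FlinkKafkaProducer` constructor, together with the default topic, the producer properties, and the fault-tolerance semantic.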
-With Flink's checkpointing enabled, the `FlinkKafkaProducer010`
-can provide at-least-once delivery guarantees.
-
-Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)`
appropriately.
-
- * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
- Enabling this will let the producer only log failures
- instead of catching and rethrowing them. This essentially accounts the record
- to have succeeded, even if it was never written to the target Kafka topic.
This
- must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
- With this enabled, Flink's checkpoints will wait for any
- on-the-fly records at the time of the checkpoint to be acknowledged by Kafka
before
- succeeding the checkpoint. This ensures that all records before the
checkpoint have
- been written to Kafka. This must be enabled for at-least-once.
-
-In conclusion, the Kafka producer by default has at-least-once guarantees for
versions
-0.10, with `setLogFailureOnly` set to `false` and `setFlushOnCheckpoint` set
-to `true`.
-
-**Note**: By default, the number of retries is set to "0". This means that
when `setLogFailuresOnly` is set to `false`,
-the producer fails immediately on errors, including leader changes. The value
is set to "0" by default to avoid
-duplicate messages in the target topic that are caused by retries. For most
production environments with frequent broker changes,
-we recommend setting the number of retries to a higher value.
-
-**Note**: There is currently no transactional producer for Kafka, so Flink can
not guarantee exactly-once delivery
-into a Kafka topic.
-
-#### Kafka 0.11 and newer
+### Kafka Producers and Fault Tolerance
+<div class="codetabs" markdown="1">
+<div data-lang="Universal and 011" markdown="1">
With Flink's checkpointing enabled, the `FlinkKafkaProducer011`
(`FlinkKafkaProducer` for Kafka >= 1.0.0 versions) can provide
exactly-once delivery guarantees.
@@ -643,9 +476,8 @@ chosen by passing appropriate `semantic` parameter to the
`FlinkKafkaProducer011
* `Semantic.NONE`: Flink will not guarantee anything. Produced records can be
lost or they can
be duplicated.
- * `Semantic.AT_LEAST_ONCE` (default setting): similar to
`setFlushOnCheckpoint(true)` in
- `FlinkKafkaProducer010`. This guarantees that no records will be lost
(although they can be duplicated).
- * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once
semantic. Whenever you write
+ * `Semantic.AT_LEAST_ONCE` (default setting): This guarantees that no records
will be lost (although they can be duplicated).
+ * `Semantic.EXACTLY_ONCE`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write
to Kafka using transactions, do not forget about setting desired
`isolation.level` (`read_committed`
or `read_uncommitted` - the latter one is the default value) for any
application consuming records
from Kafka.
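+
+For example, a downstream Flink consumer that should only see committed records could be configured as sketched below (reusing the property style from the consumer examples above):
+
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+properties.setProperty("group.id", "test");
+// only read records from committed Kafka transactions
+properties.setProperty("isolation.level", "read_committed");
+
+FlinkKafkaConsumer<String> myConsumer =
+    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
+{% endhighlight %}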
@@ -695,54 +527,53 @@ event of failure of Flink application before first
checkpoint, after restarting
is no information in the system about previous pool sizes. Thus it is unsafe
to scale down Flink
application before first checkpoint completes, by factor larger than
`FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR`.
-## Using Kafka timestamps and Flink event time in Kafka 0.10
-
-Since Apache Kafka 0.10+, Kafka's messages can carry
-[timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message),
indicating
-the time the event has occurred (see ["event time" in Apache
Flink](../event_time.html)) or the time when the message
-has been written to the Kafka broker.
-
-The `FlinkKafkaConsumer010` will emit records with the timestamp attached, if
the time characteristic in Flink is
-set to `TimeCharacteristic.EventTime`
(`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
-
-The Kafka consumer does not emit watermarks. To emit watermarks, the same
mechanisms as described above in
-"Kafka Consumers and Timestamp Extraction/Watermark Emission" using the
`assignTimestampsAndWatermarks` method are applicable.
-
-There is no need to define a timestamp extractor when using the timestamps
from Kafka. The `previousElementTimestamp` argument of
-the `extractTimestamp()` method contains the timestamp carried by the Kafka
message.
-
-A timestamp extractor for a Kafka consumer would look like this:
-{% highlight java %}
-public long extractTimestamp(Long element, long previousElementTimestamp) {
- return previousElementTimestamp;
-}
-{% endhighlight %}
-
+</div>
+<div data-lang="010" markdown="1">
+With Flink's checkpointing enabled, the `FlinkKafkaProducer010`
+can provide at-least-once delivery guarantees.
+Besides enabling Flink's checkpointing, you should also configure the setter
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)`
appropriately.
-The `FlinkKafkaProducer010` only emits the record timestamp, if
`setWriteTimestampToKafka(true)` is set.
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
+ Enabling this will let the producer only log failures
+ instead of catching and rethrowing them. This essentially treats the record
+ as having succeeded, even if it was never written to the target Kafka topic.
+ This must be disabled for at-least-once.
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
+ With this enabled, Flink's checkpoints will wait for any
+ on-the-fly records at the time of the checkpoint to be acknowledged by Kafka
before
+ succeeding the checkpoint. This ensures that all records before the
checkpoint have
+ been written to Kafka. This must be enabled for at-least-once.
+
+In conclusion, the Kafka producer by default has at-least-once guarantees for
versions
+0.10, with `setLogFailuresOnly` set to `false` and `setFlushOnCheckpoint` set
+to `true`.
-{% highlight java %}
-FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic,
new SimpleStringSchema(), standardProps);
-config.setWriteTimestampToKafka(true);
-{% endhighlight %}
+**Note**: By default, the number of retries is set to "0". This means that
when `setLogFailuresOnly` is set to `false`,
+the producer fails immediately on errors, including leader changes. The value
is set to "0" by default to avoid
+duplicate messages in the target topic that are caused by retries. For most
production environments with frequent broker changes,
+we recommend setting the number of retries to a higher value.
+**Note**: There is currently no transactional producer for Kafka 0.10, so Flink cannot guarantee exactly-once delivery
+into a Kafka 0.10 topic.
+</div>
+</div>
-## Kafka Connector metrics
+## Kafka Connector Metrics
Flink's Kafka connectors provide some metrics through Flink's [metrics
system]({{ site.baseurl }}/monitoring/metrics.html) to analyze
the behavior of the connector.
-The producers export Kafka's internal metrics through Flink's metric system
for all supported versions. The consumers export
-all metrics starting from Kafka version 0.10. The Kafka documentation lists
all exported metrics
-in its
[documentation](http://kafka.apache.org/documentation/#selector_monitoring).
+The producers export Kafka's internal metrics through Flink's metric system
for all supported versions.
+All exported metrics are listed in the Kafka [documentation](http://kafka.apache.org/documentation/#selector_monitoring).
In addition to these metrics, all consumers expose the `current-offsets` and
`committed-offsets` for each topic partition.
The `current-offsets` refers to the current offset in the partition. This
refers to the offset of the last element that
we retrieved and emitted successfully. The `committed-offsets` is the last
committed offset.
-The Kafka Consumers in Flink commit the offsets back to the Kafka brokers
(Kafka 0.10+). If checkpointing
-is disabled, offsets are committed periodically.
+The Kafka Consumers in Flink commit the offsets back to the Kafka brokers.
+If checkpointing is disabled, offsets are committed periodically.
With checkpointing, the commit happens once all operators in the streaming
topology have confirmed that they've created a checkpoint of their state.
This provides users with at-least-once semantics for the offsets committed to
Zookeeper or the broker. For offsets checkpointed to Flink, the system
provides exactly once guarantees.
@@ -776,6 +607,15 @@ A mismatch in service name between client and server
configuration will cause th
For more information on Flink configuration for Kerberos security, please see
[here]({{ site.baseurl}}/ops/config.html).
You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further
details on how Flink internally setups Kerberos-based security.
+## Migrating Kafka Connector from 0.11 to universal
+
+In order to perform the migration, see the [upgrading jobs and Flink versions
guide]({{ site.baseurl }}/ops/upgrading.html)
+and:
+* Use Flink 1.9 or newer for the whole process.
+* Do not upgrade Flink and user operators at the same time.
+* Make sure that the Kafka Consumer and/or Kafka Producer used in your job have unique identifiers (`uid`) assigned, as shown in the sketch below.
+* Use the stop with savepoint feature to take the savepoint (for example by using the `stop --withSavepoint` [CLI command]({{ site.baseurl }}/ops/cli.html)).
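+
+Assigning a stable identifier to the Kafka source might look like the following sketch (the identifier string is only a placeholder):
+
+{% highlight java %}
+DataStream<String> stream = env
+    .addSource(myConsumer)
+    .uid("kafka-source"); // stable uid so that the consumer state can be matched on restore
+{% endhighlight %}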
+
## Troubleshooting
<div class="alert alert-warning">