AHeise commented on a change in pull request #17058:
URL: https://github.com/apache/flink/pull/17058#discussion_r698621444
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
Review comment:
Quite a hard start. How about retaining the old intro?
```suggestion
## Kafka Sink

`KafkaSink` allows writing a stream of records to one or more Kafka topics.
```
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build()
+ )
+ .build();
+
+stream.sinkTo(sink);
+```
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+ use ```setTransactionalIdPrefix(String)```
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties, // producer config
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e.
key/value serialization, topic
+selection, partitioning.
Review comment:
```suggestion
You always need to supply a ```KafkaRecordSerializationSchema``` to transform incoming elements from
the data stream to Kafka producer records.
Flink offers a schema builder to provide some common building blocks, e.g. key/value serialization, topic
selection, and partitioning. You can also implement the interface on your own to exert more control.
```
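If someone wants to go the custom-implementation route, a minimal sketch could look like the one below. This is only an illustration: it assumes the ```KafkaRecordSerializationSchema``` interface exposes a `serialize(element, context, timestamp)` method returning a `ProducerRecord<byte[], byte[]>`, and the class and topic names are made up for the example.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical custom schema: writes every element to a fixed topic and uses the
// element's UTF-8 bytes as the record value, leaving the record key unset.
public class MyRecordSerializationSchema implements KafkaRecordSerializationSchema<String> {

    private static final String TOPIC = "topic-name"; // illustrative topic

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element,
            KafkaRecordSerializationSchema.KafkaSinkContext context,
            Long timestamp) {
        byte[] value = element.getBytes(StandardCharsets.UTF_8);
        // A real implementation could also derive a key or headers from the element.
        return new ProducerRecord<>(TOPIC, null, value);
    }
}
```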
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build()
+ )
+ .build();
+
+stream.sinkTo(sink);
+```
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+ use ```setTransactionalIdPrefix(String)```
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties, // producer config
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e.
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+ .setTopicSelector((element) -> {<your-topic-selection-logic>})
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setKeySerializationSchema(new SimpleStringSchema())
+ .setPartitioner(new FlinkFixedPartitioner())
+ .build();
```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink
serializer by using
+```setKafkaKeySerializer(Serializer)``` or
```setKafkaValueSerializer(Serializer)```.
Review comment:
```suggestion
It is **required** to always set a value serialization method and a topic (or a topic selection method).
Moreover, it is also possible to use Kafka serializers instead of Flink serializers by using
```setKafkaKeySerializer(Serializer)``` or ```setKafkaValueSerializer(Serializer)```.
```
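As a concrete (and hedged) illustration of that last point, the value could be serialized by Kafka's own ```StringSerializer``` instead of a Flink ```SerializationSchema```. This sketch assumes the builder accepts a Kafka serializer class here; if the actual signature takes an instance instead, the call changes accordingly. The topic name is illustrative.

```java
// Uses org.apache.kafka.common.serialization.StringSerializer for the value
// instead of a Flink SerializationSchema.
KafkaRecordSerializationSchema.builder()
    .setTopic("topic-name")                          // illustrative topic
    .setKafkaValueSerializer(StringSerializer.class) // Kafka serializer class
    .build();
```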
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build()
+ )
+ .build();
+
+stream.sinkTo(sink);
+```
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+ use ```setTransactionalIdPrefix(String)```
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties, // producer config
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e.
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+ .setTopicSelector((element) -> {<your-topic-selection-logic>})
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setKeySerializationSchema(new SimpleStringSchema())
+ .setPartitioner(new FlinkFixedPartitioner())
+ .build();
```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink
serializer by using
+```setKafkaKeySerializer(Serializer)``` or
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different
```DeliveryGuarantee```s. For
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE```
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may
be lost in case of
+ issues on the Kafka broker and messages may be duplicated in case of a Flink
failure.
+- ```DeliveryGuarantee.AT_LEAST_ONCE```: the sink will wait for all
outstanding records in the
+ Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No
messages will be
+ lost in case of any issue with the Kafka brokers but messages may be
duplicated when Flink
+ restarts.
+- ```DeliveryGuarantee.EXACTLY_ONCE```: In this mode the KafkaSink will write
all messages in a
+ Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if
the consumer
+ reads only committed data (see Kafka consumer config isolation.level), no
duplicates will be
+ seen in case of a Flink restart. However, this delays record writing
effectively until a
Review comment:
```suggestion
seen in case of a Flink restart. However, this effectively delays record visibility until a
```
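It may also be worth a pointer on the consumer side here: the records only become visible to consumers that read committed data, which is standard Kafka consumer configuration (the broker address below is illustrative).

```java
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092"); // illustrative broker
// Only read records from committed Kafka transactions; records written by the
// exactly-once sink stay invisible until the transaction is committed on checkpoint.
consumerProps.setProperty("isolation.level", "read_committed");
```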
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build()
+ )
+ .build();
+
+stream.sinkTo(sink);
+```
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+ use ```setTransactionalIdPrefix(String)```
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties, // producer config
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e.
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+ .setTopicSelector((element) -> {<your-topic-selection-logic>})
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setKeySerializationSchema(new SimpleStringSchema())
+ .setPartitioner(new FlinkFixedPartitioner())
+ .build();
```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink
serializer by using
+```setKafkaKeySerializer(Serializer)``` or
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different
```DeliveryGuarantee```s. For
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE```
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may
be lost in case of
+ issues on the Kafka broker and messages may be duplicated in case of a Flink
failure.
+- ```DeliveryGuarantee.AT_LEAST_ONCE```: the sink will wait for all
outstanding records in the
+ Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No
messages will be
+ lost in case of any issue with the Kafka brokers but messages may be
duplicated when Flink
+ restarts.
+- ```DeliveryGuarantee.EXACTLY_ONCE```: In this mode the KafkaSink will write
all messages in a
Review comment:
```suggestion
- ```DeliveryGuarantee.EXACTLY_ONCE```: In this mode, the KafkaSink will write all messages in a
```
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build()
+ )
+ .build();
+
+stream.sinkTo(sink);
+```
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+ use ```setTransactionalIdPrefix(String)```
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties, // producer config
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e.
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+ .setTopicSelector((element) -> {<your-topic-selection-logic>})
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setKeySerializationSchema(new SimpleStringSchema())
+ .setPartitioner(new FlinkFixedPartitioner())
+ .build();
```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink
serializer by using
+```setKafkaKeySerializer(Serializer)``` or
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different
```DeliveryGuarantee```s. For
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE```
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may
be lost in case of
+ issues on the Kafka broker and messages may be duplicated in case of a Flink
failure.
+- ```DeliveryGuarantee.AT_LEAST_ONCE```: the sink will wait for all
outstanding records in the
+ Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No
messages will be
+ lost in case of any issue with the Kafka brokers but messages may be
duplicated when Flink
+ restarts.
Review comment:
```suggestion
restarts because Flink reprocesses old input records.
```
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build()
+ )
+ .build();
+
+stream.sinkTo(sink);
+```
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+ use ```setTransactionalIdPrefix(String)```
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties, // producer config
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e.
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+ .setTopicSelector((element) -> {<your-topic-selection-logic>})
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setKeySerializationSchema(new SimpleStringSchema())
+ .setPartitioner(new FlinkFixedPartitioner())
+ .build();
```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink
serializer by using
+```setKafkaKeySerializer(Serializer)``` or
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different
```DeliveryGuarantee```s. For
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE```
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may
be lost in case of
+ issues on the Kafka broker and messages may be duplicated in case of a Flink
failure.
+- ```DeliveryGuarantee.AT_LEAST_ONCE```: the sink will wait for all
outstanding records in the
Review comment:
```suggestion
- ```DeliveryGuarantee.AT_LEAST_ONCE```: The sink will wait for all outstanding records in the
```
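Since both of the stronger guarantees only act at checkpoint time, it may help to restate next to this list that checkpointing has to be enabled for them. A minimal, illustrative sketch (the 10-second interval is made up):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// AT_LEAST_ONCE and EXACTLY_ONCE only take effect with checkpointing enabled:
// the sink flushes (or commits) its outstanding Kafka writes as part of each checkpoint.
env.enableCheckpointing(10_000); // illustrative interval of 10 seconds
```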
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
Review comment:
Why is this required?
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build()
+ )
+ .build();
+
+stream.sinkTo(sink);
+```
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+ use ```setTransactionalIdPrefix(String)```
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties, // producer config
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e.
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+ .setTopicSelector((element) -> {<your-topic-selection-logic>})
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setKeySerializationSchema(new SimpleStringSchema())
+ .setPartitioner(new FlinkFixedPartitioner())
+ .build();
```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink
serializer by using
+```setKafkaKeySerializer(Serializer)``` or
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different
```DeliveryGuarantee```s. For
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE```
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may
be lost in case of
Review comment:
```suggestion
- ```DeliveryGuarantee.NONE``` does not provide any guarantees: messages may be lost in case of
```
##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application
cannot advance and all time-b
such as time windows or functions with timers, cannot make progress. A single
idle Kafka partition causes this behavior.
Consider setting appropriate [idleness timeouts]({{< ref
"docs/dev/datastream/event-time/generating_watermarks"
>}}#dealing-with-idle-sources) to mitigate this issue.
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of
records to one or more Kafka topics.
+## Kafka Sink
-The constructor accepts the following arguments:
+### Usage
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into
Kafka
-3. Properties for the Kafka client. The following properties are required:
- * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink.
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee
of at least once.
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
```java
DataStream<String> stream = ...
+
+KafkaSink<String> sink = KafkaSink.<String>builder()
+ .setBootstrapServers(brokers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+ .setTopic("topic-name")
+ .setKafkaProducerConfig(producerProperties)
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build()
+ )
+ .build();
+
+stream.sinkTo(sink);
+```
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+ use ```setTransactionalIdPrefix(String)```
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
- "my-topic", // target topic
- new SimpleStringSchema(), // serialization schema
- properties, // producer config
- FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e.
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+ .setTopicSelector((element) -> {<your-topic-selection-logic>})
+ .setValueSerializationSchema(new SimpleStringSchema())
+ .setKeySerializationSchema(new SimpleStringSchema())
+ .setPartitioner(new FlinkFixedPartitioner())
+ .build();
```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink
serializer by using
+```setKafkaKeySerializer(Serializer)``` or
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different
```DeliveryGuarantee```s. For
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE```
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
Review comment:
I removed "all" in case we ever add more.
```suggestion
Overall the ```KafkaSink``` supports three different ```DeliveryGuarantee```s. For
```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE``` Flink's checkpointing
must be enabled. By default the ```KafkaSink``` uses ```DeliveryGuarantee.NONE```. Below you can find
an explanation of the different guarantees.
```
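Related to the exactly-once case called out earlier: a transactional id prefix also has to be configured. Here is a minimal, purely illustrative sketch. It assumes the delivery guarantee and the transactional id prefix are set on the ```KafkaSink``` builder; if the builder places them elsewhere (the snippet earlier in the diff shows the delivery guarantee on the serializer builder), the calls move accordingly. Broker address, topic, and prefix values are made up.

```java
KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")               // illustrative brokers
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("topic-name")                          // illustrative topic
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("my-app-prefix")           // illustrative prefix; must be unique per application
    .build();

stream.sinkTo(exactlyOnceSink);
```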