AHeise commented on a change in pull request #17058:
URL: https://github.com/apache/flink/pull/17058#discussion_r698621444



##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink

Review comment:
       Quite a hard start. How about retaining the old intro?
   
   ```suggestion
   ## Kafka Sink
   
   `KafkaSink` allows writing a stream of records to one or more Kafka topics.
   ```

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)
+            .setValueSerializationSchema(new SimpleStringSchema())
+            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+            .build()
+        )
+        .build();
+        
+stream.sinkTo(sink);
+```
 
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with 
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+  use ```setTransactionalIdPrefix(String)```
 
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-        "my-topic",                  // target topic
-        new SimpleStringSchema(),    // serialization schema
-        properties,                  // producer config
-        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
 
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
+selection, partitioning.

Review comment:
       ```suggestion
   You always need to supply a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
   the data stream to Kafka producer records.
   Flink offers a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
   selection, partitioning. You can also implement the interface on your own to 
exert more control.
   ```
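
For the "implement the interface on your own" route, a minimal illustrative sketch follows; the `serialize(element, context, timestamp)` signature is assumed from the `KafkaSink` API this page documents, and the `"events"` topic is a placeholder.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative only: serializes each String element as UTF-8 bytes and writes it
// to a hypothetical "events" topic, using the element itself as the record key.
public class StringRecordSchema implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        byte[] bytes = element.getBytes(StandardCharsets.UTF_8);
        // No explicit partition: Kafka's default partitioner decides based on the key.
        return new ProducerRecord<>("events", bytes, bytes);
    }
}
```

The builder is usually the simpler option; hand-rolling the schema mainly pays off when topic, key, or partition depend on the element in ways the builder does not cover.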

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)
+            .setValueSerializationSchema(new SimpleStringSchema())
+            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+            .build()
+        )
+        .build();
+        
+stream.sinkTo(sink);
+```
 
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with 
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+  use ```setTransactionalIdPrefix(String)```
 
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-        "my-topic",                  // target topic
-        new SimpleStringSchema(),    // serialization schema
-        properties,                  // producer config
-        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
 
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+    .setTopicSelector((element) -> {<your-topic-selection-logic>})
+    .setValueSerializationSchema(new SimpleStringSchema())
+    .setKeySerializationSchema(new SimpleStringSchema())
+    .setPartitioner(new FlinkFixedPartitioner())
+    .build();
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
 
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic 
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink 
serializer by using 
+```setKafkaKeySerializer(Serializer)``` or 
```setKafkaValueSerializer(Serializer)```.

Review comment:
       ```suggestion
   It is **required** to always set a value serialization method and a topic 
selection method.
   Moreover, it is also possible to use Kafka serializers instead of Flink 
serializer by using 
   ```setKafkaKeySerializer(Serializer)``` or 
```setKafkaValueSerializer(Serializer)```.
   ```

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)
+            .setValueSerializationSchema(new SimpleStringSchema())
+            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+            .build()
+        )
+        .build();
+        
+stream.sinkTo(sink);
+```
 
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with 
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+  use ```setTransactionalIdPrefix(String)```
 
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-        "my-topic",                  // target topic
-        new SimpleStringSchema(),    // serialization schema
-        properties,                  // producer config
-        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
 
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+    .setTopicSelector((element) -> {<your-topic-selection-logic>})
+    .setValueSerializationSchema(new SimpleStringSchema())
+    .setKeySerializationSchema(new SimpleStringSchema())
+    .setPartitioner(new FlinkFixedPartitioner())
+    .build();
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
 
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic 
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink 
serializer by using 
+```setKafkaKeySerializer(Serializer)``` or 
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different 
```DeliveryGuarantee```s. For 
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE``` 
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses 
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may 
be lost in case of 
+  issues on the Kafka broker and messages may be duplicated in case of a Flink 
failure.
+- ```DeliveryGuarantee.AT_LEAST_ONCE```: the sink will wait for all 
outstanding records in the 
+  Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No 
messages will be 
+  lost in case of any issue with the Kafka brokers but messages may be 
duplicated when Flink 
+  restarts.
+- ```DeliveryGuarantee.EXACTLY_ONCE```: In this mode the KafkaSink will write 
all messages in a 
+  Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if 
the consumer 
+  reads only committed data (see Kafka consumer config isolation.level), no 
duplicates will be 
+  seen in case of a Flink restart. However, this delays record writing 
effectively until a 

Review comment:
       ```suggestion
     seen in case of a Flink restart. However, this delays record visibility 
effectively until a 
   ```
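
A side note on visibility: the exactly-once behaviour described here is only observed by consumers that read committed data. A minimal sketch of the consumer-side setting referred to above (plain Kafka consumer properties; broker address and group id are placeholders):

```java
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
consumerProps.setProperty("group.id", "my-consumer-group");       // placeholder
// Skip records from open or aborted transactions, so only data committed
// by the KafkaSink on a successful checkpoint becomes visible.
consumerProps.setProperty("isolation.level", "read_committed");
```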

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)
+            .setValueSerializationSchema(new SimpleStringSchema())
+            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+            .build()
+        )
+        .build();
+        
+stream.sinkTo(sink);
+```
 
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with 
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+  use ```setTransactionalIdPrefix(String)```
 
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-        "my-topic",                  // target topic
-        new SimpleStringSchema(),    // serialization schema
-        properties,                  // producer config
-        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
 
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+    .setTopicSelector((element) -> {<your-topic-selection-logic>})
+    .setValueSerializationSchema(new SimpleStringSchema())
+    .setKeySerializationSchema(new SimpleStringSchema())
+    .setPartitioner(new FlinkFixedPartitioner())
+    .build();
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
 
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic 
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink 
serializer by using 
+```setKafkaKeySerializer(Serializer)``` or 
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different 
```DeliveryGuarantee```s. For 
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE``` 
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses 
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may 
be lost in case of 
+  issues on the Kafka broker and messages may be duplicated in case of a Flink 
failure.
+- ```DeliveryGuarantee.AT_LEAST_ONCE```: the sink will wait for all 
outstanding records in the 
+  Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No 
messages will be 
+  lost in case of any issue with the Kafka brokers but messages may be 
duplicated when Flink 
+  restarts.
+- ```DeliveryGuarantee.EXACTLY_ONCE```: In this mode the KafkaSink will write 
all messages in a 

Review comment:
       ```suggestion
   - ```DeliveryGuarantee.EXACTLY_ONCE```: In this mode, the KafkaSink will 
write all messages in a 
   ```
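
To make the exactly-once configuration concrete, here is a minimal sketch that mirrors the builder layout of the snippet quoted in this hunk; `brokers` and the transactional id prefix are placeholders:

```java
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .build()
        )
        // Required for EXACTLY_ONCE: prefix for the Kafka transactional ids used
        // by this sink; pick one that does not clash with other applications.
        .setTransactionalIdPrefix("my-app")
        .build();
```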

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)
+            .setValueSerializationSchema(new SimpleStringSchema())
+            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+            .build()
+        )
+        .build();
+        
+stream.sinkTo(sink);
+```
 
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with 
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+  use ```setTransactionalIdPrefix(String)```
 
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-        "my-topic",                  // target topic
-        new SimpleStringSchema(),    // serialization schema
-        properties,                  // producer config
-        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
 
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+    .setTopicSelector((element) -> {<your-topic-selection-logic>})
+    .setValueSerializationSchema(new SimpleStringSchema())
+    .setKeySerializationSchema(new SimpleStringSchema())
+    .setPartitioner(new FlinkFixedPartitioner())
+    .build();
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
 
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic 
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink 
serializer by using 
+```setKafkaKeySerializer(Serializer)``` or 
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different 
```DeliveryGuarantee```s. For 
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE``` 
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses 
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may 
be lost in case of 
+  issues on the Kafka broker and messages may be duplicated in case of a Flink 
failure.
+- ```DeliveryGuarantee.AT_LEAST_ONCE```: the sink will wait for all 
outstanding records in the 
+  Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No 
messages will be 
+  lost in case of any issue with the Kafka brokers but messages may be 
duplicated when Flink 
+  restarts.

Review comment:
       ```suggestion
     restarts because Flink reprocesses old input records.
   ```

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)
+            .setValueSerializationSchema(new SimpleStringSchema())
+            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+            .build()
+        )
+        .build();
+        
+stream.sinkTo(sink);
+```
 
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with 
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+  use ```setTransactionalIdPrefix(String)```
 
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-        "my-topic",                  // target topic
-        new SimpleStringSchema(),    // serialization schema
-        properties,                  // producer config
-        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
 
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+    .setTopicSelector((element) -> {<your-topic-selection-logic>})
+    .setValueSerializationSchema(new SimpleStringSchema())
+    .setKeySerializationSchema(new SimpleStringSchema())
+    .setPartitioner(new FlinkFixedPartitioner())
+    .build();
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
 
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic 
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink 
serializer by using 
+```setKafkaKeySerializer(Serializer)``` or 
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different 
```DeliveryGuarantee```s. For 
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE``` 
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses 
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may 
be lost in case of 
+  issues on the Kafka broker and messages may be duplicated in case of a Flink 
failure.
+- ```DeliveryGuarantee.AT_LEAST_ONCE```: the sink will wait for all 
outstanding records in the 

Review comment:
       ```suggestion
   - ```DeliveryGuarantee.AT_LEAST_ONCE```: The sink will wait for all 
outstanding records in the 
   ```

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)

Review comment:
       Why is this required?

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)
+            .setValueSerializationSchema(new SimpleStringSchema())
+            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+            .build()
+        )
+        .build();
+        
+stream.sinkTo(sink);
+```
 
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with 
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+  use ```setTransactionalIdPrefix(String)```
 
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-        "my-topic",                  // target topic
-        new SimpleStringSchema(),    // serialization schema
-        properties,                  // producer config
-        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
 
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+    .setTopicSelector((element) -> {<your-topic-selection-logic>})
+    .setValueSerializationSchema(new SimpleStringSchema())
+    .setKeySerializationSchema(new SimpleStringSchema())
+    .setPartitioner(new FlinkFixedPartitioner())
+    .build();
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
 
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic 
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink 
serializer by using 
+```setKafkaKeySerializer(Serializer)``` or 
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different 
```DeliveryGuarantee```s. For 
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE``` 
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses 
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.
+
+- ```DeliveryGuarantee.NONE```: does not provide any guarantees: messages may 
be lost in case of 

Review comment:
       ```suggestion
   - ```DeliveryGuarantee.NONE``` does not provide any guarantees: messages may 
be lost in case of 
   ```

##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -613,134 +613,89 @@ Otherwise, the watermarks of the whole application cannot advance and all time-b
 such as time windows or functions with timers, cannot make progress. A single 
idle Kafka partition causes this behavior.
 Consider setting appropriate [idleness timeouts]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" 
>}}#dealing-with-idle-sources) to mitigate this issue.
  
-## Kafka Producer
-
-Flink’s Kafka Producer - `FlinkKafkaProducer` allows writing a stream of 
records to one or more Kafka topics.
+## Kafka Sink
 
-The constructor accepts the following arguments:
+### Usage
 
-1. A default output topic where events should be written
-2. A SerializationSchema / KafkaSerializationSchema for serializing data into 
Kafka
-3. Properties for the Kafka client. The following properties are required:
-    * "bootstrap.servers" (comma separated list of Kafka brokers)
-4. A fault-tolerance semantic
+Kafka sink provides a builder class to construct an instance of a KafkaSink. 
The code snippet below
+shows how to write String records to a Kafka topic with a delivery guarantee 
of at least once.
 
-{{< tabs "8dbb32b3-47e8-468e-91a7-43cec0c658ac" >}}
-{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...
+        
+KafkaSink<String> sink = KafkaSink.<String>builder()
+        .setBootstrapServers(brokers)
+        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+            .setTopic("topic-name")
+            .setKafkaProducerConfig(producerProperties)
+            .setValueSerializationSchema(new SimpleStringSchema())
+            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+            .build()
+        )
+        .build();
+        
+stream.sinkTo(sink);
+```
 
-Properties properties = new Properties();
-properties.setProperty("bootstrap.servers", "localhost:9092");
+The following properties are **required** to build a KafkaSink:
+
+- Bootstrap servers, ```setBootstrapServers(String)```
+- Record serializer, ``setRecordSerializer(KafkaRecordSerializationSchema)``
+- Kafka producer properties, ```setKafkaProducerConfig(Properties)```
+- If you configure the delivery guarantee with 
```DeliveryGuarantee.EXACTLY_ONCE``` you also have
+  use ```setTransactionalIdPrefix(String)```
 
-FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-        "my-topic",                  // target topic
-        new SimpleStringSchema(),    // serialization schema
-        properties,                  // producer config
-        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
+### Serializer
 
-stream.addSink(myProducer);
+You always need construct a ```KafkaRecordSerializationSchema``` to transform 
incoming elements from
+the data stream to Kafka producer records.
+We offer a schema builder to provide some common building blocks i.e. 
key/value serialization, topic
+selection, partitioning.
+
+```java
+KafkaRecordSerializationSchema.builder()
+    .setTopicSelector((element) -> {<your-topic-selection-logic>})
+    .setValueSerializationSchema(new SimpleStringSchema())
+    .setKeySerializationSchema(new SimpleStringSchema())
+    .setPartitioner(new FlinkFixedPartitioner())
+    .build();
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val stream: DataStream[String] = ...
 
-val properties = new Properties
-properties.setProperty("bootstrap.servers", "localhost:9092")
+It is **required** to always set a value serialization method and a topic 
retrieval method.
+Moreover, it is also possible to use Kafka serializers instead of Flink 
serializer by using 
+```setKafkaKeySerializer(Serializer)``` or 
```setKafkaValueSerializer(Serializer)```.
+
+### Fault Tolerance
+
+Overall the ```KafkaSink``` supports all three different 
```DeliveryGuarantee```s. For 
+```DeliveryGuarantee.AT_LEAST_ONCE``` and ```DeliveryGuarantee.EXACTLY_ONCE``` 
Flink's checkpointing
+must be enabled. By default the ```KafkaSink``` uses 
```DeliveryGuarantee.NONE```. Below you can find
+a proper explanation of the different guarantees.

Review comment:
       I removed "all" in case we ever add more.
   ```suggestion
   Overall the ```KafkaSink``` supports three different 
```DeliveryGuarantee```s. For 
   ```DeliveryGuarantee.AT_LEAST_ONCE``` and 
```DeliveryGuarantee.EXACTLY_ONCE``` Flink's checkpointing
   must be enabled. By default the ```KafkaSink``` uses 
```DeliveryGuarantee.NONE```. Below you can find
   an explanation of the different guarantees.
   ```
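
Since both AT_LEAST_ONCE and EXACTLY_ONCE hinge on checkpointing, a minimal sketch of enabling it (the interval is an arbitrary placeholder):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The KafkaSink flushes its buffers (AT_LEAST_ONCE) or commits its transaction
// (EXACTLY_ONCE) on every successful checkpoint, so checkpointing must be enabled.
env.enableCheckpointing(60_000L); // checkpoint every 60 s, placeholder value
```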



