GitHub user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3282#discussion_r99791528
--- Diff: docs/dev/connectors/kafka.md ---
@@ -250,57 +250,116 @@ if a new watermark should be emitted and with which
timestamp.
### Kafka Producer
-The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can
specify a custom partitioner that assigns
-records to partitions.
+Flink's Kafka Producer is called `FlinkKafkaProducer08` (or `09` for
Kafka 0.9.0.x versions, etc.).
+It allows writing a stream of records to one or more Kafka topics.
Example:
-
<div class="codetabs" markdown="1">
<div data-lang="java, Kafka 0.8+" markdown="1">
{% highlight java %}
-stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092",
"my-topic", new SimpleStringSchema()));
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+ "localhost:9092", // broker list
+ "my-topic", // target topic
+ new SimpleStringSchema()); // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false); // "false" by default
+myProducer.setFlushOnCheckpoint(true); // "false" by default
+
+stream.addSink(myProducer);
{% endhighlight %}
</div>
<div data-lang="java, Kafka 0.10+" markdown="1">
{% highlight java %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new
SimpleStringSchema(), properties);
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer010Configuration myProducerConfig =
FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+ stream, // input stream
+ "my-topic", // target topic
+ new SimpleStringSchema(), // serialization schema
+ properties); // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false); // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true); // "false" by default
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.8+" markdown="1">
{% highlight scala %}
-stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092",
"my-topic", new SimpleStringSchema()))
+val stream: DataStream[String] = ...
+
+val myProducer = new FlinkKafkaProducer08[String](
+ "localhost:9092", // broker list
+ "my-topic", // target topic
+ new SimpleStringSchema) // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false) // "false" by default
+myProducer.setFlushOnCheckpoint(true) // "false" by default
+
+stream.addSink(myProducer)
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.10+" markdown="1">
{% highlight scala %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new
SimpleStringSchema(), properties);
+val stream: DataStream[String] = ...
+
+val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+ stream, // input stream
+ "my-topic", // target topic
+ new SimpleStringSchema, // serialization schema
+ properties) // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false) // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true) // "false" by default
{% endhighlight %}
</div>
</div>
-You can also define a custom Kafka producer configuration for the
KafkaSink with the constructor. Please refer to
-the [Apache Kafka
documentation](https://kafka.apache.org/documentation.html) for details on how
to configure
-Kafka Producers.
-
-Similar to the consumer, the producer also allows using an advanced
serialization schema which allows
-serializing the key and value separately. It also allows to override the
target topic id, so that
-one producer instance can send data to multiple topics.
-
-The interface of the serialization schema is called
`KeyedSerializationSchema`.
-
+The above demonstrates the basic usage of creating a Flink Kafka Producer
+to write streams to a single Kafka target topic. For more advanced usages,
there
+are other constructor variants that allow providing the following:
+
+ * *Custom configuration for the internal Kafka client*:
+ The producer allows providing a custom properties configuration for the
internal `KafkaProducer`.
--- End diff --
Maybe `providing custom Properties...`?
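For context on the wording, here is a minimal sketch of the custom `Properties` that bullet seems to refer to. The class and helper names are hypothetical, the `retries` value is an arbitrary example, and the Flink constructor call is shown only in a comment as assumed usage, so the snippet runs without the connector on the classpath.

```java
import java.util.Properties;

public class ProducerPropertiesSketch {

    // Hypothetical helper (not part of Flink): builds the Properties object
    // that would be handed to the internal Kafka client.
    public static Properties buildProducerProperties(String brokerList) {
        Properties properties = new Properties();
        // "bootstrap.servers" is the Kafka client key for the broker list.
        properties.setProperty("bootstrap.servers", brokerList);
        // "retries" is a standard Kafka producer setting; 3 is an arbitrary example value.
        properties.setProperty("retries", "3");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = buildProducerProperties("localhost:9092");
        // Assumed usage with the Flink Kafka connector available (not compiled here):
        //   new FlinkKafkaProducer08<String>("my-topic", new SimpleStringSchema(), properties);
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}
```

Since the object passed in is a `java.util.Properties`, capitalizing and code-formatting the word in the docs would make that type explicit to readers.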