Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3282#discussion_r99792597
--- Diff: docs/dev/connectors/kafka.md ---
@@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.
### Kafka Producer
-The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
-records to partitions.
+Flink's Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+It allows writing a stream of records to one or more Kafka topics. Example:
-
<div class="codetabs" markdown="1">
<div data-lang="java, Kafka 0.8+" markdown="1">
{% highlight java %}
-stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+        "localhost:9092",            // broker list
+        "my-topic",                  // target topic
+        new SimpleStringSchema());   // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false);   // "false" by default
+myProducer.setFlushOnCheckpoint(true);  // "false" by default
+
+stream.addSink(myProducer);
{% endhighlight %}
</div>
<div data-lang="java, Kafka 0.10+" markdown="1">
{% highlight java %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+        stream,                     // input stream
+        "my-topic",                 // target topic
+        new SimpleStringSchema(),   // serialization schema
+        properties);                // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false);   // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true);  // "false" by default
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.8+" markdown="1">
{% highlight scala %}
-stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
+val stream: DataStream[String] = ...
+
+val myProducer = new FlinkKafkaProducer08[String](
+        "localhost:9092",         // broker list
+        "my-topic",               // target topic
+        new SimpleStringSchema)   // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false)    // "false" by default
+myProducer.setFlushOnCheckpoint(true)   // "false" by default
+
+stream.addSink(myProducer)
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.10+" markdown="1">
{% highlight scala %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+val stream: DataStream[String] = ...
+
+val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+        stream,                   // input stream
+        "my-topic",               // target topic
+        new SimpleStringSchema,   // serialization schema
+        properties)               // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false)    // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true)   // "false" by default
{% endhighlight %}
</div>
</div>
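+
+The `properties` object used in the Kafka 0.10 examples above is a plain
+`java.util.Properties` holding the standard Kafka producer settings; a
+minimal sketch (the broker address is a placeholder):
+
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+{% endhighlight %}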
-You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
-the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
-Kafka Producers.
-
-Similar to the consumer, the producer also allows using an advanced serialization schema which allows
-serializing the key and value separately. It also allows to override the target topic id, so that
-one producer instance can send data to multiple topics.
-
-The interface of the serialization schema is called `KeyedSerializationSchema`.
-
+The above demonstrates the basic usage of creating a Flink Kafka Producer
+to write streams to a single Kafka target topic. For more advanced usages, there
+are other constructor variants that allow providing the following:
+
+ * *Custom configuration for the internal Kafka client*:
+ The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
+ Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
+ details on how to configure Kafka Producers.
+ * *Custom partitioner*: To assign records to specific
+ partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ constructor. This partitioner will be called for each record in the stream
+ to determine the exact partition the record will be sent to (see the first sketch after this list).
+ * *Advanced serialization schema*: Similar to the consumer,
+ the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
+ which allows serializing the key and value separately. It also allows overriding the target topic,
+ so that one producer instance can send data to multiple topics (see the second sketch after this list).
+
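+For example, a minimal sketch of a custom partitioner, assuming the
+`KafkaPartitioner` base class and import path of this connector version
+(the class name and hash-based routing are purely illustrative):
+
+{% highlight java %}
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+
+// Routes each record to a partition derived from the record's hash code.
+public class HashPartitioner<T> extends KafkaPartitioner<T> {
+
+    @Override
+    public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+        // mask the sign bit so the result is always a valid partition index
+        return (next.hashCode() & Integer.MAX_VALUE) % numPartitions;
+    }
+}
+{% endhighlight %}
+
+And a sketch of a `KeyedSerializationSchema` for `String` records; the key
+convention here is hypothetical, and returning `null` from `getTargetTopic`
+keeps the producer's configured default topic:
+
+{% highlight java %}
+import java.nio.charset.StandardCharsets;
+
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+
+public class KeyedStringSchema implements KeyedSerializationSchema<String> {
+
+    @Override
+    public byte[] serializeKey(String element) {
+        // hypothetical convention: everything before the first ',' is the key
+        return element.split(",", 2)[0].getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public byte[] serializeValue(String element) {
+        return element.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public String getTargetTopic(String element) {
+        // null means "use the default topic passed to the producer";
+        // return a topic name here to route individual records elsewhere
+        return null;
+    }
+}
+{% endhighlight %}
+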
+The example also shows how to configure the Flink Kafka Producer for at-least-once
+guarantees, with the setter methods `setLogFailuresOnly` and `setFlushOnCheckpoint`:
+
+ * `setLogFailuresOnly`: enabling this will let the producer log failures only
--- End diff --
Does it make sense to add a configuration like `setAtLeastOnce`? How often
do users actually configure single parameters separately?