[
https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855761#comment-15855761
]
ASF GitHub Bot commented on FLINK-5702:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3282#discussion_r99796056
--- Diff: docs/dev/connectors/kafka.md ---
@@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.
### Kafka Producer
-The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
-records to partitions.
+Flink's Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+It allows writing a stream of records to one or more Kafka topics.
Example:
-
<div class="codetabs" markdown="1">
<div data-lang="java, Kafka 0.8+" markdown="1">
{% highlight java %}
-stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+ "localhost:9092", // broker list
+ "my-topic", // target topic
+ new SimpleStringSchema()); // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false); // "false" by default
+myProducer.setFlushOnCheckpoint(true); // "false" by default
+
+stream.addSink(myProducer);
{% endhighlight %}
</div>
<div data-lang="java, Kafka 0.10+" markdown="1">
{% highlight java %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+ stream, // input stream
+ "my-topic", // target topic
+ new SimpleStringSchema(), // serialization schema
+ properties); // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false); // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true); // "false" by default
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.8+" markdown="1">
{% highlight scala %}
-stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
+val stream: DataStream[String] = ...
+
+val myProducer = new FlinkKafkaProducer08[String](
+ "localhost:9092", // broker list
+ "my-topic", // target topic
+ new SimpleStringSchema) // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false) // "false" by default
+myProducer.setFlushOnCheckpoint(true) // "false" by default
+
+stream.addSink(myProducer)
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.10+" markdown="1">
{% highlight scala %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+val stream: DataStream[String] = ...
+
+val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+ stream, // input stream
+ "my-topic", // target topic
+ new SimpleStringSchema, // serialization schema
+ properties) // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false) // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true) // "false" by default
{% endhighlight %}
</div>
</div>
-You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
-the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
-Kafka Producers.
-
-Similar to the consumer, the producer also allows using an advanced serialization schema which allows
-serializing the key and value separately. It also allows to override the target topic id, so that
-one producer instance can send data to multiple topics.
-
-The interface of the serialization schema is called `KeyedSerializationSchema`.
-
+The above demonstrates the basic usage of creating a Flink Kafka Producer
+to write streams to a single Kafka target topic. For more advanced usages, there
+are other constructor variants that allow providing the following:
+
+ * *Custom configuration for the internal Kafka client*:
+ The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
+ Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
+ details on how to configure Kafka Producers.
+ * *Custom partitioner*: To assign records to specific
+ partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ constructor. This partitioner will be called for each record in the stream
+ to determine which exact partition the record will be sent to.
+ * *Advanced serialization schema*: Similar to the consumer,
+ the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
+ which allows serializing the key and value separately. It also allows overriding the target topic,
+ so that one producer instance can send data to multiple topics.
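To make the `KeyedSerializationSchema` contract concrete, here is a self-contained sketch. The interface is inlined as a simplified stand-in so the snippet runs without a Flink dependency (in a real job you would implement Flink's own `KeyedSerializationSchema`); the "key:value" record format, the `alerts` topic, and the class names are made up for illustration:

```java
import java.nio.charset.StandardCharsets;

public class KeyedSchemaSketch {

    // Simplified stand-in mirroring the three methods of Flink's
    // KeyedSerializationSchema<T>, inlined so this snippet is standalone.
    interface KeyedSerializationSchema<T> {
        byte[] serializeKey(T element);
        byte[] serializeValue(T element);
        String getTargetTopic(T element); // null means "use the producer's default topic"
    }

    // Example: records are "key:value" strings; records whose key is
    // "alert" are routed to a hypothetical dedicated "alerts" topic.
    static class ColonSeparatedSchema implements KeyedSerializationSchema<String> {
        @Override
        public byte[] serializeKey(String element) {
            return element.split(":", 2)[0].getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(String element) {
            String[] parts = element.split(":", 2);
            return (parts.length > 1 ? parts[1] : "").getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String getTargetTopic(String element) {
            return element.startsWith("alert:") ? "alerts" : null;
        }
    }

    public static void main(String[] args) {
        ColonSeparatedSchema schema = new ColonSeparatedSchema();
        System.out.println(new String(schema.serializeKey("alert:disk full"), StandardCharsets.UTF_8));
        System.out.println(schema.getTargetTopic("alert:disk full"));
    }
}
```

Because `getTargetTopic` can return a different topic per record, a single producer instance can fan records out to multiple topics.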
+
+The example also shows how to configure the Flink Kafka Producer for at-least-once
+guarantees, with the setter methods `setLogFailuresOnly` and `setFlushOnCheckpoint`:
+
+ * `setLogFailuresOnly`: enabling this will let the producer log failures only
--- End diff --
I was also thinking about this, actually. Perhaps we should even just enforce these two settings when checkpointing is enabled; otherwise, having a `setAtLeastOnce` setting on top of enabling checkpointing as yet another configuration seems a bit strange to me (e.g. the user might have called `setAtLeastOnce(true)` without enabling checkpointing).
Either way, I prefer to discuss this in https://issues.apache.org/jira/browse/FLINK-5728, and keep this as is until we come to a conclusion on how to deal with these two settings. I'll incorporate your comment on this in FLINK-5728.
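For illustration only, the enforcement idea discussed in the comment (force the at-least-once flags whenever checkpointing is on) could be sketched like this; all types and names here are stand-ins invented for the example, not actual Flink code:

```java
public class EnforcementSketch {

    // Stand-in for the producer's delivery-related flags.
    static class ProducerSettings {
        boolean logFailuresOnly = false;
        boolean flushOnCheckpoint = false;
    }

    // If checkpointing is enabled, override whatever the user configured
    // with the values required for at-least-once delivery.
    static ProducerSettings enforceAtLeastOnce(ProducerSettings s, boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            s.logFailuresOnly = false;  // failures must fail the sink, not just be logged
            s.flushOnCheckpoint = true; // pending records must be flushed on checkpoint
        }
        return s;
    }

    public static void main(String[] args) {
        ProducerSettings settings = new ProducerSettings();
        settings.logFailuresOnly = true; // user setting that would break at-least-once
        enforceAtLeastOnce(settings, true);
        System.out.println(settings.logFailuresOnly + " " + settings.flushOnCheckpoint);
    }
}
```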
> Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is
> compromised
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-5702
> URL: https://issues.apache.org/jira/browse/FLINK-5702
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> The documentation for FlinkKafkaProducer does not have any information about
> the {{setLogFailuresOnly}} setting. It should emphasize that if users choose to only
> log failures instead of failing the sink, at-least-once cannot be guaranteed.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)