[
https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855744#comment-15855744
]
ASF GitHub Bot commented on FLINK-5702:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3282#discussion_r99793771
--- Diff: docs/dev/connectors/kafka.md ---
@@ -250,57 +250,116 @@ if a new watermark should be emitted and with which timestamp.
### Kafka Producer
-The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
-records to partitions.
+Flink's Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.).
+It allows writing a stream of records to one or more Kafka topics.
Example:
-
<div class="codetabs" markdown="1">
<div data-lang="java, Kafka 0.8+" markdown="1">
{% highlight java %}
-stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
+ "localhost:9092", // broker list
+ "my-topic", // target topic
+ new SimpleStringSchema()); // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false); // "false" by default
+myProducer.setFlushOnCheckpoint(true); // "false" by default
+
+stream.addSink(myProducer);
{% endhighlight %}
</div>
<div data-lang="java, Kafka 0.10+" markdown="1">
{% highlight java %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+DataStream<String> stream = ...;
+
+FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+ stream, // input stream
+ "my-topic", // target topic
+ new SimpleStringSchema(), // serialization schema
+ properties); // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false); // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true); // "false" by default
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.8+" markdown="1">
{% highlight scala %}
-stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
+val stream: DataStream[String] = ...
+
+val myProducer = new FlinkKafkaProducer08[String](
+ "localhost:9092", // broker list
+ "my-topic", // target topic
+ new SimpleStringSchema) // serialization schema
+
+// the following is necessary for at-least-once delivery guarantee
+myProducer.setLogFailuresOnly(false) // "false" by default
+myProducer.setFlushOnCheckpoint(true) // "false" by default
+
+stream.addSink(myProducer)
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.10+" markdown="1">
{% highlight scala %}
-FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
+val stream: DataStream[String] = ...
+
+val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
+ stream, // input stream
+ "my-topic", // target topic
+ new SimpleStringSchema, // serialization schema
+ properties) // custom configuration for KafkaProducer (including broker list)
+
+// the following is necessary for at-least-once delivery guarantee
+myProducerConfig.setLogFailuresOnly(false) // "false" by default
+myProducerConfig.setFlushOnCheckpoint(true) // "false" by default
{% endhighlight %}
</div>
</div>
-You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
-the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure
-Kafka Producers.
-
-Similar to the consumer, the producer also allows using an advanced serialization schema which allows
-serializing the key and value separately. It also allows to override the target topic id, so that
-one producer instance can send data to multiple topics.
-
-The interface of the serialization schema is called `KeyedSerializationSchema`.
-
+The above demonstrates the basic usage of creating a Flink Kafka Producer
+to write streams to a single Kafka target topic. For more advanced usages, there
+are other constructor variants that allow providing the following:
+
+ * *Custom configuration for the internal Kafka client*:
+ The producer allows providing a custom properties configuration for the internal `KafkaProducer`.
+ Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
+ details on how to configure Kafka Producers.
+ * *Custom partitioner*: To assign records to specific
+ partitions, you can provide an implementation of a `KafkaProducer` to the
--- End diff ---
Ah, this should be `KafkaPartitioner`, will change.
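For readers following the partitioner discussion, here is a Flink-free sketch of the logic a custom `KafkaPartitioner` typically implements: deterministically mapping each record's key onto one of the topic's partitions. The class and method names below are illustrative only (this is not the connector's API), assuming a simple hash of the serialized key.

```java
import java.nio.charset.StandardCharsets;

// Illustrative, Flink-free sketch of custom partitioning logic:
// map each record's key deterministically onto [0, numPartitions).
// Class and method names here are hypothetical, not Flink's API.
public class KeyHashPartitioner {

    // Returns a partition index for the given serialized key;
    // keyless records all land on partition 0 in this sketch.
    public static int partition(byte[] serializedKey, int numPartitions) {
        if (serializedKey == null || serializedKey.length == 0) {
            return 0;
        }
        int hash = 0;
        for (byte b : serializedKey) {
            hash = 31 * hash + b;
        }
        // clear the sign bit so the modulo result is never negative
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // the same key always maps to the same partition
        System.out.println(partition(key, 4) == partition(key, 4)); // true
    }
}
```

The determinism is the important property: records with the same key always reach the same partition, which preserves per-key ordering downstream.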
> Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is
> compromised
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-5702
> URL: https://issues.apache.org/jira/browse/FLINK-5702
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> The documentation for FlinkKafkaProducer does not have any information about
> {{setLogFailuresOnly}}. It should emphasize that if users choose to only
> log failures instead of failing the sink, at-least-once cannot be guaranteed.
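As a toy, Flink-free illustration of the warning in this issue (all names below are made up; this is not the connector's actual code path): with log-failures-only behavior, a failed send is merely logged and the record is silently dropped, whereas failing the sink surfaces the error so that, in a real job, the record would be replayed from the last checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (not Flink code) of the logFailuresOnly trade-off:
// on a failed send, either log-and-drop the record (weakening
// at-least-once) or throw, which in a real job triggers a restart
// and a replay from the last checkpoint.
public class LogFailuresOnlyDemo {

    // Pretend the broker rejects this particular record.
    static boolean trySend(String record) {
        return !"poison".equals(record);
    }

    static List<String> runSink(List<String> records, boolean logFailuresOnly) {
        List<String> delivered = new ArrayList<>();
        for (String r : records) {
            if (trySend(r)) {
                delivered.add(r);
            } else if (logFailuresOnly) {
                // record is lost: only a log line remains
                System.err.println("WARN: send failed for " + r);
            } else {
                // failing the sink surfaces the error so the record
                // can be replayed after recovery
                throw new RuntimeException("send failed for " + r);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> in = List.of("a", "poison", "b");
        // log-and-drop: "poison" disappears without failing the job
        System.out.println(runSink(in, true)); // [a, b]
    }
}
```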
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)