[FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c454ee3f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c454ee3f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c454ee3f Branch: refs/heads/release-1.4 Commit: c454ee3fa9a14cfe28dbfc641134659252d2c80b Parents: 1e637c5 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon Dec 18 15:21:31 2017 -0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Jan 5 22:02:09 2018 -0800 ---------------------------------------------------------------------- docs/dev/connectors/kafka.md | 46 ++++++++++----------------------------- 1 file changed, 11 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c454ee3f/docs/dev/connectors/kafka.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index daf1903..c6461f9 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -447,17 +447,17 @@ if a new watermark should be emitted and with which timestamp. ## Kafka Producer -Flink's Kafka Producer is called `FlinkKafkaProducer08` (or `09` for Kafka 0.9.0.x versions, etc.). +Flink's Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 0.10.0.x versions, etc.). It allows writing a stream of records to one or more Kafka topics. 
Example: <div class="codetabs" markdown="1"> -<div data-lang="java, Kafka 0.8+" markdown="1"> +<div data-lang="java" markdown="1"> {% highlight java %} DataStream<String> stream = ...; -FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>( +FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>( "localhost:9092", // broker list "my-topic", // target topic new SimpleStringSchema()); // serialization schema @@ -466,29 +466,17 @@ FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>( myProducer.setLogFailuresOnly(false); // "false" by default myProducer.setFlushOnCheckpoint(true); // "false" by default -stream.addSink(myProducer); -{% endhighlight %} -</div> -<div data-lang="java, Kafka 0.10+" markdown="1"> -{% highlight java %} -DataStream<String> stream = ...; - -FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps( - stream, // input stream - "my-topic", // target topic - new SimpleStringSchema(), // serialization schema - properties); // custom configuration for KafkaProducer (including broker list) +// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka +myProducer.setWriteTimestampToKafka(true); -// the following is necessary for at-least-once delivery guarantee -myProducerConfig.setLogFailuresOnly(false); // "false" by default -myProducerConfig.setFlushOnCheckpoint(true); // "false" by default +stream.addSink(myProducer); {% endhighlight %} </div> -<div data-lang="scala, Kafka 0.8+" markdown="1"> +<div data-lang="scala" markdown="1"> {% highlight scala %} val stream: DataStream[String] = ... 
-val myProducer = new FlinkKafkaProducer08[String]( +val myProducer = new FlinkKafkaProducer011[String]( "localhost:9092", // broker list "my-topic", // target topic new SimpleStringSchema) // serialization schema @@ -497,22 +485,10 @@ val myProducer = new FlinkKafkaProducer08[String]( myProducer.setLogFailuresOnly(false) // "false" by default myProducer.setFlushOnCheckpoint(true) // "false" by default -stream.addSink(myProducer) -{% endhighlight %} -</div> -<div data-lang="scala, Kafka 0.10+" markdown="1"> -{% highlight scala %} -val stream: DataStream[String] = ... +// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka +myProducer.setWriteTimestampToKafka(true) -val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps( - stream, // input stream - "my-topic", // target topic - new SimpleStringSchema, // serialization schema - properties) // custom configuration for KafkaProducer (including broker list) - -// the following is necessary for at-least-once delivery guarantee -myProducerConfig.setLogFailuresOnly(false) // "false" by default -myProducerConfig.setFlushOnCheckpoint(true) // "false" by default +stream.addSink(myProducer) {% endhighlight %} </div> </div>
