[
https://issues.apache.org/jira/browse/FLINK-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344376#comment-15344376
]
ASF GitHub Bot commented on FLINK-3752:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2142#discussion_r68061306
--- Diff: docs/apis/streaming/connectors/kafka.md ---
@@ -198,6 +198,63 @@ Flink on YARN supports automatic restart of lost YARN
containers.
If checkpointing is not enabled, the Kafka consumer will periodically
commit the offsets to Zookeeper.
+#### Kafka Consumers and Timestamp Extraction/Watermark Emission
+
+In many scenarios, the timestamp of a record is embedded (explicitly or
implicitly) in the record itself.
+In addition, the user may want to emit watermarks either periodically, or
in an irregular fashion, e.g. based on
+special records in the Kafka stream that contain the current event-time
watermark. For these cases, the Flink Kafka
+Consumer allows the specification of an `AssignerWithPeriodicWatermarks`
or an `AssignerWithPunctuatedWatermarks`.
+
+You can specify your custom timestamp extractor/watermark emitter as
described
+[here]({{ site.baseurl
}}/apis/streaming/event_timestamps_watermarks.html), or use one of the
+[predefined ones]({{ site.baseurl
}}/apis/streaming/event_timestamp_extractors.html). After doing so, you
+can pass it to your consumer in the following way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+
+FlinkKafkaConsumer08<String> myConsumer =
+ new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(),
properties);
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+
+DataStream<String> stream = env
+ .addSource(myConsumer);
+stream.print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new
SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+val stream = env
+ .addSource(myConsumer)
+stream.print
+{% endhighlight %}
+</div>
+</div>
+
+When such an assigner is specified, for each record read from Kafka, the
+`extractTimestamp(T element, long previousElementTimestamp)` is called to
assign a timestamp to the record and
+the `Watermark getCurrentWatermark()` (for periodic) or the
+`Watermark checkAndGetNextWatermark(T lastElement, long
extractedTimestamp)` (for punctuated) is called to determine
+if a new watermark should be emitted and with which timestamp.
+
+Internally, an instance of the assigner is executed per Kafka partition,
and when a single task is reading
+from multiple partitions, watermark alignment is done as described
+[here](http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/).
--- End diff --
Could you remove the link to the data Artisans blog here?
The Flink documentation should be self-contained and independent of
material from any companies.
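If the external link is dropped, the docs could explain the alignment inline instead. As a rough illustration of the idea only: the sketch below assumes that a task tracks one watermark per Kafka partition and emits the minimum across them whenever that minimum advances. The class `PartitionWatermarkAligner` and its methods are hypothetical names made up for this example; they are not part of the Flink API and do not claim to mirror the connector's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified, hypothetical model of per-partition watermark alignment.
 * Not Flink code: it only illustrates "emit the minimum over all partitions".
 */
class PartitionWatermarkAligner {

    // Highest watermark seen so far for each Kafka partition read by this task.
    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();

    // Last watermark emitted downstream.
    private long lastEmitted = Long.MIN_VALUE;

    /** Registers a partition; until it reports a watermark, it holds back the aligned watermark. */
    public void addPartition(int partition) {
        partitionWatermarks.putIfAbsent(partition, Long.MIN_VALUE);
    }

    /**
     * Records a watermark for one partition.
     *
     * @return the aligned watermark to emit, or null if it did not advance
     */
    public Long onPartitionWatermark(int partition, long watermark) {
        // A partition's watermark never moves backwards.
        partitionWatermarks.merge(partition, watermark, Long::max);

        // The aligned watermark is the minimum over all partitions of this task.
        long min = Long.MAX_VALUE;
        for (long w : partitionWatermarks.values()) {
            min = Math.min(min, w);
        }

        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        PartitionWatermarkAligner aligner = new PartitionWatermarkAligner();
        aligner.addPartition(0);
        aligner.addPartition(1);
        System.out.println(aligner.onPartitionWatermark(0, 100L));
        System.out.println(aligner.onPartitionWatermark(1, 50L));
    }
}
```

Under this model the slowest partition determines the task's event-time progress, which is why a fast partition alone cannot push the watermark past records still pending in a slower one.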
> Add Per-Kafka-Partition Watermark Generation to the docs
> --------------------------------------------------------
>
> Key: FLINK-3752
> URL: https://issues.apache.org/jira/browse/FLINK-3752
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.1.0
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Fix For: 1.1.0
>
>
> The new methods that create watermarks per Kafka topic-partition, rather than
> per Flink DataStream partition, should be documented under
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html