Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2142#discussion_r68061306
--- Diff: docs/apis/streaming/connectors/kafka.md ---
@@ -198,6 +198,63 @@ Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
+#### Kafka Consumers and Timestamp Extraction/Watermark Emission
+
+In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
+In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
+special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka
+Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.
+
+You can specify your custom timestamp extractor / watermark emitter as described
+[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or use one of the
+[predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, you
+can pass it to your consumer in the following way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+
+FlinkKafkaConsumer08<String> myConsumer =
+    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+
+DataStream<String> stream = env.addSource(myConsumer);
+stream.print();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181")
+properties.setProperty("group.id", "test")
+
+val myConsumer =
+    new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
+
+val stream = env.addSource(myConsumer)
+stream.print()
+{% endhighlight %}
+</div>
+</div>
+
+When such an assigner is specified, for each record read from Kafka,
+`extractTimestamp(T element, long previousElementTimestamp)` is called to assign a timestamp to the record, and
+`Watermark getCurrentWatermark()` (for periodic) or
+`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) is called to determine
+if a new watermark should be emitted and with which timestamp.
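+
+For illustration, a minimal sketch of what the `CustomWatermarkEmitter` used in the examples above could look
+like is shown below. It assumes a hypothetical record format `"<epoch-millis>,<payload>"` in which special
+records with the payload `WATERMARK` carry the current event-time watermark; it is only one possible
+implementation, not part of the connector API:
+
+{% highlight java %}
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {
+
+    @Override
+    public long extractTimestamp(String element, long previousElementTimestamp) {
+        // assumed record format: "<epoch-millis>,<payload>"
+        return Long.parseLong(element.split(",", 2)[0]);
+    }
+
+    @Override
+    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
+        // emit a watermark only for the special marker records, otherwise emit none (null)
+        return lastElement.endsWith(",WATERMARK") ? new Watermark(extractedTimestamp) : null;
+    }
+}
+{% endhighlight %}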
+
+Internally, an instance of the assigner is executed per Kafka partition, and when a single task is reading
+from multiple partitions, watermark alignment is done as described
+[here](http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/).
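+
+As a conceptual sketch (this is not connector code), the alignment boils down to forwarding the minimum of the
+per-partition watermarks, so a single partition that lags behind holds back event time for the whole task:
+
+{% highlight java %}
+public class WatermarkAlignmentSketch {
+
+    // the watermark a task forwards downstream is the minimum over the
+    // watermarks of the partitions it reads
+    static long alignedWatermark(long[] perPartitionWatermarks) {
+        long min = Long.MAX_VALUE;
+        for (long watermark : perPartitionWatermarks) {
+            min = Math.min(min, watermark);
+        }
+        return min;
+    }
+
+    public static void main(String[] args) {
+        // partitions at event times 15, 42 and 7 -> the task emits watermark 7
+        System.out.println(alignedWatermark(new long[] {15L, 42L, 7L}));
+    }
+}
+{% endhighlight %}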
--- End diff --
Could you remove the link to the data Artisans blog here?
The Flink documentation should be self-contained and independent of
material from any companies.