aljoscha commented on a change in pull request #12253:
URL: https://github.com/apache/flink/pull/12253#discussion_r429212797



##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}

Review comment:
       For this one, I just copied the existing example. I might update it to
the new Source interface as a follow-up, but that interface is very different.
Maybe we shouldn't even mention this here?





