sjwiesman commented on a change in pull request #12253:
URL: https://github.com/apache/flink/pull/12253#discussion_r429225536



##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}

Review comment:
       Yeah, let’s remove it. How many people are really building custom sources anyway?
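
For readers who do still write custom sources, the loop in the quoted hunk can be tried without a Flink dependency. The sketch below is only an illustration of that pattern: `SketchSourceContext`, `MyEvent`, and `TimestampedSourceSketch` are hypothetical stand-ins, not Flink types, and `emitWatermark` takes a plain `long` here, whereas the snippet under review passes a `Watermark` object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two SourceContext methods named in the docs.
// It is NOT Flink's interface; it only reuses the same two method names.
interface SketchSourceContext<T> {
    void collectWithTimestamp(T element, long timestamp);
    void emitWatermark(long watermarkTimestamp); // assumption: plain long instead of a Watermark object
}

// Hypothetical element type; its accessors mirror the ones used in the docs snippet.
final class MyEvent {
    final String payload;
    final long eventTimestamp;
    final Long watermarkTime; // null when the element carries no watermark

    MyEvent(String payload, long eventTimestamp, Long watermarkTime) {
        this.payload = payload;
        this.eventTimestamp = eventTimestamp;
        this.watermarkTime = watermarkTime;
    }

    boolean hasWatermarkTime() {
        return watermarkTime != null;
    }
}

class TimestampedSourceSketch {
    // Same loop shape as the docs snippet, over a finite list instead of
    // an open-ended `while (/* condition */)`.
    static void run(List<MyEvent> input, SketchSourceContext<MyEvent> ctx) {
        for (MyEvent next : input) {
            // Attach the event-time timestamp at the moment the element is emitted.
            ctx.collectWithTimestamp(next, next.eventTimestamp);

            // Emit a watermark only when the element carries one.
            if (next.hasWatermarkTime()) {
                ctx.emitWatermark(next.watermarkTime);
            }
        }
    }

    // Runs the loop against a recording context and returns what was emitted, in order.
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        SketchSourceContext<MyEvent> recorder = new SketchSourceContext<MyEvent>() {
            @Override
            public void collectWithTimestamp(MyEvent element, long timestamp) {
                log.add("element " + element.payload + " @" + timestamp);
            }

            @Override
            public void emitWatermark(long watermarkTimestamp) {
                log.add("watermark " + watermarkTimestamp);
            }
        };

        List<MyEvent> input = new ArrayList<>();
        input.add(new MyEvent("a", 1000L, null));
        input.add(new MyEvent("b", 2000L, 1500L));

        run(input, recorder);
        return log;
    }

    public static void main(String[] args) {
        for (String line : runDemo()) {
            System.out.println(line);
        }
    }
}
```

In this sketch the source, not a downstream assigner, decides both the timestamp and the watermark. As the quoted text notes, a timestamp assigner applied later would overwrite both.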




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

