sjwiesman commented on a change in pull request #12253:
URL: https://github.com/apache/flink/pull/12253#discussion_r427558322



##########
File path: docs/dev/event_timestamp_extractors.md
##########
@@ -25,83 +25,80 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the stream.
+As described in [Generating Watermarks]({{ site.baseurl
+}}/dev/event_timestamps_watermarks.html), Flink provides abstractions that
+allow the programmer to assign their own timestamps and emit their own
+watermarks. More specifically, one can do so by implementing the
+`WatermarkGenerator` interface.
 
-In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
-This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
-for custom implementations.
+In order to further ease the programming effort for such tasks, Flink comes
+with some pre-implemented timestamp assigners.  This section provides a list of
+them. Apart from their out-of-the-box functionality, their implementation can
+serve as an example for custom implementations.
 
-### **Assigners with ascending timestamps**
+## Monotonously Increasing Timestamps
 
-The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task
-occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will
-arrive.
+The simplest special case for *periodic* watermark generation is the case where

Review comment:
       You use the word case three times in two sentences.
   ```suggestion
   The simplest special case for *periodic* watermark generation is when
   ```

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -22,115 +22,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+In this section you will learn about the APIs that Flink provides for working
+with **event time** timestamps and watermarks.  For an introduction to *event
+time*, *processing time*, and *ingestion time*, please refer to the
+[introduction to event time]({{ site.baseurl }}/dev/event_time.html).
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
+In order to work with *event time*, Flink needs to know the events'

Review comment:
       ```suggestion
   In order to work with *event time*, Flink needs to know the events
   ```

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -22,115 +22,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+In this section you will learn about the APIs that Flink provides for working
+with **event time** timestamps and watermarks.  For an introduction to *event
+time*, *processing time*, and *ingestion time*, please refer to the
+[introduction to event time]({{ site.baseurl }}/dev/event_time.html).
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
+In order to work with *event time*, Flink needs to know the events'
+*timestamps*, meaning each element in the stream needs to have its event
+timestamp *assigned*. This is usually done by accessing/extracting the
+timestamp from some field in the element by using a `TimestampAssigner`.
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-progress in event time.
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell
+the system about progress in event time. You can configure this by speciying a
+`WatermarkGenerator`.
 
-There are two ways to assign timestamps and generate watermarks:
+The Flink API expects both `TimestampAssigner` and `WatermarkGenerator` to be
+bundled in a `WatermarkStrategy`. As you will see below, you usually don't have
+to do this by hand because you will mostly use the convenience helper
+`WatermarkStrategies`. Here is the interface for completess' sake:

Review comment:
       I think it always makes sense to let users know there is an easy option first and then outline the low-level interface. It's less scary 👻 
   ```suggestion
   The Flink API expects a `WatermarkStrategy` that contains both a `TimestampAssigner` and a `WatermarkGenerator`.
   A number of common strategies are available out of the box in the `WatermarkStrategies` helper, but users can also build their own strategies when required.
   Here is the interface for completeness' sake:
   ```
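
A minimal sketch of that bundling, using simplified stand-in interfaces that only mirror the shape described in this PR. They are hypothetical and are not the real `org.apache.flink.api.common.eventtime` types (those also take context parameters and a `Watermark` object). The strategy hands out one `TimestampAssigner` and one `WatermarkGenerator`; the hypothetical `run` method drives them roughly the way a runtime might.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for the interfaces discussed in this PR.
// NOT the real org.apache.flink.api.common.eventtime types.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// The strategy only bundles the two pieces; it does no work itself.
interface WatermarkStrategy<T> {
    TimestampAssigner<T> createTimestampAssigner();

    WatermarkGenerator<T> createWatermarkGenerator();
}

public class StrategySketch {

    // Events are long[] {timestamp, value}: the assigner reads field 0 and the
    // generator emits (max timestamp seen - 1) when called periodically.
    static WatermarkStrategy<long[]> ascendingStrategy() {
        return new WatermarkStrategy<long[]>() {
            @Override
            public TimestampAssigner<long[]> createTimestampAssigner() {
                return (element, recordTimestamp) -> element[0];
            }

            @Override
            public WatermarkGenerator<long[]> createWatermarkGenerator() {
                return new WatermarkGenerator<long[]>() {
                    private long maxTimestamp = Long.MIN_VALUE;

                    @Override
                    public void onEvent(long[] event, long eventTimestamp, WatermarkOutput output) {
                        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        if (maxTimestamp != Long.MIN_VALUE) {
                            output.emitWatermark(maxTimestamp - 1);
                        }
                    }
                };
            }
        };
    }

    // Drives a strategy: assign a timestamp, show it to the generator, then
    // trigger one periodic emission and collect what was emitted.
    static List<Long> run(WatermarkStrategy<long[]> strategy, List<long[]> events) {
        List<Long> watermarks = new ArrayList<>();
        WatermarkOutput output = wm -> watermarks.add(wm);
        TimestampAssigner<long[]> assigner = strategy.createTimestampAssigner();
        WatermarkGenerator<long[]> generator = strategy.createWatermarkGenerator();
        for (long[] event : events) {
            long ts = assigner.extractTimestamp(event, Long.MIN_VALUE);
            generator.onEvent(event, ts, output);
        }
        generator.onPeriodicEmit(output);
        return watermarks;
    }

    public static void main(String[] args) {
        List<long[]> events = Arrays.asList(new long[] {1000, 1}, new long[] {3000, 2});
        System.out.println(run(ascendingStrategy(), events)); // [2999]
    }
}
```

Keeping assigner and generator as separate factories is what lets the helper mix and match common generators with user-supplied timestamp extraction.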

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def run(ctx: SourceContext[MyType]): Unit = {
+    while (/* condition */) {
+        val next: MyType = getNext()
+        ctx.collectWithTimestamp(next, next.eventTimestamp)
+
+        if (next.hasWatermarkTime) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
+        }
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## How Operators Are Processing Watermarks

Review comment:
       ```suggestion
   ## How Operators Process Watermarks
   ```

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -22,115 +22,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+In this section you will learn about the APIs that Flink provides for working
+with **event time** timestamps and watermarks.  For an introduction to *event
+time*, *processing time*, and *ingestion time*, please refer to the
+[introduction to event time]({{ site.baseurl }}/dev/event_time.html).
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the
-timestamp from some field in the element.
+In order to work with *event time*, Flink needs to know the events'
+*timestamps*, meaning each element in the stream needs to have its event
+timestamp *assigned*. This is usually done by accessing/extracting the
+timestamp from some field in the element by using a `TimestampAssigner`.
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about
-progress in event time.
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell
+the system about progress in event time. You can configure this by speciying a

Review comment:
       ```suggestion
   the system about progress in event time. You can configure this by specifying a
   ```

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -270,105 +340,115 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent]
 
     val maxTimeLag = 5000L // 5 seconds
 
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        element.getCreationTime
+    override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
+        // don't need to do anything because we work on processing time
     }
 
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag
-        new Watermark(System.currentTimeMillis() - maxTimeLag)
+    override def onPeriodicEmit(): Unit = {
+        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-#### **With Punctuated Watermarks**
+### Writing a Punctuated WatermarkGenerator
 
-To generate watermarks whenever a certain event indicates that a new watermark might be generated, use
-`AssignerWithPunctuatedWatermarks`. For this class Flink will first call the `extractTimestamp(...)` method
-to assign the element a timestamp, and then immediately call the
-`checkAndGetNextWatermark(...)` method on that element.
+As mentioned above, a punctuated watermark generator will observe the stream of

Review comment:
       ```suggestion
   A punctuated watermark generator will observe the stream of
   ```
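
For reference, a minimal punctuated generator written against simplified, hypothetical stand-ins for the `WatermarkGenerator` and `WatermarkOutput` interfaces quoted in this PR (not the real Flink classes, whose output takes a `Watermark` object). The `MyEvent` type and its `watermarkMarker` flag are assumptions for illustration: the generator emits a watermark from `onEvent()` only for marker events and does nothing in `onPeriodicEmit()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the interfaces in this PR;
// NOT the real Flink types.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical event type: some events are markers that carry a watermark.
class MyEvent {
    final long timestamp;
    final boolean watermarkMarker;

    MyEvent(long timestamp, boolean watermarkMarker) {
        this.timestamp = timestamp;
        this.watermarkMarker = watermarkMarker;
    }
}

public class PunctuatedSketch implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // Emit immediately, but only when the event is a marker ("punctuation").
        if (event.watermarkMarker) {
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: all watermarks are emitted from onEvent().
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        PunctuatedSketch generator = new PunctuatedSketch();
        MyEvent[] events = {
            new MyEvent(100, false), new MyEvent(200, true), new MyEvent(250, false)
        };
        for (MyEvent e : events) {
            generator.onEvent(e, e.timestamp, wm -> emitted.add(wm));
        }
        generator.onPeriodicEmit(wm -> emitted.add(wm));
        System.out.println(emitted); // [200]
    }
}
```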

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def run(ctx: SourceContext[MyType]): Unit = {
+    while (/* condition */) {
+        val next: MyType = getNext()
+        ctx.collectWithTimestamp(next, next.eventTimestamp)
+
+        if (next.hasWatermarkTime) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
+        }
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## How Operators Are Processing Watermarks
+
+As a general rule, operators are required to completely process a given
+watermark before forwarding it downstream. For example, `WindowOperator` will
+first evaluate which windows should be fired, and only after producing all of

Review comment:
       ```suggestion
   first evaluate all windows that should be fired, and only after producing all of
   ```
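
Related to operators forwarding watermarks: the idle-source text in this PR says the watermark is computed as the minimum over all the different parallel watermarks. A minimal sketch of that rule, assuming a hypothetical `CombinedWatermark` helper (not a Flink class) that excludes inputs marked idle, shows why a single silent input holds the watermark back until it is marked idle.

```java
import java.util.Arrays;

// Hypothetical helper, NOT a Flink class: tracks the latest watermark per input
// and computes the combined watermark an operator could forward downstream.
public class CombinedWatermark {
    private final long[] watermarks;
    private final boolean[] idle;

    public CombinedWatermark(int numInputs) {
        watermarks = new long[numInputs];
        idle = new boolean[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no watermark seen yet
    }

    // A new watermark also marks the input as active again.
    public void update(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false;
    }

    public void markIdle(int input) {
        idle[input] = true;
    }

    // Minimum over all non-idle inputs. Stays at Long.MIN_VALUE while an active
    // input has not produced a watermark yet, or when every input is idle.
    public long current() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        CombinedWatermark combined = new CombinedWatermark(2);
        combined.update(0, 5_000);
        System.out.println(combined.current()); // Long.MIN_VALUE: input 1 is silent
        combined.markIdle(1);
        System.out.println(combined.current()); // 5000
    }
}
```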

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -377,4 +457,77 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## Generating Watermarks and Timestamps Directly in a Source
+
+Stream sources can directly assign timestamps to the elements they produce, and
+they can also emit watermarks.  When this is done, no `WatermarkStrategy` is
+needed.  Note that if a timestamp assigner is used, any timestamps and
+watermarks provided by the source will be overwritten.
+
+To assign a timestamp to an element in the source directly, the source must use
+the `collectWithTimestamp(...)` method on the `SourceContext`. To generate
+watermarks, the source must call the `emitWatermark(Watermark)` function.
+
+Below is a simple example of a *(non-checkpointed)* source that assigns
+timestamps and generates watermarks:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void run(SourceContext<MyType> ctx) throws Exception {
+    while (/* condition */) {
+        MyType next = getNext();
+        ctx.collectWithTimestamp(next, next.getEventTimestamp());
+
+        if (next.hasWatermarkTime()) {
+            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
+        }
+    }
+}

Review comment:
       Should this use the new source interface? 

##########
File path: docs/dev/event_timestamps_watermarks.md
##########
@@ -175,18 +164,103 @@ withTimestampsAndWatermarks
 </div>
 </div>
 
+Using a `WatermarkStrategy` this way takes a stream and produce a new stream
+with timestamped elements and watermarks. If the original stream had timestamps
+and/or watermarks already, the timestamp assigner overwrites them.
+
+## Dealing With Idle Sources
+
+If one of the input splits/partitions/shards does not carry events for a while
+this means that the `WatermarkGenerator` also does not get any new information
+on which to base a watermark. We call this an *idle input* or an *idle source*.
+This is a problem because it can happen that some of your partitions do still
+carry events. In that case, the watermark will be held back, because it is
+computed as the minimum over all the different parallel watermarks.
+
+To deal with this, you can use a `WatermarkStrategy` that will detect idleness and mark an input as idle. `WatermarkStrategies` provides a convenience helper for this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withIdleness(Duration.ofMinutes(1))
+        .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withIdleness(Duration.ofMinutes(1))
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+
+## Writing WatermarkGenerators
+
+A `TimestampAssigner` is a simple function that extracts a field from an event, we therefore don't need to look at them in detail. A `WatermarkGenerator`, on the other hand, is a bit more complicated to write and we will look at how you can do that in the next two sections. This is the `WatermarkGenerator` interface:
+
+{% highlight java %}
+/**
+ * The {@code WatermarkGenerator} generates watermarks either based on events or
+ * periodically (in a fixed interval).
+ *
+ * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
+ * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
+ */
+@Public
+public interface WatermarkGenerator<T> {
+
+    /**
+     * Called for every event, allows the watermark generator to examine and remember the
+     * event timestamps, or to emit a watermark based on the event itself.
+     */
+    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
+
+    /**
+     * Called periodically, and might emit a new watermark, or not.
+     *
+     * <p>The interval in which this method is called and Watermarks are generated
+     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
+     */
+    void onPeriodicEmit(WatermarkOutput output);
+}
+{% endhighlight %}
+
+There are two different styles of watermark generation: *periodic* and
+*punctuated*.
+
+A periodic generator usually observes to the incoming events via `onEvent()`
+and then emits a watermark when the framework calls `onPeriodicEmit()`.
+
+A puncutated generator will look at events in `onEvent()` and wait for special
+*marker events* or *punctuations* that carry watermark information in the
+stream. When it sees one of these events it emits a watermark immediately.
+Usually, punctuated generators don't emit a watermark from `onPeriodicEmit()`.
 
-#### **With Periodic Watermarks**
+We will look at how to implement generators for each style next.
 
-`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending
-on the stream elements, or purely based on processing time).
+### Writing a Periodic WatermarkGenerator
 
-The interval (every *n* milliseconds) in which the watermark will be generated is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's `getCurrentWatermark()` method will be
-called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous
-watermark.
+As mentioned above, a periodic generator observes stream events and generates

Review comment:
       You use the phrase "as mentioned above" a lot.
   ```suggestion
   A periodic generator observes stream events and generates
   ```
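
A minimal periodic generator in the bounded-out-of-orderness style, again against simplified, hypothetical stand-in interfaces rather than the real Flink types. `onEvent()` only records the highest timestamp seen, and `onPeriodicEmit()` emits that maximum minus the allowed out-of-orderness minus one. The class name and the 3.5-second bound in `main` are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the interfaces in this PR;
// NOT the real Flink types.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Tracks the highest timestamp seen and, when called periodically, emits a
// watermark that trails it by a fixed out-of-orderness bound.
public class BoundedOutOfOrdernessSketch<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that the first emitted watermark cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Only remember the timestamp; emission happens in onPeriodicEmit().
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // No event with a timestamp <= this value is expected anymore.
        output.emitWatermark(currentMaxTimestamp - maxOutOfOrderness - 1);
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        BoundedOutOfOrdernessSketch<String> gen = new BoundedOutOfOrdernessSketch<>(3_500);
        long[] timestamps = {10_000, 12_000, 11_000}; // 11_000 arrives out of order
        for (long ts : timestamps) {
            gen.onEvent("event", ts, wm -> emitted.add(wm));
        }
        gen.onPeriodicEmit(wm -> emitted.add(wm));
        System.out.println(emitted); // [8499]
    }
}
```

The late event at 11,000 does not move the watermark backwards, because only the running maximum feeds into `onPeriodicEmit()`.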




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

