This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 69cc19f  [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies
69cc19f is described below

commit 69cc19f12a2699ece521c8ce9d66e018f71d8340
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Tue May 19 15:58:07 2020 +0200

    [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies
---
 docs/dev/event_time.md                  |  91 ++-----
 docs/dev/event_timestamp_extractors.md  | 105 ++++----
 docs/dev/event_timestamps_watermarks.md | 454 ++++++++++++++++++++------------
 3 files changed, 358 insertions(+), 292 deletions(-)

diff --git a/docs/dev/event_time.md b/docs/dev/event_time.md
index a25122f..5fcd7a9 100644
--- a/docs/dev/event_time.md
+++ b/docs/dev/event_time.md
@@ -34,30 +34,22 @@ For information about how to use time in Flink programs 
refer to
 [ProcessFunction]({% link
 dev/stream/operators/process_function.md %}).
 
-* toc
-{:toc}
+A prerequisite for using *event time* processing is setting the right *time
+characteristic*. That setting defines how data stream sources behave (for
+example, whether they will assign timestamps), and what notion of time should
+be used by window operations like `KeyedStream.timeWindow(Time.seconds(30))`.
 
-## Setting a Time Characteristic
-
-The first part of a Flink DataStream program usually sets the base *time 
characteristic*. That setting
-defines how data stream sources behave (for example, whether they will assign 
timestamps), and what notion of
-time should be used by window operations like 
`KeyedStream.timeWindow(Time.seconds(30))`.
-
-The following example shows a Flink program that aggregates events in hourly 
time windows. The behavior of the
-windows adapts with the time characteristic.
+You can set the time characteristic using
+`StreamExecutionEnvironment.setStreamTimeCharacteristic()`:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-DataStream<MyEvent> stream = env.addSource(new 
FlinkKafkaConsumer010<MyEvent>(topic, schema, props));
+DataStream<MyEvent> stream = env.addSource(new 
FlinkKafkaConsumer<MyEvent>(topic, schema, props));
 
 stream
     .keyBy( (event) -> event.getUser() )
@@ -70,13 +62,9 @@ stream
 {% highlight scala %}
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-val stream: DataStream[MyEvent] = env.addSource(new 
FlinkKafkaConsumer010[MyEvent](topic, schema, props))
+val stream: DataStream[MyEvent] = env.addSource(new 
FlinkKafkaConsumer[MyEvent](topic, schema, props))
 
 stream
     .keyBy( _.getUser )
@@ -98,47 +86,22 @@ 
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
 </div>
 </div>
 
-Note that in order to run this example in *event time*, the program needs to 
either use sources
-that directly define event time for the data and emit watermarks themselves, 
or the program must
-inject a *Timestamp Assigner & Watermark Generator* after the sources. Those 
functions describe how to access
-the event timestamps, and what degree of out-of-orderness the event stream 
exhibits.
-
-The section below describes the general mechanism behind *timestamps* and 
*watermarks*. For a guide on how
-to use timestamp assignment and watermark generation in the Flink DataStream 
API, please refer to
-[Generating Timestamps / Watermarks]({{ site.baseurl 
}}/dev/event_timestamps_watermarks.html).
-
-{% top %}
-
-## Idling sources
-
-Currently, with pure event time watermarks generators, watermarks can not 
progress if there are no elements
-to be processed. That means in case of gap in the incoming data, event time 
will not progress and for
-example the window operator will not be triggered and thus existing windows 
will not be able to produce any
-output data.
-
-To circumvent this one can use periodic watermark assigners that don't only 
assign based on
-element timestamps. An example solution could be an assigner that switches to 
using current processing time
-as the time basis after not observing new events for a while.
-
-Sources can be marked as idle using 
`SourceFunction.SourceContext#markAsTemporarilyIdle`. For details please refer 
to the Javadoc of
-this method as well as `StreamStatus`.
-
-## Debugging Watermarks
-
-Please refer to the [Debugging Windows & Event Time]({{ site.baseurl 
}}/monitoring/debugging_event_time.html) section for debugging
-watermarks at runtime.
-
-## How operators are processing watermarks
-
-As a general rule, operators are required to completely process a given 
watermark before forwarding it downstream. For example,
-`WindowOperator` will first evaluate which windows should be fired, and only 
after producing all of the output triggered by
-the watermark will the watermark itself be sent downstream. In other words, 
all elements produced due to occurrence of a watermark
-will be emitted before the watermark.
-
-The same rule applies to `TwoInputStreamOperator`. However, in this case the 
current watermark of the operator is defined as
-the minimum of both of its inputs.
-
-The details of this behavior are defined by the implementations of the 
`OneInputStreamOperator#processWatermark`,
-`TwoInputStreamOperator#processWatermark1` and 
`TwoInputStreamOperator#processWatermark2` methods.
+Note that in order to run this example in *event time*, the program needs to
+either use sources that directly define event time for the data and emit
+watermarks themselves, or the program must inject a *Timestamp Assigner &
+Watermark Generator* after the sources. Those functions describe how to access
+the event timestamps, and what degree of out-of-orderness the event stream
+exhibits.
+
+## Where to go next?
+
+* [Generating Watermarks]({% link dev/event_timestamps_watermarks.md
+  %}): Shows how to write timestamp assigners and watermark generators, which
+  are needed for event-time aware Flink applications.
+* [Builtin Watermark Generators]({% link dev/event_timestamp_extractors.md %}):
+  Gives an overview of the builtin watermark generators.
+* [Debugging Windows & Event Time]({{ site.baseurl
+  }}/monitoring/debugging_event_time.html): Shows how to debug problems around
+  watermarks and timestamps in event-time Flink applications.
 
 {% top %}
diff --git a/docs/dev/event_timestamp_extractors.md 
b/docs/dev/event_timestamp_extractors.md
index 01b3634..a80181d 100644
--- a/docs/dev/event_timestamp_extractors.md
+++ b/docs/dev/event_timestamp_extractors.md
@@ -1,5 +1,5 @@
 ---
-title: "Pre-defined Timestamp Extractors / Watermark Emitters"
+title: "Builtin Watermark Generators"
 nav-parent_id: event_time
 nav-pos: 2
 ---
@@ -25,83 +25,80 @@ under the License.
 * toc
 {:toc}
 
-As described in [timestamps and watermark handling]({{ site.baseurl 
}}/dev/event_timestamps_watermarks.html),
-Flink provides abstractions that allow the programmer to assign their own 
timestamps and emit their own watermarks. More specifically,
-one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and 
`AssignerWithPunctuatedWatermarks` interfaces, depending
-on the use case. In a nutshell, the first will emit watermarks periodically, 
while the second does so based on some property of
-the incoming records, e.g. whenever a special element is encountered in the 
stream.
+As described in [Generating Watermarks]({{ site.baseurl
+}}/dev/event_timestamps_watermarks.html), Flink provides abstractions that
+allow the programmer to assign their own timestamps and emit their own
+watermarks. More specifically, one can do so by implementing the
+`WatermarkGenerator` interface.
 
-In order to further ease the programming effort for such tasks, Flink comes 
with some pre-implemented timestamp assigners.
-This section provides a list of them. Apart from their out-of-the-box 
functionality, their implementation can serve as an example
-for custom implementations.
+In order to further ease the programming effort for such tasks, Flink comes
+with some pre-implemented timestamp assigners.  This section provides a list of
+them. Apart from their out-of-the-box functionality, their implementation can
+serve as an example for custom implementations.
 
-### **Assigners with ascending timestamps**
+## Monotonously Increasing Timestamps
 
-The simplest special case for *periodic* watermark generation is the case 
where timestamps seen by a given source task
-occur in ascending order. In that case, the current timestamp can always act 
as a watermark, because no earlier timestamps will
-arrive.
+The simplest special case for *periodic* watermark generation is when
+timestamps seen by a given source task occur in ascending order. In that case,
+the current timestamp can always act as a watermark, because no earlier
+timestamps will arrive.
 
-Note that it is only necessary that timestamps are ascending *per parallel 
data source task*. For example, if
-in a specific setup one Kafka partition is read by one parallel data source 
instance, then it is only necessary that
-timestamps are ascending within each Kafka partition. Flink's watermark 
merging mechanism will generate correct
-watermarks whenever parallel streams are shuffled, unioned, connected, or 
merged.
+Note that it is only necessary that timestamps are ascending *per parallel data
+source task*. For example, if in a specific setup one Kafka partition is read
+by one parallel data source instance, then it is only necessary that timestamps
+are ascending within each Kafka partition. Flink's watermark merging mechanism
+will generate correct watermarks whenever parallel streams are shuffled,
+unioned, connected, or merged.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks =
-    stream.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor<MyEvent>() {
-
-        @Override
-        public long extractAscendingTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
+WatermarkStrategies
+        .<MyType>forMonotonousTimestamps()
+        .build();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( 
_.getCreationTime )
+WatermarkStrategies
+  .forMonotonousTimestamps[MyType]()
+  .build()
 {% endhighlight %}
 </div>
 </div>
 
-### **Assigners allowing a fixed amount of lateness**
-
-Another example of periodic watermark generation is when the watermark lags 
behind the maximum (event-time) timestamp
-seen in the stream by a fixed amount of time. This case covers scenarios where 
the maximum lateness that can be encountered in a
-stream is known in advance, e.g. when creating a custom source containing 
elements with timestamps spread within a fixed period of
-time for testing. For these cases, Flink provides the 
`BoundedOutOfOrdernessTimestampExtractor` which takes as an argument
-the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed 
to be late before being ignored when computing the
-final result for the given window. Lateness corresponds to the result of `t - 
t_w`, where `t` is the (event-time) timestamp of an
-element, and `t_w` that of the previous watermark. If `lateness > 0` then the 
element is considered late and is, by default, ignored when computing
-the result of the job for its corresponding window. See the documentation 
about [allowed lateness]({{ site.baseurl 
}}/dev/stream/operators/windows.html#allowed-lateness)
-for more information about working with late elements.
+## Fixed Amount of Lateness
+
+Another example of periodic watermark generation is when the watermark lags
+behind the maximum (event-time) timestamp seen in the stream by a fixed amount
+of time. This case covers scenarios where the maximum lateness that can be
+encountered in a stream is known in advance, e.g. when creating a custom source
+containing elements with timestamps spread within a fixed period of time for
+testing. For these cases, Flink provides the `BoundedOutOfOrdernessWatermarks`
+generator which takes as an argument the `maxOutOfOrderness`, i.e. the maximum
+amount of time an element is allowed to be late before being ignored when
+computing the final result for the given window. Lateness corresponds to the
+result of `t_w - t`, where `t` is the (event-time) timestamp of an element, and
+`t_w` that of the previous watermark.  If `lateness > 0` then the element is
+considered late and is, by default, ignored when computing the result of the
+job for its corresponding window. See the documentation about [allowed
+lateness]({{ site.baseurl
+}}/dev/stream/operators/windows.html#allowed-lateness) for more information
+about working with late elements.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<MyEvent> stream = ...
-
-DataStream<MyEvent> withTimestampsAndWatermarks =
-    stream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
-
-        @Override
-        public long extractTimestamp(MyEvent element) {
-            return element.getCreationTime();
-        }
-});
+WatermarkStrategies
+        .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
+        .build();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val stream: DataStream[MyEvent] = ...
-
-val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( 
_.getCreationTime ))
+WatermarkStrategies
+  .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(10))
+  .build()
 {% endhighlight %}
 </div>
 </div>
diff --git a/docs/dev/event_timestamps_watermarks.md 
b/docs/dev/event_timestamps_watermarks.md
index 28e1f2f..58f78c3 100644
--- a/docs/dev/event_timestamps_watermarks.md
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -1,5 +1,5 @@
 ---
-title: "Generating Timestamps / Watermarks"
+title: "Generating Watermarks"
 nav-parent_id: event_time
 nav-pos: 1
 ---
@@ -22,115 +22,103 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+In this section you will learn about the APIs that Flink provides for working
+with **event time** timestamps and watermarks.  For an introduction to *event
+time*, *processing time*, and *ingestion time*, please refer to the
+[introduction to event time]({{ site.baseurl }}/dev/event_time.html).
+
 * toc
 {:toc}
 
+## Introduction to Watermark Strategies
 
-This section is relevant for programs running on **event time**. For an 
introduction to *event time*,
-*processing time*, and *ingestion time*, please refer to the [introduction to 
event time]({{ site.baseurl }}/dev/event_time.html).
-
-To work with *event time*, streaming programs need to set the *time 
characteristic* accordingly.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-## Assigning Timestamps
-
-In order to work with *event time*, Flink needs to know the events' 
*timestamps*, meaning each element in the
-stream needs to have its event timestamp *assigned*. This is usually done by 
accessing/extracting the
-timestamp from some field in the element.
+In order to work with *event time*, Flink needs to know the events'
+*timestamps*, meaning each element in the stream needs to have its event
+timestamp *assigned*. This is usually done by accessing/extracting the
+timestamp from some field in the element by using a `TimestampAssigner`.
 
-Timestamp assignment goes hand-in-hand with generating watermarks, which tell 
the system about
-progress in event time.
+Timestamp assignment goes hand-in-hand with generating watermarks, which tell
+the system about progress in event time. You can configure this by specifying a
+`WatermarkGenerator`.
 
-There are two ways to assign timestamps and generate watermarks:
+The Flink API expects a `WatermarkStrategy` that contains both a `TimestampAssigner` and a `WatermarkGenerator`.
+A number of common strategies are available out of the box in the `WatermarkStrategies` helper, but users can also build their own strategies when required.
+Here is the interface for completeness' sake:
 
-  1. Directly in the data stream source
-  2. Via a timestamp assigner / watermark generator: in Flink, timestamp 
assigners also define the watermarks to be emitted
-
-<span class="label label-danger">Attention</span> Both timestamps and 
watermarks are specified as
-milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
-
-### Source Functions with Timestamps and Watermarks
+{% highlight java %}
+public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, 
WatermarkGeneratorSupplier<T>{
 
-Stream sources can directly assign timestamps to the elements they produce, 
and they can also emit watermarks.
-When this is done, no timestamp assigner is needed.
-Note that if a timestamp assigner is used, any timestamps and watermarks 
provided by the source will be overwritten.
+    /**
+     * Instantiates a {@link TimestampAssigner} for assigning timestamps 
according to this
+     * strategy.
+     */
+    @Override
+    TimestampAssigner<T> 
createTimestampAssigner(TimestampAssignerSupplier.Context context);
 
-To assign a timestamp to an element in the source directly, the source must 
use the `collectWithTimestamp(...)`
-method on the `SourceContext`. To generate watermarks, the source must call 
the `emitWatermark(Watermark)` function.
+    /**
+     * Instantiates a WatermarkGenerator that generates watermarks according 
to this strategy.
+     */
+    @Override
+    WatermarkGenerator<T> 
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+}
+{% endhighlight %}
 
-Below is a simple example of a *(non-checkpointed)* source that assigns 
timestamps and generates watermarks:
+As mentioned, you usually don't implement this interface yourself but use the
+`WatermarkStrategies` helper for using common watermark strategies or to bundle
+together a custom `TimestampAssigner` with a `WatermarkGenerator`. For example, to use bounded-out-of-orderness watermarks and a lambda function as a timestamp assigner you use this:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-@Override
-public void run(SourceContext<MyType> ctx) throws Exception {
-       while (/* condition */) {
-               MyType next = getNext();
-               ctx.collectWithTimestamp(next, next.getEventTimestamp());
-
-               if (next.hasWatermarkTime()) {
-                       ctx.emitWatermark(new 
Watermark(next.getWatermarkTime()));
-               }
-       }
-}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withTimestampAssigner((event, timestamp) -> event.f0)
+        .build();
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-override def run(ctx: SourceContext[MyType]): Unit = {
-       while (/* condition */) {
-               val next: MyType = getNext()
-               ctx.collectWithTimestamp(next, next.eventTimestamp)
-
-               if (next.hasWatermarkTime) {
-                       ctx.emitWatermark(new Watermark(next.getWatermarkTime))
-               }
-       }
-}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
+    override def extractTimestamp(element: (Long, String), recordTimestamp: 
Long): Long = element._1
+  })
+  .build()
 {% endhighlight %}
+
+(Using Scala lambdas here currently doesn't work because it is hard to support.)
 </div>
 </div>
 
+Specifying a `TimestampAssigner` is optional and in most cases you don't
+actually want to specify one. For example, when using Kafka or Kinesis you
+would get timestamps directly from the Kafka/Kinesis records.
 
-### Timestamp Assigners / Watermark Generators
+We will look at the `WatermarkGenerator` interface later in [Writing
+WatermarkGenerators](#writing-watermarkgenerators).
 
-Timestamp assigners take a stream and produce a new stream with timestamped 
elements and watermarks. If the
-original stream had timestamps and/or watermarks already, the timestamp 
assigner overwrites them.
+<div class="alert alert-warning">
+<strong>Attention</strong>: Both timestamps and watermarks
+are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
+</div>
 
-Timestamp assigners are usually specified immediately after the data source, 
but it is not strictly required to do so.
-A common pattern, for example, is to parse (*MapFunction*) and filter 
(*FilterFunction*) before the timestamp assigner.
-In any case, the timestamp assigner needs to be specified before the first 
operation on event time
-(such as the first window operation). As a special case, when using Kafka as 
the source of a streaming job,
-Flink allows the specification of a timestamp assigner / watermark emitter 
inside
-the source (or consumer) itself. More information on how to do so can be found 
in the
-[Kafka Connector documentation]({{ site.baseurl }}/dev/connectors/kafka.html).
+## Using Watermark Strategies
 
+There are two places in Flink applications where a `WatermarkStrategy` can be
+used: 1) directly on sources and 2) after a non-source operation.
 
-**NOTE:** The remainder of this section presents the main interfaces a 
programmer has
-to implement in order to create her own timestamp extractors/watermark 
emitters.
-To see the pre-implemented extractors that ship with Flink, please refer to the
-[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl 
}}/dev/event_timestamp_extractors.html) page.
+The first option is preferable, because it allows sources to exploit knowledge
+about shards/partitions/splits in the watermarking logic. Sources can usually
+then track watermarks at a finer level and the overall watermark produced by a
+source will be more accurate. Specifying a `WatermarkStrategy` directly on the
+source usually means you have to use a source-specific interface. Refer to
+[Watermark Strategies and the Kafka
+Connector](#watermark-strategies-and-the-kafka-connector) for how this works on
+a Kafka Connector and for more details about how per-partition watermarking
+works there.
+
+The second option (setting a `WatermarkStrategy` after arbitrary operations)
+should only be used if you cannot set a strategy directly on the source:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -144,7 +132,7 @@ DataStream<MyEvent> stream = env.readFile(
 
 DataStream<MyEvent> withTimestampsAndWatermarks = stream
         .filter( event -> event.severity() == WARNING )
-        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
+        .assignTimestampsAndWatermarks(<watermark strategy>);
 
 withTimestampsAndWatermarks
         .keyBy( (event) -> event.getGroup() )
@@ -164,7 +152,7 @@ val stream: DataStream[MyEvent] = env.readFile(
 
 val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
         .filter( _.severity == WARNING )
-        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
+        .assignTimestampsAndWatermarks(<watermark strategy>)
 
 withTimestampsAndWatermarks
         .keyBy( _.getGroup )
@@ -175,18 +163,103 @@ withTimestampsAndWatermarks
 </div>
 </div>
 
+Using a `WatermarkStrategy` this way takes a stream and produces a new stream
+with timestamped elements and watermarks. If the original stream had timestamps
+and/or watermarks already, the timestamp assigner overwrites them.
 
-#### **With Periodic Watermarks**
+## Dealing With Idle Sources
 
-`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks 
periodically (possibly depending
-on the stream elements, or purely based on processing time).
+If one of the input splits/partitions/shards does not carry events for a while
+this means that the `WatermarkGenerator` also does not get any new information
+on which to base a watermark. We call this an *idle input* or an *idle source*.
+This is a problem because it can happen that some of your partitions do still
+carry events. In that case, the watermark will be held back, because it is
+computed as the minimum over all the different parallel watermarks.
 
-The interval (every *n* milliseconds) in which the watermark will be generated 
is defined via
-`ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's 
`getCurrentWatermark()` method will be
-called each time, and a new watermark will be emitted if the returned 
watermark is non-null and larger than the previous
-watermark.
+To deal with this, you can use a `WatermarkStrategy` that will detect idleness 
and mark an input as idle. `WatermarkStrategies` provides a convenience helper 
for this:
 
-Here we show two simple examples of timestamp assigners that use periodic 
watermark generation. Note that Flink ships with a 
`BoundedOutOfOrdernessTimestampExtractor` similar to the 
`BoundedOutOfOrdernessGenerator` shown below, which you can read about 
[here]({{ site.baseurl 
}}/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness).
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+WatermarkStrategies
+        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+        .withIdleness(Duration.ofMinutes(1))
+        .build();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+WatermarkStrategies
+  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
+  .withIdleness(Duration.ofMinutes(1))
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+
+## Writing WatermarkGenerators
+
+A `TimestampAssigner` is a simple function that extracts a field from an event, so we don't need to look at it in detail. A `WatermarkGenerator`, on the other hand, is a bit more complicated to write, and we will look at how you can do that in the next two sections. This is the `WatermarkGenerator` interface:
+
+{% highlight java %}
+/**
+ * The {@code WatermarkGenerator} generates watermarks either based on events 
or
+ * periodically (in a fixed interval).
+ *
+ * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction 
between the
+ * {@code AssignerWithPunctuatedWatermarks} and the {@code 
AssignerWithPeriodicWatermarks}.
+ */
+@Public
+public interface WatermarkGenerator<T> {
+
+    /**
+     * Called for every event, allows the watermark generator to examine and 
remember the
+     * event timestamps, or to emit a watermark based on the event itself.
+     */
+    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
+
+    /**
+     * Called periodically, and might emit a new watermark, or not.
+     *
+     * <p>The interval in which this method is called and Watermarks are 
generated
+     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
+     */
+    void onPeriodicEmit(WatermarkOutput output);
+}
+{% endhighlight %}
+
+There are two different styles of watermark generation: *periodic* and
+*punctuated*.
+
+A periodic generator usually observes the incoming events via `onEvent()`
+and then emits a watermark when the framework calls `onPeriodicEmit()`.
+
+A punctuated generator will look at events in `onEvent()` and wait for special
+*marker events* or *punctuations* that carry watermark information in the
+stream. When it sees one of these events it emits a watermark immediately.
+Usually, punctuated generators don't emit a watermark from `onPeriodicEmit()`.
+
+We will look at how to implement generators for each style next.
+
+### Writing a Periodic WatermarkGenerator
+
+A periodic generator observes stream events and generates
+watermarks periodically (possibly depending on the stream elements, or purely
+based on processing time).
+
+The interval (every *n* milliseconds) in which the watermark will be generated
+is defined via `ExecutionConfig.setAutoWatermarkInterval(...)`. The
+generator's `onPeriodicEmit()` method will be called each time, and a new
+watermark will be emitted if the watermark it outputs is larger than the
+previous watermark.
+
+Here we show two simple examples of watermark generators that use periodic
+watermark generation. Note that Flink ships with
+`BoundedOutOfOrdernessWatermarks`, which is a `WatermarkGenerator` that works
+similarly to the `BoundedOutOfOrdernessGenerator` shown below. You can read
+about using that [here]({{ site.baseurl
+}}/dev/event_timestamp_extractors.html#fixed-amount-of-lateness).
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -196,44 +269,42 @@ Here we show two simple examples of timestamp assigners 
that use periodic waterm
  * but only to a certain degree. The latest elements for a certain timestamp t 
will arrive
  * at most n milliseconds after the earliest elements for timestamp t.
  */
-public class BoundedOutOfOrdernessGenerator implements 
AssignerWithPeriodicWatermarks<MyEvent> {
+public class BoundedOutOfOrdernessGenerator implements 
WatermarkGenerator<MyEvent> {
 
     private final long maxOutOfOrderness = 3500; // 3.5 seconds
 
     private long currentMaxTimestamp;
 
     @Override
-    public long extractTimestamp(MyEvent element, long 
previousElementTimestamp) {
-        long timestamp = element.getCreationTime();
-        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
-        return timestamp;
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput 
output) {
+        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
     }
 
     @Override
-    public Watermark getCurrentWatermark() {
-        // return the watermark as current highest timestamp minus the 
out-of-orderness bound
-        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
+    public void onPeriodicEmit(WatermarkOutput output) {
+        // emit the watermark as current highest timestamp minus the 
out-of-orderness bound
+        output.emitWatermark(new Watermark(currentMaxTimestamp - 
maxOutOfOrderness - 1));
     }
+
 }
 
 /**
  * This generator generates watermarks that are lagging behind processing time by a fixed amount.
  * It assumes that elements arrive in Flink after a bounded delay.
  */
-public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
+public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
 
-       private final long maxTimeLag = 5000; // 5 seconds
+    private final long maxTimeLag = 5000; // 5 seconds
 
-       @Override
-       public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-               return element.getCreationTime();
-       }
+    @Override
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
+        // don't need to do anything because we work on processing time
+    }
 
-       @Override
-       public Watermark getCurrentWatermark() {
-               // return the watermark as current time minus the maximum time lag
-               return new Watermark(System.currentTimeMillis() - maxTimeLag);
-       }
+    @Override
+    public void onPeriodicEmit(WatermarkOutput output) {
+        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
+    }
 }
 {% endhighlight %}
 </div>
@@ -250,15 +321,13 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv
 
     var currentMaxTimestamp: Long = _
 
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        val timestamp = element.getCreationTime()
-        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
-        timestamp
+    override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
+        currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp)
     }
 
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current highest timestamp minus the out-of-orderness bound
-        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
+    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
+        // emit the watermark as current highest timestamp minus the out-of-orderness bound
+        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1))
     }
 }
 
@@ -270,45 +339,43 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent]
 
     val maxTimeLag = 5000L // 5 seconds
 
-    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-        element.getCreationTime
+    override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
+        // don't need to do anything because we work on processing time
     }
 
-    override def getCurrentWatermark(): Watermark = {
-        // return the watermark as current time minus the maximum time lag
-        new Watermark(System.currentTimeMillis() - maxTimeLag)
+    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
+        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag))
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-#### **With Punctuated Watermarks**
+### Writing a Punctuated WatermarkGenerator
 
-To generate watermarks whenever a certain event indicates that a new watermark might be generated, use
-`AssignerWithPunctuatedWatermarks`. For this class Flink will first call the `extractTimestamp(...)` method
-to assign the element a timestamp, and then immediately call the
-`checkAndGetNextWatermark(...)` method on that element.
+A punctuated watermark generator will observe the stream of
+events and emit a watermark whenever it sees a special element that carries
+watermark information.
 
-The `checkAndGetNextWatermark(...)` method is passed the timestamp that was assigned in the `extractTimestamp(...)`
-method, and can decide whether it wants to generate a watermark. Whenever the `checkAndGetNextWatermark(...)`
-method returns a non-null watermark, and that watermark is larger than the latest previous watermark, that
-new watermark will be emitted.
+This is how you can implement a punctuated generator that emits a watermark
+whenever an event indicates that it carries a certain marker:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
+public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
 
-       @Override
-       public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
-               return element.getCreationTime();
-       }
+    @Override
+    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
+        if (event.hasWatermarkMarker()) {
+            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
+        }
+    }
 
-       @Override
-       public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
-               return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
-       }
+    @Override
+    public void onPeriodicEmit(WatermarkOutput output) {
+        // don't need to do anything because we emit in reaction to events above
+    }
 }
 {% endhighlight %}
 </div>
@@ -316,59 +383,71 @@ public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEv
 {% highlight scala %}
 class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
 
-       override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
-               element.getCreationTime
-       }
+    override def onEvent(event: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
+        if (event.hasWatermarkMarker()) {
+            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()))
+        }
+    }
 
-       override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
-               if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
-       }
+    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
+        // don't need to do anything because we emit in reaction to events above
+    }
 }
 {% endhighlight %}
 </div>
 </div>
 
-*Note:* It is possible to generate a watermark on every single event. However, because each watermark causes some
-computation downstream, an excessive number of watermarks degrades performance.
-
-
-## Timestamps per Kafka Partition
+<div class="alert alert-warning">
+<strong>Note</strong>: It is possible to
+generate a watermark on every single event. However, because each watermark
+causes some computation downstream, an excessive number of watermarks degrades
+performance.
+</div>
 
-When using [Apache Kafka](connectors/kafka.html) as a data source, each Kafka partition may have a simple event time pattern (ascending
-timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel,
-interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka's consumer clients work).
+## Watermark Strategies and the Kafka Connector
 
-In that case, you can use Flink's Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the
-Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.
+When using [Apache Kafka](connectors/kafka.html) as a data source, each Kafka
+partition may have a simple event time pattern (ascending timestamps or bounded
+out-of-orderness). However, when consuming streams from Kafka, multiple
+partitions often get consumed in parallel, interleaving the events from the
+partitions and destroying the per-partition patterns (this is inherent in how
+Kafka's consumer clients work).
 
-For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the
-[ascending timestamps watermark generator](event_timestamp_extractors.html#assigners-with-ascending-timestamps) will result in perfect overall watermarks.
+In that case, you can use Flink's Kafka-partition-aware watermark generation.
+Using that feature, watermarks are generated inside the Kafka consumer, per
+Kafka partition, and the per-partition watermarks are merged in the same way as
+watermarks are merged on stream shuffles.
 
-The illustrations below show how to use the per-Kafka-partition watermark generation, and how watermarks propagate through the
-streaming dataflow in that case.
+For example, if event timestamps are strictly ascending per Kafka partition,
+generating per-partition watermarks with the [ascending timestamps watermark
+generator](event_timestamp_extractors.html#assigners-with-ascending-timestamps)
+will result in perfect overall watermarks. Note that we don't provide a
+`TimestampAssigner` in the example; the timestamps of the Kafka records
+themselves will be used instead.
 
+The illustrations below show how to use the per-Kafka-partition watermark
+generation, and how watermarks propagate through the streaming dataflow in that
+case.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-FlinkKafkaConsumer010<MyType> kafkaSource = new FlinkKafkaConsumer010<>("myTopic", schema, props);
-kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
-
-    @Override
-    public long extractAscendingTimestamp(MyType element) {
-        return element.eventTimestamp();
-    }
-});
+FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
+kafkaSource.assignTimestampsAndWatermarks(
+        WatermarkStrategies
+                .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+                .build());
 
 DataStream<MyType> stream = env.addSource(kafkaSource);
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val kafkaSource = new FlinkKafkaConsumer010[MyType]("myTopic", schema, props)
-kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
-    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
-})
+val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
+kafkaSource.assignTimestampsAndWatermarks(
+  WatermarkStrategies
+    .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20))
+    .build())
 
 val stream: DataStream[MyType] = env.addSource(kafkaSource)
 {% endhighlight %}
@@ -377,4 +456,31 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource)
 
 <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
 
+## How Operators Process Watermarks
+
+As a general rule, operators are required to completely process a given
+watermark before forwarding it downstream. For example, `WindowOperator` will
+first evaluate all windows that should be fired, and only after producing all of
+the output triggered by the watermark will the watermark itself be sent
+downstream. In other words, all elements produced due to the occurrence of a
+watermark will be emitted before the watermark.
+
+The same rule applies to `TwoInputStreamOperator`. However, in this case the
+current watermark of the operator is defined as the minimum of both of its
+inputs.
+
+The details of this behavior are defined by the implementations of the
+`OneInputStreamOperator#processWatermark`,
+`TwoInputStreamOperator#processWatermark1` and
+`TwoInputStreamOperator#processWatermark2` methods.
+
+## The Deprecated AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks
+
+Prior to introducing the current abstraction of `WatermarkStrategy`,
+`TimestampAssigner`, and `WatermarkGenerator`, Flink used
+`AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks`. You will
+still see them in the API but it is recommended to use the new interfaces
+because they offer a clearer separation of concerns and also unify periodic and
+punctuated styles of watermark generation.
+
 {% top %}
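
The two-input rule described under "How Operators Process Watermarks" in the diff above can be illustrated with a small standalone sketch. This is not Flink's implementation: `TwoInputWatermarkTracker` and its methods are hypothetical names that only loosely mirror `processWatermark1`/`processWatermark2`, and the strings it records stand in for real operator output. It shows the combined watermark being the minimum of both inputs, and the results triggered by a watermark being emitted before the watermark itself is forwarded.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a two-input operator; not part of Flink's API. */
class TwoInputWatermarkTracker {
    private long watermark1 = Long.MIN_VALUE;   // latest watermark seen on input 1
    private long watermark2 = Long.MIN_VALUE;   // latest watermark seen on input 2
    private long combined = Long.MIN_VALUE;     // current watermark of the operator
    private final List<String> output = new ArrayList<>();

    void processWatermark1(long timestamp) {
        watermark1 = Math.max(watermark1, timestamp);
        advance();
    }

    void processWatermark2(long timestamp) {
        watermark2 = Math.max(watermark2, timestamp);
        advance();
    }

    private void advance() {
        // The operator's watermark is the minimum of both inputs.
        long newMin = Math.min(watermark1, watermark2);
        if (newMin > combined) {
            combined = newMin;
            // First emit everything the watermark triggers (e.g. fired windows) ...
            output.add("results up to " + combined);
            // ... and only then forward the watermark itself downstream.
            output.add("watermark " + combined);
        }
    }

    long currentWatermark() {
        return combined;
    }

    List<String> output() {
        return output;
    }
}
```

In this sketch, a watermark arriving on only one input does not advance the operator; progress is made once the slower input catches up.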
