This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 69cc19f [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies 69cc19f is described below commit 69cc19f12a2699ece521c8ce9d66e018f71d8340 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Tue May 19 15:58:07 2020 +0200 [FLINK-17773] Update documentation for new WatermarkGenerator/WatermarkStrategies --- docs/dev/event_time.md | 91 ++----- docs/dev/event_timestamp_extractors.md | 105 ++++---- docs/dev/event_timestamps_watermarks.md | 454 ++++++++++++++++++++------------ 3 files changed, 358 insertions(+), 292 deletions(-) diff --git a/docs/dev/event_time.md b/docs/dev/event_time.md index a25122f..5fcd7a9 100644 --- a/docs/dev/event_time.md +++ b/docs/dev/event_time.md @@ -34,30 +34,22 @@ For information about how to use time in Flink programs refer to [ProcessFunction]({% link dev/stream/operators/process_function.md %}). -* toc -{:toc} +A prerequisite for using *event time* processing is setting the right *time +characteristic*. That setting defines how data stream sources behave (for +example, whether they will assign timestamps), and what notion of time should +be used by window operations like `KeyedStream.timeWindow(Time.seconds(30))`. -## Setting a Time Characteristic - -The first part of a Flink DataStream program usually sets the base *time characteristic*. That setting -defines how data stream sources behave (for example, whether they will assign timestamps), and what notion of -time should be used by window operations like `KeyedStream.timeWindow(Time.seconds(30))`. - -The following example shows a Flink program that aggregates events in hourly time windows. The behavior of the -windows adapts with the time characteristic. 
+You can set the time characteristic using +`StreamExecutionEnvironment.setStreamTimeCharacteristic()`: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - -// alternatively: -// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); -// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); -DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer010<MyEvent>(topic, schema, props)); +DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer<MyEvent>(topic, schema, props)); stream .keyBy( (event) -> event.getUser() ) @@ -70,13 +62,9 @@ stream {% highlight scala %} val env = StreamExecutionEnvironment.getExecutionEnvironment -env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) -// alternatively: -// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) -// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - -val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer010[MyEvent](topic, schema, props)) +val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer[MyEvent](topic, schema, props)) stream .keyBy( _.getUser ) @@ -98,47 +86,22 @@ env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) </div> </div> -Note that in order to run this example in *event time*, the program needs to either use sources -that directly define event time for the data and emit watermarks themselves, or the program must -inject a *Timestamp Assigner & Watermark Generator* after the sources. Those functions describe how to access -the event timestamps, and what degree of out-of-orderness the event stream exhibits. 
- -The section below describes the general mechanism behind *timestamps* and *watermarks*. For a guide on how -to use timestamp assignment and watermark generation in the Flink DataStream API, please refer to -[Generating Timestamps / Watermarks]({{ site.baseurl }}/dev/event_timestamps_watermarks.html). - -{% top %} - -## Idling sources - -Currently, with pure event time watermarks generators, watermarks can not progress if there are no elements -to be processed. That means in case of gap in the incoming data, event time will not progress and for -example the window operator will not be triggered and thus existing windows will not be able to produce any -output data. - -To circumvent this one can use periodic watermark assigners that don't only assign based on -element timestamps. An example solution could be an assigner that switches to using current processing time -as the time basis after not observing new events for a while. - -Sources can be marked as idle using `SourceFunction.SourceContext#markAsTemporarilyIdle`. For details please refer to the Javadoc of -this method as well as `StreamStatus`. - -## Debugging Watermarks - -Please refer to the [Debugging Windows & Event Time]({{ site.baseurl }}/monitoring/debugging_event_time.html) section for debugging -watermarks at runtime. - -## How operators are processing watermarks - -As a general rule, operators are required to completely process a given watermark before forwarding it downstream. For example, -`WindowOperator` will first evaluate which windows should be fired, and only after producing all of the output triggered by -the watermark will the watermark itself be sent downstream. In other words, all elements produced due to occurrence of a watermark -will be emitted before the watermark. - -The same rule applies to `TwoInputStreamOperator`. However, in this case the current watermark of the operator is defined as -the minimum of both of its inputs. 
- -The details of this behavior are defined by the implementations of the `OneInputStreamOperator#processWatermark`, -`TwoInputStreamOperator#processWatermark1` and `TwoInputStreamOperator#processWatermark2` methods. +Note that in order to run this example in *event time*, the program needs to +either use sources that directly define event time for the data and emit +watermarks themselves, or the program must inject a *Timestamp Assigner & +Watermark Generator* after the sources. Those functions describe how to access +the event timestamps, and what degree of out-of-orderness the event stream +exhibits. + +## Where to go next? + +* [Generating Watermarks]({% link dev/event_timestamps_watermarks.md + %}): Shows how to write timestamp assigners and watermark generators, which + are needed for event-time aware Flink applications. +* [Builtin Watermark Generators]({% link dev/event_timestamp_extractors.md %}): + Gives an overview of the builtin watermark generators. +* [Debugging Windows & Event Time]({{ site.baseurl + }}/monitoring/debugging_event_time.html): Shows how to debug problems around + watermarks and timestamps in event-time Flink applications. {% top %} diff --git a/docs/dev/event_timestamp_extractors.md b/docs/dev/event_timestamp_extractors.md index 01b3634..a80181d 100644 --- a/docs/dev/event_timestamp_extractors.md +++ b/docs/dev/event_timestamp_extractors.md @@ -1,5 +1,5 @@ --- -title: "Pre-defined Timestamp Extractors / Watermark Emitters" +title: "Builtin Watermark Generators" nav-parent_id: event_time nav-pos: 2 --- @@ -25,83 +25,80 @@ under the License. * toc {:toc} -As described in [timestamps and watermark handling]({{ site.baseurl }}/dev/event_timestamps_watermarks.html), -Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. 
More specifically, -one can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending -on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of -the incoming records, e.g. whenever a special element is encountered in the stream. +As described in [Generating Watermarks]({{ site.baseurl +}}/dev/event_timestamps_watermarks.html), Flink provides abstractions that +allow the programmer to assign their own timestamps and emit their own +watermarks. More specifically, one can do so by implementing the +`WatermarkGenerator` interface. -In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners. -This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example -for custom implementations. +In order to further ease the programming effort for such tasks, Flink comes +with some pre-implemented timestamp assigners. This section provides a list of +them. Apart from their out-of-the-box functionality, their implementation can +serve as an example for custom implementations. -### **Assigners with ascending timestamps** +## Monotonously Increasing Timestamps -The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task -occur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps will -arrive. +The simplest special case for *periodic* watermark generation is the case when +timestamps seen by a given source task occur in ascending order. In that case, +the current timestamp can always act as a watermark, because no earlier +timestamps will arrive. -Note that it is only necessary that timestamps are ascending *per parallel data source task*. 
For example, if -in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that -timestamps are ascending within each Kafka partition. Flink's watermark merging mechanism will generate correct -watermarks whenever parallel streams are shuffled, unioned, connected, or merged. +Note that it is only necessary that timestamps are ascending *per parallel data +source task*. For example, if in a specific setup one Kafka partition is read +by one parallel data source instance, then it is only necessary that timestamps +are ascending within each Kafka partition. Flink's watermark merging mechanism +will generate correct watermarks whenever parallel streams are shuffled, +unioned, connected, or merged. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -DataStream<MyEvent> stream = ... - -DataStream<MyEvent> withTimestampsAndWatermarks = - stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() { - - @Override - public long extractAscendingTimestamp(MyEvent element) { - return element.getCreationTime(); - } -}); +WatermarkStrategies + .<MyType>forMonotonousTimestamps() + .build(); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -val stream: DataStream[MyEvent] = ... - -val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime ) +WatermarkStrategies + .forMonotonousTimestamps[MyType]() + .build() {% endhighlight %} </div> </div> -### **Assigners allowing a fixed amount of lateness** - -Another example of periodic watermark generation is when the watermark lags behind the maximum (event-time) timestamp -seen in the stream by a fixed amount of time. This case covers scenarios where the maximum lateness that can be encountered in a -stream is known in advance, e.g. when creating a custom source containing elements with timestamps spread within a fixed period of -time for testing. 
For these cases, Flink provides the `BoundedOutOfOrdernessTimestampExtractor` which takes as an argument -the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed to be late before being ignored when computing the -final result for the given window. Lateness corresponds to the result of `t - t_w`, where `t` is the (event-time) timestamp of an -element, and `t_w` that of the previous watermark. If `lateness > 0` then the element is considered late and is, by default, ignored when computing -the result of the job for its corresponding window. See the documentation about [allowed lateness]({{ site.baseurl }}/dev/stream/operators/windows.html#allowed-lateness) -for more information about working with late elements. +## Fixed Amount of Lateness + +Another example of periodic watermark generation is when the watermark lags +behind the maximum (event-time) timestamp seen in the stream by a fixed amount +of time. This case covers scenarios where the maximum lateness that can be +encountered in a stream is known in advance, e.g. when creating a custom source +containing elements with timestamps spread within a fixed period of time for +testing. For these cases, Flink provides the `BoundedOutOfOrdernessWatermarks` +generator which takes as an argument the `maxOutOfOrderness`, i.e. the maximum +amount of time an element is allowed to be late before being ignored when +computing the final result for the given window. Lateness corresponds to the +result of `t_w - t`, where `t` is the (event-time) timestamp of an element, and +`t_w` that of the previous watermark. If `lateness > 0` then the element is +considered late and is, by default, ignored when computing the result of the +job for its corresponding window. See the documentation about [allowed +lateness]({{ site.baseurl +}}/dev/stream/operators/windows.html#allowed-lateness) for more information +about working with late elements. 
<div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -DataStream<MyEvent> stream = ... - -DataStream<MyEvent> withTimestampsAndWatermarks = - stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) { - - @Override - public long extractTimestamp(MyEvent element) { - return element.getCreationTime(); - } -}); +WatermarkStrategies + .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10)) + .build(); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -val stream: DataStream[MyEvent] = ... - -val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime )) +WatermarkStrategies + .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(10)) + .build() {% endhighlight %} </div> </div> diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md index 28e1f2f..58f78c3 100644 --- a/docs/dev/event_timestamps_watermarks.md +++ b/docs/dev/event_timestamps_watermarks.md @@ -1,5 +1,5 @@ --- -title: "Generating Timestamps / Watermarks" +title: "Generating Watermarks" nav-parent_id: event_time nav-pos: 1 --- @@ -22,115 +22,103 @@ specific language governing permissions and limitations under the License. --> +In this section you will learn about the APIs that Flink provides for working +with **event time** timestamps and watermarks. For an introduction to *event +time*, *processing time*, and *ingestion time*, please refer to the +[introduction to event time]({{ site.baseurl }}/dev/event_time.html). + * toc {:toc} +## Introduction to Watermark Strategies -This section is relevant for programs running on **event time**. For an introduction to *event time*, -*processing time*, and *ingestion time*, please refer to the [introduction to event time]({{ site.baseurl }}/dev/event_time.html). 
- -To work with *event time*, streaming programs need to set the *time characteristic* accordingly. - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); -{% endhighlight %} -</div> -<div data-lang="scala" markdown="1"> -{% highlight scala %} -val env = StreamExecutionEnvironment.getExecutionEnvironment -env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) -{% endhighlight %} -</div> -<div data-lang="python" markdown="1"> -{% highlight python %} -env = StreamExecutionEnvironment.get_execution_environment() -env.set_stream_time_characteristic(TimeCharacteristic.EventTime) -{% endhighlight %} -</div> -</div> - -## Assigning Timestamps - -In order to work with *event time*, Flink needs to know the events' *timestamps*, meaning each element in the -stream needs to have its event timestamp *assigned*. This is usually done by accessing/extracting the -timestamp from some field in the element. +In order to work with *event time*, Flink needs to know the events' +*timestamps*, meaning each element in the stream needs to have its event +timestamp *assigned*. This is usually done by accessing/extracting the +timestamp from some field in the element by using a `TimestampAssigner`. -Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about -progress in event time. +Timestamp assignment goes hand-in-hand with generating watermarks, which tell +the system about progress in event time. You can configure this by specifying a +`WatermarkGenerator`. -There are two ways to assign timestamps and generate watermarks: +The Flink API expects a `WatermarkStrategy` that contains both a `TimestampAssigner` and `WatermarkGenerator`. 
+A number of common strategies are available out of the box in the `WatermarkStrategies` helper, but users can also build their own strategies when required. +Here is the interface for completeness' sake: - 1. Directly in the data stream source - 2. Via a timestamp assigner / watermark generator: in Flink, timestamp assigners also define the watermarks to be emitted - -<span class="label label-danger">Attention</span> Both timestamps and watermarks are specified as -milliseconds since the Java epoch of 1970-01-01T00:00:00Z. - -### Source Functions with Timestamps and Watermarks +{% highlight java %} +public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{ -Stream sources can directly assign timestamps to the elements they produce, and they can also emit watermarks. -When this is done, no timestamp assigner is needed. -Note that if a timestamp assigner is used, any timestamps and watermarks provided by the source will be overwritten. + /** + * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this + * strategy. + */ + @Override + TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context); -To assign a timestamp to an element in the source directly, the source must use the `collectWithTimestamp(...)` -method on the `SourceContext`. To generate watermarks, the source must call the `emitWatermark(Watermark)` function. + /** + * Instantiates a WatermarkGenerator that generates watermarks according to this strategy. 
+ */ + @Override + WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); +} +{% endhighlight %} -Below is a simple example of a *(non-checkpointed)* source that assigns timestamps and generates watermarks: +As mentioned, you usually don't implement this interface yourself but use the +`WatermarkStrategies` helper for using common watermark strategies or to bundle +together a custom `TimestampAssigner` with a `WatermarkGenerator`. For example, to use bounded-out-of-orderness watermarks and a lambda function as a timestamp assigner you use this: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -@Override -public void run(SourceContext<MyType> ctx) throws Exception { - while (/* condition */) { - MyType next = getNext(); - ctx.collectWithTimestamp(next, next.getEventTimestamp()); - - if (next.hasWatermarkTime()) { - ctx.emitWatermark(new Watermark(next.getWatermarkTime())); - } - } -} +WatermarkStrategies + .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)) + .withTimestampAssigner((event, timestamp) -> event.f0) + .build(); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -override def run(ctx: SourceContext[MyType]): Unit = { - while (/* condition */) { - val next: MyType = getNext() - ctx.collectWithTimestamp(next, next.eventTimestamp) - - if (next.hasWatermarkTime) { - ctx.emitWatermark(new Watermark(next.getWatermarkTime)) - } - } -} +WatermarkStrategies + .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)) + .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] { + override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1 + }) + .build() {% endhighlight %} + +(Using Scala lambdas here currently doesn't work because this is hard to support.)
</div> </div> +Specifying a `TimestampAssigner` is optional and in most cases you don't +actually want to specify one. For example, when using Kafka or Kinesis you +would get timestamps directly from the Kafka/Kinesis records. -### Timestamp Assigners / Watermark Generators +We will look at the `WatermarkGenerator` interface later in [Writing +WatermarkGenerators](#writing-watermarkgenerators). -Timestamp assigners take a stream and produce a new stream with timestamped elements and watermarks. If the -original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them. +<div class="alert alert-warning"> +<strong>Attention</strong>: Both timestamps and watermarks +are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z. +</div> -Timestamp assigners are usually specified immediately after the data source, but it is not strictly required to do so. -A common pattern, for example, is to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner. -In any case, the timestamp assigner needs to be specified before the first operation on event time -(such as the first window operation). As a special case, when using Kafka as the source of a streaming job, -Flink allows the specification of a timestamp assigner / watermark emitter inside -the source (or consumer) itself. More information on how to do so can be found in the -[Kafka Connector documentation]({{ site.baseurl }}/dev/connectors/kafka.html). +## Using Watermark Strategies +There are two places in Flink applications where a `WatermarkStrategy` can be +used: 1) directly on sources and 2) after a non-source operation. -**NOTE:** The remainder of this section presents the main interfaces a programmer has -to implement in order to create her own timestamp extractors/watermark emitters. 
-To see the pre-implemented extractors that ship with Flink, please refer to the -[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/dev/event_timestamp_extractors.html) page. +The first option is preferable, because it allows sources to exploit knowledge +about shards/partitions/splits in the watermarking logic. Sources can usually +then track watermarks at a finer level and the overall watermark produced by a +source will be more accurate. Specifying a `WatermarkStrategy` directly on the +source usually means you have to use a source-specific interface. Refer to +[Watermark Strategies and the Kafka +Connector](#watermark-strategies-and-the-kafka-connector) for how this works on +a Kafka Connector and for more details about how per-partition watermarking +works there. + +The second option (setting a `WatermarkStrategy` after arbitrary operations) +should only be used if you cannot set a strategy directly on the source: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -144,7 +132,7 @@ DataStream<MyEvent> stream = env.readFile( DataStream<MyEvent> withTimestampsAndWatermarks = stream .filter( event -> event.severity() == WARNING ) - .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks()); + .assignTimestampsAndWatermarks(<watermark strategy>); withTimestampsAndWatermarks .keyBy( (event) -> event.getGroup() ) @@ -164,7 +152,7 @@ val stream: DataStream[MyEvent] = env.readFile( val withTimestampsAndWatermarks: DataStream[MyEvent] = stream .filter( _.severity == WARNING ) - .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks()) + .assignTimestampsAndWatermarks(<watermark strategy>) withTimestampsAndWatermarks .keyBy( _.getGroup ) @@ -175,18 +163,103 @@ withTimestampsAndWatermarks </div> </div> +Using a `WatermarkStrategy` this way takes a stream and produces a new stream +with timestamped elements and watermarks. 
If the original stream had timestamps +and/or watermarks already, the timestamp assigner overwrites them. -#### **With Periodic Watermarks** +## Dealing With Idle Sources -`AssignerWithPeriodicWatermarks` assigns timestamps and generates watermarks periodically (possibly depending -on the stream elements, or purely based on processing time). +If one of the input splits/partitions/shards does not carry events for a while +this means that the `WatermarkGenerator` also does not get any new information +on which to base a watermark. We call this an *idle input* or an *idle source*. +This is a problem because it can happen that some of your partitions do still +carry events. In that case, the watermark will be held back, because it is +computed as the minimum over all the different parallel watermarks. -The interval (every *n* milliseconds) in which the watermark will be generated is defined via -`ExecutionConfig.setAutoWatermarkInterval(...)`. The assigner's `getCurrentWatermark()` method will be -called each time, and a new watermark will be emitted if the returned watermark is non-null and larger than the previous -watermark. +To deal with this, you can use a `WatermarkStrategy` that will detect idleness and mark an input as idle. `WatermarkStrategies` provides a convenience helper for this: -Here we show two simple examples of timestamp assigners that use periodic watermark generation. Note that Flink ships with a `BoundedOutOfOrdernessTimestampExtractor` similar to the `BoundedOutOfOrdernessGenerator` shown below, which you can read about [here]({{ site.baseurl }}/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness). 
+<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +WatermarkStrategies + .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)) + .withIdleness(Duration.ofMinutes(1)) + .build(); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +WatermarkStrategies + .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)) + .withIdleness(Duration.ofMinutes(1)) + .build() +{% endhighlight %} +</div> +</div> + + +## Writing WatermarkGenerators + +A `TimestampAssigner` is a simple function that extracts a field from an event, we therefore don't need to look at them in detail. A `WatermarkGenerator`, on the other hand, is a bit more complicated to write and we will look at how you can do that in the next two sections. This is the `WatermarkGenerator` interface: + +{% highlight java %} +/** + * The {@code WatermarkGenerator} generates watermarks either based on events or + * periodically (in a fixed interval). + * + * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the + * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}. + */ +@Public +public interface WatermarkGenerator<T> { + + /** + * Called for every event, allows the watermark generator to examine and remember the + * event timestamps, or to emit a watermark based on the event itself. + */ + void onEvent(T event, long eventTimestamp, WatermarkOutput output); + + /** + * Called periodically, and might emit a new watermark, or not. + * + * <p>The interval in which this method is called and Watermarks are generated + * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. + */ + void onPeriodicEmit(WatermarkOutput output); +} +{% endhighlight %} + +There are two different styles of watermark generation: *periodic* and +*punctuated*. 
+ +A periodic generator usually observes the incoming events via `onEvent()` +and then emits a watermark when the framework calls `onPeriodicEmit()`. + +A punctuated generator will look at events in `onEvent()` and wait for special +*marker events* or *punctuations* that carry watermark information in the +stream. When it sees one of these events it emits a watermark immediately. +Usually, punctuated generators don't emit a watermark from `onPeriodicEmit()`. + +We will look at how to implement generators for each style next. + +### Writing a Periodic WatermarkGenerator + +A periodic generator observes stream events and generates +watermarks periodically (possibly depending on the stream elements, or purely +based on processing time). + +The interval (every *n* milliseconds) in which the watermark will be generated +is defined via `ExecutionConfig.setAutoWatermarkInterval(...)`. The +generator's `onPeriodicEmit()` method will be called each time, and a new +watermark will be emitted if the watermark it produces is larger than +the previous watermark. + +Here we show two simple examples of watermark generators that use periodic +watermark generation. Note that Flink ships with +`BoundedOutOfOrdernessWatermarks`, which is a `WatermarkGenerator` that works +similarly to the `BoundedOutOfOrdernessGenerator` shown below. You can read +about using that [here]({{ site.baseurl +}}/dev/event_timestamp_extractors.html#fixed-amount-of-lateness). <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -196,44 +269,42 @@ Here we show two simple examples of timestamp assigners that use periodic waterm * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. 
*/ -public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> { +public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> { private final long maxOutOfOrderness = 3500; // 3.5 seconds private long currentMaxTimestamp; @Override - public long extractTimestamp(MyEvent element, long previousElementTimestamp) { - long timestamp = element.getCreationTime(); - currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); - return timestamp; + public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) { + currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp); } @Override - public Watermark getCurrentWatermark() { - // return the watermark as current highest timestamp minus the out-of-orderness bound - return new Watermark(currentMaxTimestamp - maxOutOfOrderness); + public void onPeriodicEmit(WatermarkOutput output) { + // emit the watermark as current highest timestamp minus the out-of-orderness bound + output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1)); } + } /** * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. 
-public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> { +public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> { - private final long maxTimeLag = 5000; // 5 seconds + private final long maxTimeLag = 5000; // 5 seconds - @Override - public long extractTimestamp(MyEvent element, long previousElementTimestamp) { - return element.getCreationTime(); - } + @Override + public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) { + // don't need to do anything because we work on processing time + } - @Override - public Watermark getCurrentWatermark() { - // return the watermark as current time minus the maximum time lag - return new Watermark(System.currentTimeMillis() - maxTimeLag); - } + @Override + public void onPeriodicEmit(WatermarkOutput output) { + output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag)); + } } {% endhighlight %} </div> @@ -250,15 +321,13 @@ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEv var currentMaxTimestamp: Long = _ - override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { - val timestamp = element.getCreationTime() - currentMaxTimestamp = max(timestamp, currentMaxTimestamp) - timestamp + override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = { + currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp) } - override def getCurrentWatermark(): Watermark = { - // return the watermark as current highest timestamp minus the out-of-orderness bound - new Watermark(currentMaxTimestamp - maxOutOfOrderness) + override def onPeriodicEmit(output: WatermarkOutput): Unit = { + // emit the watermark as current highest timestamp minus the out-of-orderness bound + output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1)) } } @@ -270,45 +339,43 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] val maxTimeLag = 5000L // 5 seconds - 
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { - element.getCreationTime + override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = { + // don't need to do anything because we work on processing time } - override def getCurrentWatermark(): Watermark = { - // return the watermark as current time minus the maximum time lag - new Watermark(System.currentTimeMillis() - maxTimeLag) + override def onPeriodicEmit(output: WatermarkOutput): Unit = { + output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag)); } } {% endhighlight %} </div> </div> -#### **With Punctuated Watermarks** +### Writing a Punctuated WatermarkGenerator -To generate watermarks whenever a certain event indicates that a new watermark might be generated, use -`AssignerWithPunctuatedWatermarks`. For this class Flink will first call the `extractTimestamp(...)` method -to assign the element a timestamp, and then immediately call the -`checkAndGetNextWatermark(...)` method on that element. +A punctuated watermark generator will observe the stream of +events and emit a watermark whenever it sees a special element that carries +watermark information. -The `checkAndGetNextWatermark(...)` method is passed the timestamp that was assigned in the `extractTimestamp(...)` -method, and can decide whether it wants to generate a watermark. Whenever the `checkAndGetNextWatermark(...)` -method returns a non-null watermark, and that watermark is larger than the latest previous watermark, that -new watermark will be emitted.
+This is how you can implement a punctuated generator that emits a watermark +whenever an event indicates that it carries a certain marker: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> { +public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> { - @Override - public long extractTimestamp(MyEvent element, long previousElementTimestamp) { - return element.getCreationTime(); - } + @Override + public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) { + if (event.hasWatermarkMarker()) { + output.emitWatermark(new Watermark(event.getWatermarkTimestamp())); + } + } - @Override - public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) { - return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null; - } + @Override + public void onPeriodicEmit(WatermarkOutput output) { + // don't need to do anything because we emit in reaction to events above + } } {% endhighlight %} </div> @@ -316,59 +383,71 @@ public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEv {% highlight scala %} class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] { - override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { - element.getCreationTime - } + override def onEvent(event: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = { + if (event.hasWatermarkMarker()) { + output.emitWatermark(new Watermark(event.getWatermarkTimestamp())) + } + } - override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = { - if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null - } + override def onPeriodicEmit(output: WatermarkOutput): Unit = { + // don't need to do anything because we emit in reaction to events above + } } {% endhighlight %} </div> </div> -*Note:* It is possible to generate
a watermark on every single event. However, because each watermark causes some -computation downstream, an excessive number of watermarks degrades performance. - - -## Timestamps per Kafka Partition +<div class="alert alert-warning"> +<strong>Note</strong>: It is possible to +generate a watermark on every single event. However, because each watermark +causes some computation downstream, an excessive number of watermarks degrades +performance. +</div> -When using [Apache Kafka](connectors/kafka.html) as a data source, each Kafka partition may have a simple event time pattern (ascending -timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel, -interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka's consumer clients work). +## Watermark Strategies and the Kafka Connector -In that case, you can use Flink's Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the -Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles. +When using [Apache Kafka](connectors/kafka.html) as a data source, each Kafka +partition may have a simple event time pattern (ascending timestamps or bounded +out-of-orderness). However, when consuming streams from Kafka, multiple +partitions often get consumed in parallel, interleaving the events from the +partitions and destroying the per-partition patterns (this is inherent in how +Kafka's consumer clients work). -For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the -[ascending timestamps watermark generator](event_timestamp_extractors.html#assigners-with-ascending-timestamps) will result in perfect overall watermarks. +In that case, you can use Flink's Kafka-partition-aware watermark generation. 
+Using that feature, watermarks are generated inside the Kafka consumer, per +Kafka partition, and the per-partition watermarks are merged in the same way as +watermarks are merged on stream shuffles. -For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the -[ascending timestamps watermark generator](event_timestamp_extractors.html#assigners-with-ascending-timestamps) will result in perfect overall watermarks. -The illustrations below show how to use the per-Kafka-partition watermark generation, and how watermarks propagate through the -streaming dataflow in that case. +For example, if event timestamps are strictly ascending per Kafka partition, +generating per-partition watermarks with the [ascending timestamps watermark +generator](event_timestamp_extractors.html#assigners-with-ascending-timestamps) +will result in perfect overall watermarks. Note that we don't provide a +`TimestampAssigner` in the example; the timestamps of the Kafka records +themselves will be used instead. +The illustrations below show how to use the per-Kafka-partition watermark +generation, and how watermarks propagate through the streaming dataflow in that +case. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -FlinkKafkaConsumer010<MyType> kafkaSource = new FlinkKafkaConsumer010<>("myTopic", schema, props); -kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() { - - @Override - public long extractAscendingTimestamp(MyType element) { - return element.eventTimestamp(); - } -}); +FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props); +kafkaSource.assignTimestampsAndWatermarks( + WatermarkStrategies
+ .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20)) + .build()); DataStream<MyType> stream = env.addSource(kafkaSource); {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -val kafkaSource = new FlinkKafkaConsumer010[MyType]("myTopic", schema, props) -kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] { - def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp -}) +val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props) +kafkaSource.assignTimestampsAndWatermarks( + WatermarkStrategies + .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20)) + .build()) val stream: DataStream[MyType] = env.addSource(kafkaSource) {% endhighlight %} @@ -377,4 +456,31 @@ val stream: DataStream[MyType] = env.addSource(kafkaSource) <img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" /> +## How Operators Process Watermarks + +As a general rule, operators are required to completely process a given +watermark before forwarding it downstream. For example, `WindowOperator` will +first evaluate all windows that should be fired, and only after producing all of +the output triggered by the watermark will the watermark itself be sent +downstream. In other words, all elements produced due to occurrence of a +watermark will be emitted before the watermark. + +The same rule applies to `TwoInputStreamOperator`. However, in this case the +current watermark of the operator is defined as the minimum of both of its +inputs. + +The details of this behavior are defined by the implementations of the +`OneInputStreamOperator#processWatermark`, +`TwoInputStreamOperator#processWatermark1` and +`TwoInputStreamOperator#processWatermark2` methods. 
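The two rules described under "How Operators Process Watermarks" can be illustrated with a small, Flink-free model. This is only a sketch under stated assumptions: the class `TwoInputWatermarkSketch`, its `registerTimer` method, and the string-based output are hypothetical names invented for illustration and are not part of Flink's API. It shows that the combined watermark of a two-input operator is the minimum of both inputs, and that all output triggered by a watermark is emitted before the watermark itself is forwarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, simplified model of a two-input operator. Not Flink code.
final class TwoInputWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Pending event-time timers, ordered by timestamp.
    private final TreeSet<Long> pendingTimers = new TreeSet<>();
    // Everything this model "sends downstream", in emission order.
    private final List<String> output = new ArrayList<>();

    void registerTimer(long timestamp) {
        pendingTimers.add(timestamp);
    }

    void processWatermark1(long watermark) {
        input1Watermark = Math.max(input1Watermark, watermark);
        advance();
    }

    void processWatermark2(long watermark) {
        input2Watermark = Math.max(input2Watermark, watermark);
        advance();
    }

    private void advance() {
        // The operator's watermark is the minimum of both inputs.
        long candidate = Math.min(input1Watermark, input2Watermark);
        if (candidate <= combinedWatermark) {
            return; // no progress, nothing to forward
        }
        combinedWatermark = candidate;
        // First emit all output triggered by the new watermark ...
        while (!pendingTimers.isEmpty() && pendingTimers.first() <= combinedWatermark) {
            output.add("timer(" + pendingTimers.pollFirst() + ")");
        }
        // ... and only then forward the watermark itself.
        output.add("watermark(" + combinedWatermark + ")");
    }

    long currentWatermark() {
        return combinedWatermark;
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        op.registerTimer(10);
        op.registerTimer(20);
        op.processWatermark1(25); // input 2 has not advanced yet, nothing is emitted
        op.processWatermark2(12); // combined watermark becomes min(25, 12) = 12
        System.out.println(op.output()); // prints [timer(10), watermark(12)]
    }
}
```

In this model the first watermark on input 1 produces no output because input 2 is still at its initial value, which mirrors why a slow or idle input holds back the operator's event-time progress.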
+ +## The Deprecated AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks + +Prior to introducing the current abstraction of `WatermarkStrategy`, +`TimestampAssigner`, and `WatermarkGenerator`, Flink used +`AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks`. You will +still see them in the API, but it is recommended to use the new interfaces +because they offer a clearer separation of concerns and also unify periodic and +punctuated styles of watermark generation. + {% top %}