Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3191#discussion_r97832573
--- Diff: docs/dev/windows.md ---
@@ -622,133 +690,138 @@ input
</div>
</div>
-## Dealing with Late Data
+## Triggers
-When working with event-time windowing it can happen that elements arrive late, i.e the
-watermark that Flink uses to keep track of the progress of event-time is already past the
-end timestamp of a window to which an element belongs. Please
-see [event time](./event_time.html) and especially
-[late elements](./event_time.html#late-elements) for a more thorough discussion of
-how Flink deals with event time.
+A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
+processed by the *window function*. Each `WindowAssigner` comes with a default `Trigger`.
+If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.
-You can specify how a windowed transformation should deal with late elements and how much lateness
-is allowed. The parameter for this is called *allowed lateness*. This specifies by how much time
-elements can be late. Elements that arrive within the allowed lateness are still put into windows
-and are considered when computing window results. If elements arrive after the allowed lateness they
-will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
-collected once the watermark passes the end of a window plus the allowed lateness.
+The trigger interface provides five methods that react to different events:
-<span class="label label-info">Default</span> By default, the allowed lateness is set to
-`0`. That is, elements that arrive behind the watermark will be dropped.
+* The `onElement()` method is called for each element that is added to a window.
+* The `onEventTime()` method is called when a registered event-time timer fires.
+* The `onProcessingTime()` method is called when a registered processing-time timer fires.
+* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, *e.g.* when using session windows.
+* Finally, the `clear()` method performs any action needed upon removal of the corresponding window.
-You can specify an allowed lateness like this:
+Any of these methods can be used to register processing- or event-time timers for future actions.
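The callback contract above can be illustrated without a Flink dependency. The following is a minimal plain-Java sketch and is not Flink's API. The `Result` enum mirrors the four values of Flink's `TriggerResult` (`CONTINUE`, `FIRE`, `PURGE`, `FIRE_AND_PURGE`) that the first three callbacks return. `SimpleTrigger`, `CountTriggerModel` and `CountTriggerDemo` are hypothetical names.

The sketch makes three simplifying assumptions:

* It models a single window.
* It keeps the count in a plain field rather than in Flink's partitioned trigger state.
* It leaves out `onMerge()` because the model has no merging windows.

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the four values of Flink's TriggerResult enum.
enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// Hypothetical, Flink-free stand-in for the trigger callbacks (onMerge omitted).
interface SimpleTrigger<T> {
    Result onElement(T element, long timestamp);
    Result onEventTime(long time);
    Result onProcessingTime(long time);
    void clear();
}

// Fires each time `maxCount` elements have been added, then resets its counter.
class CountTriggerModel<T> implements SimpleTrigger<T> {
    private final long maxCount;
    private long count = 0; // per-window state, kept in a plain field in this model

    CountTriggerModel(long maxCount) { this.maxCount = maxCount; }

    @Override
    public Result onElement(T element, long timestamp) {
        count++;
        if (count >= maxCount) {
            count = 0;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // A pure count trigger registers no timers, so timer callbacks never fire it.
    @Override public Result onEventTime(long time) { return Result.CONTINUE; }
    @Override public Result onProcessingTime(long time) { return Result.CONTINUE; }

    // Called when the window is removed: drop the per-window state.
    @Override public void clear() { count = 0; }
}

class CountTriggerDemo {
    // Feeds n elements into the trigger and records each returned result.
    static List<Result> feed(SimpleTrigger<String> trigger, int n) {
        List<Result> results = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            results.add(trigger.onElement("e" + i, i));
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [CONTINUE, CONTINUE, FIRE, CONTINUE, CONTINUE, FIRE]
        System.out.println(feed(new CountTriggerModel<>(3), 6));
    }
}
```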
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<T> input = ...;
+### Fire and Purge
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .allowedLateness(<time>)
- .<windowed transformation>(<window function>);
-{% endhighlight %}
-</div>
+Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`,
+all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
+Windows with `ReduceFunction` or `FoldFunction` simply emit their eagerly aggregated result.
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[T] = ...
+When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
+By default, the pre-implemented triggers simply `FIRE` without purging the window state.
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .allowedLateness(<time>)
- .<windowed transformation>(<window function>)
-{% endhighlight %}
-</div>
-</div>
+<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.
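Here is a hypothetical plain-Java model of a single window's contents that contrasts `FIRE` with `FIRE_AND_PURGE`, including the point that a purged window keeps accepting elements. `WindowBufferModel` and `FireAndPurgeDemo` are illustrative names, not Flink classes, and a simple sum over the buffered integers stands in for the window function.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of one window's contents and how FIRE and
// FIRE_AND_PURGE treat them. This is not Flink's window operator.
class WindowBufferModel {
    private final List<Integer> contents = new ArrayList<>();

    void add(int element) { contents.add(element); }

    // FIRE: evaluate the window (here: sum its contents) and keep the contents.
    int fire() {
        return sum();
    }

    // FIRE_AND_PURGE: evaluate, then clear the contents. The window object
    // itself survives and keeps accepting new elements.
    int fireAndPurge() {
        int result = sum();
        contents.clear();
        return result;
    }

    int size() { return contents.size(); }

    private int sum() {
        int s = 0;
        for (int v : contents) s += v;
        return s;
    }
}

class FireAndPurgeDemo {
    public static void main(String[] args) {
        WindowBufferModel firing = new WindowBufferModel();
        firing.add(1);
        firing.add(2);
        System.out.println(firing.fire());        // 3
        firing.add(4);
        System.out.println(firing.fire());        // 7: earlier elements are still there

        WindowBufferModel purging = new WindowBufferModel();
        purging.add(1);
        purging.add(2);
        System.out.println(purging.fireAndPurge()); // 3
        purging.add(4);
        System.out.println(purging.fireAndPurge()); // 4: only the element added after the purge
    }
}
```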
-<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
-data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
+### Default Triggers of WindowAssigners
-## Triggers
+The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as
+default trigger. This trigger simply fires once the watermark passes the end of a window.
-A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
-processed by the *window function*. The trigger observes how elements are added to windows
-and can also keep track of the progress of processing time and event time. Once a trigger
-determines that a window is ready for processing, it fires. This is the signal for the
-window operation to take the elements that are currently in the window and pass them along to
-the window function to produce output for the firing window.
+<span class="label label-danger">Attention</span> The default trigger of the `GlobalWindows` assigner is the `NeverTrigger`, which never fires. Consequently, you always have to define a custom trigger when using `GlobalWindows`.
-Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
-appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
-default trigger. This trigger simply fires once the watermark passes the end of a window.
+<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()`, you
+are overriding the default trigger of a `WindowAssigner`. For example, if you specify a
+`CountTrigger` for `TumblingEventTimeWindows`, you will no longer get window firings based on the
+progress of time but only by count. Right now, you have to write your own custom trigger if
+you want to react based on both time and count.
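The following plain-Java model sketches the kind of custom trigger the note above refers to. It fires on count *or* when the event-time timer for the end of the window fires. It is not an implementation of Flink's `Trigger` class. `CountOrTimeTriggerModel`, `Decision` and `CountOrTimeDemo` are hypothetical names, and the timer registration is modeled as a plain field. Whether a time-based firing should also reset the count is a design choice; this sketch does not reset it.

```java
// Result of a trigger callback in this model (a subset of Flink's TriggerResult values).
enum Decision { CONTINUE, FIRE }

// Hypothetical, Flink-free model of a custom trigger for one window. It fires when
// either `maxCount` elements have arrived since the last count-based firing, or the
// event-time timer registered for the end of the window fires.
class CountOrTimeTriggerModel {
    private final long windowMaxTimestamp; // last timestamp that belongs to the window
    private final long maxCount;
    private long count = 0;
    private Long registeredTimer = null;   // stands in for an event-time timer registration

    CountOrTimeTriggerModel(long windowMaxTimestamp, long maxCount) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.maxCount = maxCount;
    }

    Decision onElement(long timestamp) {
        // Time part: make sure a timer for the end of the window is registered.
        registeredTimer = windowMaxTimestamp;
        // Count part: fire and reset the counter once enough elements have arrived.
        count++;
        if (count >= maxCount) {
            count = 0;
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    // Called when event time (the watermark) reaches `time`.
    Decision onEventTime(long time) {
        if (registeredTimer != null && time == registeredTimer) {
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    // Window removed: drop the timer and the counter.
    void clear() {
        registeredTimer = null;
        count = 0;
    }
}

class CountOrTimeDemo {
    public static void main(String[] args) {
        CountOrTimeTriggerModel t = new CountOrTimeTriggerModel(9_999L, 3);
        System.out.println(t.onElement(100L));     // CONTINUE
        System.out.println(t.onElement(200L));     // CONTINUE
        System.out.println(t.onElement(300L));     // FIRE: count reached 3
        System.out.println(t.onEventTime(5_000L)); // CONTINUE: no timer at this time
        System.out.println(t.onEventTime(9_999L)); // FIRE: end-of-window timer
    }
}
```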
-You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
-whole specification of the windowed transformation would then look like this:
+### Built-in and Custom Triggers
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<T> input = ...;
+Flink comes with a few built-in triggers.
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .trigger(<trigger>)
- .<windowed transformation>(<window function>);
-{% endhighlight %}
-</div>
+* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
+* The `ProcessingTimeTrigger` fires based on processing time.
+* The `CountTrigger` fires once the number of elements in a window reaches the given limit.
+* The `PurgingTrigger` takes another trigger as argument and transforms it into a purging one.
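The wrapping idea behind `PurgingTrigger` can be sketched as a decorator. Each callback is delegated to the nested trigger, and every `FIRE` is turned into `FIRE_AND_PURGE`. This plain-Java model covers only `onElement()`. `Outcome`, `ElementTrigger`, `PurgingTriggerModel`, `EverySecondElement` and `PurgingDemo` are hypothetical names, not Flink API.

```java
// Mirrors the four values of Flink's TriggerResult enum.
enum Outcome { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// Hypothetical single-callback trigger used only for this sketch.
interface ElementTrigger {
    Outcome onElement(long timestamp);
}

// Decorator: delegate to the nested trigger and upgrade FIRE to FIRE_AND_PURGE.
class PurgingTriggerModel implements ElementTrigger {
    private final ElementTrigger nested;

    PurgingTriggerModel(ElementTrigger nested) { this.nested = nested; }

    @Override
    public Outcome onElement(long timestamp) {
        Outcome result = nested.onElement(timestamp);
        return result == Outcome.FIRE ? Outcome.FIRE_AND_PURGE : result;
    }
}

// Illustrative nested trigger that fires on every second element.
class EverySecondElement implements ElementTrigger {
    private int count = 0;

    @Override
    public Outcome onElement(long timestamp) {
        count++;
        if (count == 2) {
            count = 0;
            return Outcome.FIRE;
        }
        return Outcome.CONTINUE;
    }
}

class PurgingDemo {
    public static void main(String[] args) {
        ElementTrigger t = new PurgingTriggerModel(new EverySecondElement());
        // prints CONTINUE, FIRE_AND_PURGE, CONTINUE, FIRE_AND_PURGE (one per line)
        for (long ts = 0; ts < 4; ts++) {
            System.out.println(t.onElement(ts));
        }
    }
}
```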
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[T] = ...
+If you need to implement a custom trigger, you should check out the abstract {% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger" %} class. Please note that the API is still evolving and might change in future versions of Flink.
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .trigger(<trigger>)
- .<windowed transformation>(<window function>)
-{% endhighlight %}
-</div>
-</div>
-Flink comes with a few triggers out-of-box: there is the already mentioned `EventTimeTrigger` that
-fires based on the progress of event-time as measured by the watermark, the `ProcessingTimeTrigger`
-does the same but based on processing time and the `CountTrigger` fires once the number of elements
-in a window exceeds the given limit.
+## Evictors
-<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
-are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
-`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
-progress of time but only by count. Right now, you have to write your own custom trigger if
-you want to react based on both time and count.
+Flink's windowing model allows specifying an optional `Evictor` in addition to the `WindowAssigner` and the `Trigger`.
+This can be done using the `evictor(...)` method (shown in the beginning of this document). The evictor has the ability
+to remove elements from a window *after* the trigger fires and *before and/or after* the window function is applied.
+To do so, the `Evictor` interface has two methods:
+
+    /**
+     * Optionally evicts elements. Called before windowing function.
+     *
+     * @param elements The elements currently in the pane.
+     * @param size The current number of elements in the pane.
+     * @param window The {@link Window}
+     * @param evictorContext The context for the Evictor
+     */
+    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
+
+    /**
+     * Optionally evicts elements. Called after windowing function.
+     *
+     * @param elements The elements currently in the pane.
+     * @param size The current number of elements in the pane.
+     * @param window The {@link Window}
+     * @param evictorContext The context for the Evictor
+     */
+    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
+
+The `evictBefore()` method contains the eviction logic to be applied before the window function, while `evictAfter()`
+contains the logic to be applied after the window function. Elements evicted before the window function is applied
+will not be processed by it.
+
+Flink comes with three pre-implemented evictors. These are:
+
+* `CountEvictor`: keeps up to a user-specified number of elements from the window and discards the remaining ones from
+the beginning of the window buffer.
+* `DeltaEvictor`: takes a `DeltaFunction` and a `threshold`, computes the delta between the last element in the
+window buffer and each of the remaining ones, and removes the ones with a delta greater than or equal to the threshold.
+* `TimeEvictor`: takes as argument an `interval` in milliseconds and, for a given window, finds the maximum
+timestamp `max_ts` among its elements and removes all the elements with timestamps smaller than `max_ts - interval`.
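The `CountEvictor` and `TimeEvictor` rules described above can be modeled on a plain Java list. This is a sketch, not Flink's implementation. `Stamped` stands in for Flink's `TimestampedValue`, and `EvictorModels` and `EvictorDemo` are hypothetical names. The `TimeEvictor` model follows the "smaller than `max_ts - interval`" wording above, so its boundary behavior may differ from the actual Flink class.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain stand-in for Flink's TimestampedValue<T>: a value plus its timestamp.
final class Stamped<T> {
    final T value;
    final long timestamp;

    Stamped(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() { return value + "@" + timestamp; }
}

// Hypothetical, Flink-free models of two eviction rules; both mutate the buffer in place.
final class EvictorModels {
    // CountEvictor rule: keep at most maxCount elements, removing the surplus
    // from the beginning of the buffer.
    static <T> void countEvict(List<Stamped<T>> buffer, int maxCount) {
        int toRemove = buffer.size() - maxCount;
        Iterator<Stamped<T>> it = buffer.iterator();
        while (toRemove > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toRemove--;
        }
    }

    // TimeEvictor rule as described above: find max_ts, then remove every element
    // whose timestamp is smaller than max_ts - interval.
    static <T> void timeEvict(List<Stamped<T>> buffer, long interval) {
        if (buffer.isEmpty()) return;
        long maxTs = Long.MIN_VALUE;
        for (Stamped<T> e : buffer) maxTs = Math.max(maxTs, e.timestamp);
        long cutoff = maxTs - interval;
        buffer.removeIf(e -> e.timestamp < cutoff);
    }
}

class EvictorDemo {
    static List<Stamped<String>> sample() {
        List<Stamped<String>> buffer = new ArrayList<>();
        buffer.add(new Stamped<>("a", 1));
        buffer.add(new Stamped<>("b", 5));
        buffer.add(new Stamped<>("c", 9));
        buffer.add(new Stamped<>("d", 12));
        return buffer;
    }

    public static void main(String[] args) {
        List<Stamped<String>> byCount = sample();
        EvictorModels.countEvict(byCount, 2);
        System.out.println(byCount); // [c@9, d@12]

        List<Stamped<String>> byTime = sample();
        EvictorModels.timeEvict(byTime, 8); // max_ts = 12, cutoff = 4
        System.out.println(byTime); // [b@5, c@9, d@12]
    }
}
```

Flink makes no ordering guarantees within a window. In this model, "the beginning of the buffer" is therefore simply list order, which need not match arrival order.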
-The internal `Trigger` API is still considered experimental but you can check out the code
-if you want to write your own custom trigger:
-{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger.java" %}.
+<span class="label label-info">Default</span> By default, all the pre-implemented evictors apply their logic before the
+window function.
-## Non-keyed Windowing
+<span class="label label-danger">Attention</span> Specifying an evictor prevents any pre-aggregation, as all the
+elements of a window have to be passed to the evictor before applying the computation.
-You can also leave out the `keyBy()` when specifying a windowed transformation. This means, however,
-that Flink cannot process windows for different keys in parallel, essentially turning the
-transformation into a non-parallel operation.
+<span class="label label-danger">Attention</span> Flink provides no guarantees about the order of the elements within
+a window. This implies that although an evictor may remove elements from the beginning of the window, these are not
+necessarily the ones that arrive first or last.
-<span class="label label-danger">Warning</span> As mentioned in the introduction, non-keyed
-windows have the disadvantage that work cannot be distributed in the cluster because
-windows cannot be computed independently per key. This can have severe performance implications.
+## Allowed Lateness
-The basic structure of a non-keyed windowed transformation is as follows:
+When working with *event-time* windowing it can happen that elements arrive late, *i.e.* the watermark that Flink uses to
+keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See
+[event time](./event_time.html) and especially [late elements](./event_time.html#late-elements) for a more thorough
+discussion of how Flink deals with event time.
+
+By default, late elements are dropped if their associated window was already evaluated. However,
+Flink allows specifying a maximum *allowed lateness* for window operators. Allowed lateness
+specifies by how much time elements can be late before they are dropped. Elements that arrive
+within the allowed lateness of a window are still added to the window and trigger an immediate evaluation of the window which might emit elements.
--- End diff --
"the trigger an immediate evaluation" depends on the `Trigger`.
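The accept-or-drop rule from the diff can be sketched in plain Java, together with the reviewer's point that the immediate evaluation depends on the trigger. The sketch makes these assumptions:

* Windows are tumbling event-time windows with zero offset, and timestamps are non-negative.
* A window `[start, start + size)` has the maximum timestamp `start + size - 1`.
* The drop condition is `maxTimestamp + allowedLateness <= watermark`. This is an assumption of the sketch, not a confirmed description of Flink's internals.

`firesOnArrival` models the default `EventTimeTrigger`, which fires immediately when an element is added to a window whose maximum timestamp the watermark has already reached. A custom trigger may behave differently. `LatenessModel` and `LatenessDemo` are hypothetical names.

```java
// Hypothetical, Flink-free model of the allowed-lateness decision for tumbling
// event-time windows of `size` milliseconds (zero offset, non-negative timestamps).
final class LatenessModel {
    // Start of the tumbling window that contains `timestamp`.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Largest timestamp that still belongs to the window [start, start + size).
    static long windowMaxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    // Assumed rule: the element is dropped once the watermark has reached the
    // window's max timestamp plus the allowed lateness.
    static boolean isDropped(long timestamp, long size, long allowedLateness, long watermark) {
        return windowMaxTimestamp(timestamp, size) + allowedLateness <= watermark;
    }

    // Models the default EventTimeTrigger: adding an element to a window whose max
    // timestamp is already at or behind the watermark causes an immediate firing.
    static boolean firesOnArrival(long timestamp, long size, long watermark) {
        return windowMaxTimestamp(timestamp, size) <= watermark;
    }
}

class LatenessDemo {
    public static void main(String[] args) {
        long size = 10_000L;    // 10-second tumbling windows
        long lateness = 5_000L; // 5 seconds of allowed lateness
        long ts = 3_000L;       // belongs to window [0, 10000), max timestamp 9999
        System.out.println(LatenessModel.isDropped(ts, size, lateness, 12_000L)); // false: 9999 + 5000 > 12000
        System.out.println(LatenessModel.firesOnArrival(ts, size, 12_000L));      // true: watermark passed 9999
        System.out.println(LatenessModel.isDropped(ts, size, lateness, 15_000L)); // true: 14999 <= 15000
    }
}
```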