[
https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838212#comment-15838212
]
ASF GitHub Bot commented on FLINK-5529:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3191#discussion_r97832399
--- Diff: docs/dev/windows.md ---
@@ -622,133 +690,138 @@ input
</div>
</div>
-## Dealing with Late Data
+## Triggers
-When working with event-time windowing it can happen that elements arrive
late, i.e the
-watermark that Flink uses to keep track of the progress of event-time is
already past the
-end timestamp of a window to which an element belongs. Please
-see [event time](./event_time.html) and especially
-[late elements](./event_time.html#late-elements) for a more thorough
discussion of
-how Flink deals with event time.
+A `Trigger` determines when a window (as formed by the `WindowAssigner`)
is ready to be
+processed by the *window function*. Each `WindowAssigner` comes with a
default `Trigger`.
+If the default trigger does not fit your needs, you can specify a custom
trigger using `trigger(...)`.
-You can specify how a windowed transformation should deal with late
elements and how much lateness
-is allowed. The parameter for this is called *allowed lateness*. This
specifies by how much time
-elements can be late. Elements that arrive within the allowed lateness are
still put into windows
-and are considered when computing window results. If elements arrive after
the allowed lateness they
-will be dropped. Flink will also make sure that any state held by the
windowing operation is garbage
-collected once the watermark passes the end of a window plus the allowed
lateness.
+The trigger interface provides five methods that react to different
events:
-<span class="label label-info">Default</span> By default, the allowed
lateness is set to
-`0`. That is, elements that arrive behind the watermark will be dropped.
+* The `onElement()` method is called for each element that is added to a
window.
+* The `onEventTime()` method is called when a registered event-time timer
fires.
+* The `onProcessingTime()` method is called when a registered
processing-time timer fires.
+* The `onMerge()` method is relevant for stateful triggers and merges the
states of two triggers when their corresponding windows merge, *e.g.* when
using session windows.
+* Finally, the `clear()` method performs any action needed upon removal of
the corresponding window.
-You can specify an allowed lateness like this:
+Any of these methods can be used to register processing- or event-time
timers for future actions.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<T> input = ...;
+### Fire and Purge
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .allowedLateness(<time>)
- .<windowed transformation>(<window function>);
-{% endhighlight %}
-</div>
+Once a trigger determines that a window is ready for processing, it fires.
This is the signal for the window operator to emit the result of the current
window. Given a window with a `WindowFunction`
+all elements are passed to the `WindowFunction` (possibly after passing
them to an evictor).
+Windows with `ReduceFunction` or `FoldFunction` simply emit their eagerly
aggregated result.
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[T] = ...
+When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While
`FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
+By default, the pre-implemented triggers simply `FIRE` without purging the
window state.
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .allowedLateness(<time>)
- .<windowed transformation>(<window function>)
-{% endhighlight %}
-</div>
-</div>
+<span class="label label-danger">Attention</span> When purging, only the
contents of the window are cleared. The window itself is not removed and
accepts new elements.
-<span class="label label-info">Note</span> When using the `GlobalWindows`
window assigner no
-data is ever considered late because the end timestamp of the global
window is `Long.MAX_VALUE`.
+### Default Triggers of WindowAssigners
-## Triggers
+The default `Trigger` of a `WindowAssigner` is appropriate for many use
cases. For example, all the event-time window assigners have an
`EventTimeTrigger` as
+default trigger. This trigger simply fires once the watermark passes the
end of a window.
-A `Trigger` determines when a window (as assigned by the `WindowAssigner`)
is ready for being
-processed by the *window function*. The trigger observes how elements are
added to windows
-and can also keep track of the progress of processing time and event time.
Once a trigger
-determines that a window is ready for processing, it fires. This is the
signal for the
-window operation to take the elements that are currently in the window and
pass them along to
-the window function to produce output for the firing window.
+<span class="label label-danger">Attention</span> The default trigger of
the `GlobalWindow` is the `NeverTrigger`, which never fires. Consequently,
you always have to define a custom trigger when using a `GlobalWindow`.
-Each `WindowAssigner` (except `GlobalWindows`) comes with a default
trigger that should be
-appropriate for most use cases. For example, `TumblingEventTimeWindows`
has an `EventTimeTrigger` as
-default trigger. This trigger simply fires once the watermark passes the
end of a window.
+<span class="label label-danger">Attention</span> By specifying a trigger
using `trigger()` you
+are overwriting the default trigger of a `WindowAssigner`. For example, if
you specify a
+`CountTrigger` for `TumblingEventTimeWindows` you will no longer get
window firings based on the
+progress of time but only by count. Right now, you have to write your own
custom trigger if
+you want to react based on both time and count.
-You can specify the trigger to be used by calling `trigger()` with a given
`Trigger`. The
-whole specification of the windowed transformation would then look like
this:
+### Built-in and Custom Triggers
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<T> input = ...;
+Flink comes with a few built-in triggers.
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .trigger(<trigger>)
- .<windowed transformation>(<window function>);
-{% endhighlight %}
-</div>
+* The (already mentioned) `EventTimeTrigger` fires based on the progress
of event-time as measured by watermarks.
+* The `ProcessingTimeTrigger` fires based on processing time.
+* The `CountTrigger` fires once the number of elements in a window
exceeds the given limit.
+* The `PurgingTrigger` takes as argument another trigger and transforms it
into a purging one.
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[T] = ...
+If you need to implement a custom trigger, you should check out the
abstract {% gh_link
/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
"Trigger" %} class. Please note that the API is still evolving and might
change in future versions of Flink.
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .trigger(<trigger>)
- .<windowed transformation>(<window function>)
-{% endhighlight %}
-</div>
-</div>
-Flink comes with a few triggers out-of-box: there is the already mentioned
`EventTimeTrigger` that
-fires based on the progress of event-time as measured by the watermark,
the `ProcessingTimeTrigger`
-does the same but based on processing time and the `CountTrigger` fires
once the number of elements
-in a window exceeds the given limit.
+## Evictors
-<span class="label label-danger">Attention</span> By specifying a trigger
using `trigger()` you
-are overwriting the default trigger of a `WindowAssigner`. For example, if
you specify a
-`CountTrigger` for `TumblingEventTimeWindows` you will no longer get
window firings based on the
-progress of time but only by count. Right now, you have to write your own
custom trigger if
-you want to react based on both time and count.
+Flink's windowing model allows specifying an optional `Evictor` in
addition to the `WindowAssigner` and the `Trigger`.
+This can be done using the `evictor(...)` method (shown in the beginning
of this document). The evictor has the ability
+to remove elements from a window *after* the trigger fires and *before
and/or after* the window function is applied.
+To do so, the `Evictor` interface has two methods:
+
+ /**
+ * Optionally evicts elements. Called before windowing function.
+ *
+ * @param elements The elements currently in the pane.
+ * @param size The current number of elements in the pane.
+ * @param window The {@link Window}
+ * @param evictorContext The context for the Evictor
+ */
+ void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W
window, EvictorContext evictorContext);
+
+ /**
+ * Optionally evicts elements. Called after windowing function.
+ *
+ * @param elements The elements currently in the pane.
+ * @param size The current number of elements in the pane.
+ * @param window The {@link Window}
+ * @param evictorContext The context for the Evictor
+ */
+ void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W
window, EvictorContext evictorContext);
+
+The `evictBefore()` contains the eviction logic to be applied before the
window function, while the `evictAfter()`
+contains the one to be applied after the window function. Elements evicted
before the application of the window
+function will not be processed by it.
+
+Flink comes with three pre-implemented evictors. These are:
+
+* `CountEvictor`: keeps up to a user-specified number of elements from the
window and discards the remaining ones from
+the beginning of the window buffer.
+* `DeltaEvictor`: takes a `DeltaFunction` and a `threshold`, computes the
delta between the last element in the
+window buffer and each of the remaining ones, and removes the ones with a
delta greater than or equal to the threshold.
+* `TimeEvictor`: takes as argument an `interval` in milliseconds and for a
given window, it finds the maximum
+timestamp `max_ts` among its elements and removes all the elements with
timestamps smaller than `max_ts - interval`.
-The internal `Trigger` API is still considered experimental but you can
check out the code
-if you want to write your own custom trigger:
-{% gh_link
/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
"Trigger.java" %}.
+<span class="label label-info">Default</span> By default, all the
pre-implemented evictors apply their logic before the
+window function.
-## Non-keyed Windowing
+<span class="label label-danger">Attention</span> Specifying an evictor
prevents any pre-aggregation, as all the
+elements of a window have to be passed to the evictor before applying the
computation.
-You can also leave out the `keyBy()` when specifying a windowed
transformation. This means, however,
-that Flink cannot process windows for different keys in parallel,
essentially turning the
-transformation into a non-parallel operation.
+<span class="label label-danger">Attention</span> Flink provides no
guarantees about the order of the elements within
+a window. This implies that although an evictor may remove elements from
the beginning of the window, these are not
+necessarily the ones that arrive first or last.
-<span class="label label-danger">Warning</span> As mentioned in the
introduction, non-keyed
-windows have the disadvantage that work cannot be distributed in the
cluster because
-windows cannot be computed independently per key. This can have severe
performance implications.
+## Allowed Lateness
-The basic structure of a non-keyed windowed transformation is as follows:
+When working with *event-time* windowing it can happen that elements
arrive late, *i.e.* the watermark that Flink uses to
+keep track of the progress of event-time is already past the end timestamp
of a window to which an element belongs. See
+[event time](./event_time.html) and especially [late
elements](./event_time.html#late-elements) for a more thorough
+discussion of how Flink deals with event time.
+
+By default, late elements are dropped if their associated window was
already evaluated. However,
--- End diff --
Late elements are not dropped because the window was already evaluated. They
are dropped once the watermark is past the end of the window plus the
allowed lateness.
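To make the rule concrete, here is a minimal sketch in plain Java (it is not Flink code). The names `LatenessSketch` and `isLate` are hypothetical, and the strict `>` comparison at the exact boundary is an assumption; the actual window operator may treat the boundary timestamp differently.

```java
// Illustrative only: models the drop rule described in the comment above.
// Nothing here is part of the Flink API.
final class LatenessSketch {

    /**
     * Returns true if an element belonging to a window ending at windowEnd
     * would be dropped, i.e. the watermark is already past
     * windowEnd + allowedLateness. Whether the window has fired is irrelevant.
     *
     * Assumes allowedLateness >= 0; all values are in milliseconds.
     */
    static boolean isLate(long windowEnd, long allowedLateness, long watermark) {
        long cleanupTime = windowEnd + allowedLateness;
        // Saturate on overflow, e.g. for a global window ending at Long.MAX_VALUE.
        if (cleanupTime < windowEnd) {
            cleanupTime = Long.MAX_VALUE;
        }
        return watermark > cleanupTime;
    }

    public static void main(String[] args) {
        // Window ends at t=1000 with 500 ms of allowed lateness.
        System.out.println(isLate(1000L, 500L, 1200L)); // watermark inside the lateness period
        System.out.println(isLate(1000L, 500L, 1501L)); // watermark past end + lateness
    }
}
```

With an allowed lateness of `0`, the check reduces to "the watermark is past the end of the window", which matches the default behaviour described in the removed text of the diff.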
> Improve / extends windowing documentation
> -----------------------------------------
>
> Key: FLINK-5529
> URL: https://issues.apache.org/jira/browse/FLINK-5529
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.3.0
>
>
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
> stream
> [.keyBy(...)] <- keyed versus non-keyed windows
> .window(...) <- required: "assigner"
> [.trigger(...)] <- optional: "trigger" (else default trigger)
> [.evictor(...)] <- optional: "evictor" (else no evictor)
> [.allowedLateness()] <- optional, else zero
> .reduce/fold/apply() <- required: "function"
> (1) Types of windows
> - tumble
> - slide
> - session
> - global
> (2) Pre-defined windows
> timeWindow() (tumble, slide)
> countWindow() (tumble, slide)
> - mention that count windows are inherently
> resource leaky unless limited key space
> (3) Window Functions
> - apply: most basic, iterates over elements in window
>
> - aggregating: reduce and fold, can be used with "apply()" which will get
> one element
>
> - forward reference to state size section
> (4) Advanced Windows
> - assigner
> - simple
> - merging
> - trigger
> - registering timers (processing time, event time)
> - state in triggers
> - life cycle of a window
> - create
> - state
> - cleanup
> - when is window contents purged
> - when is state dropped
> - when is metadata (like merging set) dropped
> (5) Late data
> - picture
> - fire vs fire_and_purge: late accumulates vs late resurrects (cf
> discarding mode)
>
> (6) Evictors
> - TBD
>
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
> --> num windows * num elements in window
> --> example: tumbling is one copy, sliding(n,m) is n/m copies
> --> per key
> Pre-aggregation:
> - if reduce or fold is set -> one element per window (rather than num
> elements in window)
> - evictor voids pre-aggregation from the perspective of state
> Special rules:
> - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
> - all elements through the same windows
> - currently not parallel
> - possibly parallel in the future when having pre-aggregation functions
> - inherently (by definition) produce a result stream with parallelism one
> - state similar to one key of keyed windows
> {code}
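The "basic rule" in item (7) of the outline can be worked through with a small sketch in plain Java. The names `WindowStateSketch`, `copiesPerElement`, and `storedCopies` are hypothetical, and the sketch assumes the window size is an exact multiple of the slide, so that every element falls into exactly size/slide windows.

```java
// Illustrative arithmetic for the state-size rule in the outline; not Flink code.
final class WindowStateSketch {

    /**
     * Number of windows (and therefore buffered copies) each element lands in.
     * A tumbling window is the special case slide == size, giving one copy.
     */
    static long copiesPerElement(long size, long slide) {
        if (size <= 0 || slide <= 0 || size % slide != 0) {
            throw new IllegalArgumentException(
                "sketch assumes positive values with size a multiple of slide");
        }
        return size / slide;
    }

    /** Total buffered copies for one key when no pre-aggregation applies. */
    static long storedCopies(long elementsPerKey, long size, long slide) {
        return Math.multiplyExact(elementsPerKey, copiesPerElement(size, slide));
    }

    public static void main(String[] args) {
        // 100 elements for one key, sliding window of size 10 with slide 5.
        System.out.println(storedCopies(100L, 10L, 5L));
    }
}
```

With a `reduce` or `fold` function and no evictor, the outline's pre-aggregation rule replaces the per-element copies with a single aggregated value per window, so this count is an upper bound for that case.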