[ https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838199#comment-15838199 ]
ASF GitHub Bot commented on FLINK-5529:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3191#discussion_r97824676
--- Diff: docs/dev/windows.md ---
@@ -23,133 +23,96 @@ specific language governing permissions and limitations
under the License.
-->
-Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite
-slices based on the timestamps of elements or other criteria. This division is required when working
-with infinite streams of data and performing transformations that aggregate elements.
-
-<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, i.e.
-windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
-subdivided based on both window and key before being given to
-a user function. The work can thus be distributed across the cluster
-because the elements for different keys can be processed independently. If you absolutely have to,
-you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
-windows work.
+Windows are at the heart of processing infinite streams. Windows split the stream into "buckets" of finite size,
+over which we can apply computations. This document focuses on how windowing is performed in Flink and how the
+programmer can benefit to the maximum from its offered functionality.
-* This will be replaced by the TOC
-{:toc}
+The general structure of a windowed Flink program is presented below. This is also going to serve as a roadmap for
+the rest of the page.
-## Basics
+ stream
+ .keyBy(...) <- keyed versus non-keyed windows
+ .window(...) <- required: "assigner"
+ [.trigger(...)] <- optional: "trigger" (else default trigger)
+ [.evictor(...)] <- optional: "evictor" (else no evictor)
+ [.allowedLateness()] <- optional, else zero
+ .reduce/fold/apply() <- required: "function"
-For a windowed transformation you must at least specify a *key*
-(see [specifying keys]({{ site.baseurl }}/dev/api_concepts.html#specifying-keys)),
-a *window assigner* and a *window function*. The *key* divides the infinite, non-keyed, stream
-into logical keyed streams while the *window assigner* assigns elements to finite per-key windows.
-Finally, the *window function* is used to process the elements of each window.
+In the above, the commands in square brackets ([...]) are optional. This reveals that Flink allows you to customize your
+windowing logic in many different ways so that it best fits your needs.
-The basic structure of a windowed transformation is thus as follows:
+* This will be replaced by the TOC
+{:toc}
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<T> input = ...;
+## Window Lifecycle
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .<windowed transformation>(<window function>);
-{% endhighlight %}
-</div>
+In a nutshell, a window is **created** as soon as the first element that should belong to this window arrives, and the
+window is **completely removed** when the time (event or processing time) passes its end timestamp plus the user-specified
+`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink guarantees removal only for time-based
+windows and not for other types, *e.g.* global windows (see [Window Assigners](#window-assigners)). For example, with an
+event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed
+lateness of 1 min, Flink will create a new window for the interval between `12:00` and `12:05` when the first element with
+a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the `12:06`
+timestamp.
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[T] = ...
+In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`WindowFunction`, `ReduceFunction` or
+`FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to
+be applied to the contents of the window, while the `Trigger` specifies the conditions under which the window is
+considered ready for the function to be applied. A triggering policy might be something like "when the number of elements
+in the window is more than 4", or "when the watermark passes the end of the window". A trigger can also decide to
+purge a window's contents any time between its creation and removal. Purging in this case only refers to the elements
+in the window, and *not* the window metadata. This means that new data can still be added to that window.
-input
- .keyBy(<key selector>)
- .window(<window assigner>)
- .<windowed transformation>(<window function>)
-{% endhighlight %}
-</div>
-</div>
+Apart from the above, you can specify an `Evictor` (see [Evictors](#evictors)) which will be able to remove
+elements from the window after the trigger fires and before and/or after the function is applied.
-We will cover [window assigners](#window-assigners) in a separate section below.
+In the following we go into more detail for each of the components above. We start with the required parts in the above
+snippet (see [Keyed vs Non-Keyed Windows](#keyed-vs-non-keyed-windows), [Window Assigner](#window-assigner), and
+[Window Function](#window-function)) before moving to the optional ones.
-The window transformation can be one of `reduce()`, `fold()` or `apply()`. Which respectively
-takes a `ReduceFunction`, `FoldFunction` or `WindowFunction`. We describe each of these ways
-of specifying a windowed transformation in detail below: [window functions](#window-functions).
+## Keyed vs Non-Keyed Windows
-For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
-is being considered as *ready for processing*. These will be covered in more detail in
-[triggers](#triggers).
+The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window.
+Using the `keyBy(...)` will split your infinite stream into logical keyed streams. If `keyBy(...)` is not called, your
+stream is not keyed.
-## Window Assigners
+In the case of keyed streams, any attribute of your incoming events can be used as a key
+(more details [here]({{ site.baseurl }}/dev/api_concepts.html#specifying-keys)). Having a keyed stream will
+allow your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed
+independently from the rest. All elements referring to the same key will be sent to the same parallel task.
-The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
-with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*,
-*sliding windows*, *session windows* and *global windows*, but you can implement your own by
-extending the `WindowAssigner` class. All the built-in window assigners, except for the global
-windows one, assign elements to windows based on time, which can either be processing time or event
-time. Please take a look at our section on [event time]({{ site.baseurl }}/dev/event_time.html) for more
-information about how Flink deals with time.
+In case of non-keyed streams, your original stream will not be split into multiple logical streams and all the windowing logic
+will be performed by a single task, *i.e.* with parallelism of 1.
-Let's first look at how each of these window assigners works before looking at how they can be used
-in a Flink program. We will be using abstract figures to visualize the workings of each assigner:
-in the following, the purple circles are elements of the stream, they are partitioned
-by some key (in this case *user 1*, *user 2* and *user 3*) and the x-axis shows the progress
-of time.
+## Window Assigners
-### Global Windows
+After specifying whether your stream is keyed or not, the next step is to define a *windowing strategy*.
--- End diff --
I think we should stick to `window assigner` here, because that is what this
section describes. In my mind, the ensemble of window assigner, trigger, and
(optionally) evictor is the actual `windowing strategy`, since only together
do they define what happens in the end.
What do you think?
> Improve / extends windowing documentation
> -----------------------------------------
>
> Key: FLINK-5529
> URL: https://issues.apache.org/jira/browse/FLINK-5529
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.3.0
>
>
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
> stream
> [.keyBy(...)] <- keyed versus non-keyed windows
> .window(...) <- required: "assigner"
> [.trigger(...)] <- optional: "trigger" (else default trigger)
> [.evictor(...)] <- optional: "evictor" (else no evictor)
> [.allowedLateness()] <- optional, else zero
> .reduce/fold/apply() <- required: "function"
> (1) Types of windows
> - tumble
> - slide
> - session
> - global
> (2) Pre-defined windows
> timeWindow() (tumble, slide)
> countWindow() (tumble, slide)
> - mention that count windows are inherently
> resource leaky unless limited key space
> (3) Window Functions
> - apply: most basic, iterates over elements in window
>
> - aggregating: reduce and fold, can be used with "apply()" which will get one element
>
> - forward reference to state size section
> (4) Advanced Windows
> - assigner
> - simple
> - merging
> - trigger
> - registering timers (processing time, event time)
> - state in triggers
> - life cycle of a window
> - create
> - state
> - cleanup
> - when is window contents purged
> - when is state dropped
> - when is metadata (like merging set) dropped
> (5) Late data
> - picture
> - fire vs fire_and_purge: late accumulates vs late resurrects (cf discarding mode)
>
> (6) Evictors
> - TBD
>
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
> --> num windows * num elements in window
> --> example: tumbling is one copy, sliding(n,m) is n/m copies
> --> per key
> Pre-aggregation:
> - if reduce or fold is set -> one element per window (rather than num elements in window)
> - evictor voids pre-aggregation from the perspective of state
> Special rules:
> - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
> - all elements through the same windows
> - currently not parallel
> - possibly parallel in the future when having pre-aggregation functions
> - inherently (by definition) produce a result stream with parallelism one
> - state similar to one key of keyed windows
> {code}
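The basic rule in item (7) of the outline (one copy of each element per window it is assigned to, so tumbling gives one copy and sliding(n,m) gives n/m copies) can be checked with a small plain-Java sketch. It does not use Flink's API: the class `WindowCopies` and the method `slidingWindowStarts` are hypothetical names introduced only for illustration, and the sketch assumes windows are half-open intervals `[start, start + size)` whose starts are aligned to multiples of the slide.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: not Flink code. Counts how many sliding windows contain one element.
class WindowCopies {

    // Returns the start timestamps of all windows [start, start + size) that contain ts,
    // assuming window starts are aligned to multiples of `slide` (an assumption of this sketch).
    public static List<Long> slidingWindowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= ts.
        long lastStart = ts - Math.floorMod(ts, slide);
        // Walk backwards while the window still covers ts, i.e. start + size > ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 7 * minute; // element timestamp, 7 minutes after the epoch

        // sliding(10 min, 5 min): n/m = 2 copies
        System.out.println(slidingWindowStarts(ts, 10 * minute, 5 * minute).size()); // prints 2

        // tumbling 5 min is the special case size == slide: 1 copy
        System.out.println(slidingWindowStarts(ts, 5 * minute, 5 * minute).size()); // prints 1
    }
}
```

With pre-aggregation (reduce or fold), the outline's rule replaces "number of elements in the window" by one aggregated element per window, but the number of windows per key that hold state is still governed by the same n/m factor.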