[ https://issues.apache.org/jira/browse/FLINK-4062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357298#comment-15357298 ]
ASF GitHub Bot commented on FLINK-4062:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2154#discussion_r69157149
--- Diff: docs/apis/streaming/windows.md ---
@@ -24,1023 +24,608 @@ specific language governing permissions and
limitations
under the License.
-->
+Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite
+slices based on the timestamps of elements or other criteria. This division is required when working
+with infinite streams of data and performing transformations that aggregate elements.
+
+<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, i.e.
+windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
+subdivided based on both window and key before being given to
+a user function. The work can thus be distributed across the cluster
+because the elements for different keys can be processed independently. If you absolutely have to,
+you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
+windows work.
+
* This will be replaced by the TOC
{:toc}
-## Windows on Keyed Data Streams
-
-Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,
-i.e., each window will contain elements with the same key value.
+## Basics
-### Basic Window Constructs
+For a windowed transformation you must at least specify a *key*
+(see [specifying keys](apis/common/index.html#specifying-keys)),
+a *window assigner* and a *window function*. The *key* divides the infinite, non-keyed stream
+into logical keyed streams, while the *window assigner* assigns elements to finite per-key windows.
+Finally, the *window function* is used to process the elements of each window.
-Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows
-for common use cases. See first if your use case can be served by the pre-defined windows below before moving
-to defining your own windows.
+The basic structure of a windowed transformation is thus as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
-<br />
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 25%">Transformation</th>
- <th class="text-center">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td><strong>Tumbling time window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "tumbles". This means that elements are
- grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
- The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5));
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding time window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
- grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
- one window (since windows overlap by at most 4 seconds)
- The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Tumbling count window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "tumbles". This means that elements are
- grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
- and every element belongs to exactly one window.
- {% highlight java %}
-keyedStream.countWindow(1000);
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding count window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
- grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
- and every element can belong to more than one window (as windows overlap by at most 900 elements).
- {% highlight java %}
-keyedStream.countWindow(1000, 100)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- </tbody>
-</table>
-
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .<windowed transformation>(<window function>);
+{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
-<br />
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 25%">Transformation</th>
- <th class="text-center">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td><strong>Tumbling time window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "tumbles". This means that elements are
- grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
- The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5))
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding time window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
- grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
- one window (since windows overlap by at most 4 seconds)
- The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Tumbling count window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "tumbles". This means that elements are
- grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
- and every element belongs to exactly one window.
- {% highlight scala %}
-keyedStream.countWindow(1000)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding count window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
- grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
- and every element can belong to more than one window (as windows overlap by at most 900 elements).
- {% highlight scala %}
-keyedStream.countWindow(1000, 100)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- </tbody>
-</table>
-
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .<windowed transformation>(<window function>)
+{% endhighlight %}
</div>
</div>
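The sketch below makes the three parts concrete. It is a minimal plain-Java model of a keyed, windowed reduce over an in-memory list, not Flink's API. It assumes the following, all chosen for illustration:

- a hypothetical `Event` type carrying a user key, a timestamp in milliseconds and an integer value;
- 5-second tumbling windows aligned to timestamp 0;
- a sum as the window function.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;

// Plain-Java model of key + window assigner + window function (not Flink's API).
final class KeyedWindowModel {

    // Hypothetical stream element: a user key, an event timestamp in ms and a value.
    static final class Event {
        final String user;
        final long timestamp;
        final int value;

        Event(String user, long timestamp, int value) {
            this.user = user;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Returns key -> (window start -> reduced value).
    static Map<String, Map<Long, Integer>> run(
            List<Event> input,
            Function<Event, String> keySelector,    // the *key*
            LongUnaryOperator windowAssigner,       // timestamp -> window start
            BinaryOperator<Integer> windowFunction) // combines two values of one window
    {
        Map<String, Map<Long, Integer>> result = new TreeMap<>();
        for (Event e : input) {
            String key = keySelector.apply(e);
            long windowStart = windowAssigner.applyAsLong(e.timestamp);
            // Group by key first, then by window, and reduce within each group.
            result.computeIfAbsent(key, k -> new TreeMap<>())
                  .merge(windowStart, e.value, windowFunction);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("user1", 1_000L, 1),
                new Event("user1", 4_000L, 2),
                new Event("user1", 6_000L, 3),
                new Event("user2", 2_000L, 10));
        long size = 5_000L; // 5-second tumbling windows aligned to timestamp 0
        Map<String, Map<Long, Integer>> out =
                run(input, e -> e.user, ts -> ts - Math.floorMod(ts, size), Integer::sum);
        System.out.println(out); // {user1={0=3, 5000=3}, user2={0=10}}
    }
}
```

Each key gets its own set of windows. This is why, in a distributed setting, different keys can be processed independently.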
-### Advanced Window Constructs
+We will cover [window assigners](#window-assigners) in a separate section below.
+
+The windowed transformation can be one of `reduce()`, `fold()` or `apply()`, which take a
+`ReduceFunction`, `FoldFunction` or `WindowFunction`, respectively. We describe each of these ways
+of specifying a windowed transformation in detail in [window functions](#window-functions).
+
+For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
+is considered *ready for processing*. Triggers are covered in more detail in
+[triggers](#triggers).
+
+## Window Assigners
+
+The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
+with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*,
+*sliding windows*, *session windows* and *global windows*, but you can implement your own by
+extending the `WindowAssigner` class. All the built-in window assigners, except for global
+windows, assign elements to windows based on time, which can either be processing time or event
+time.
+
+Let's first look at how each of these window assigners works before looking at how they can be used
+in a Flink program. We will be using abstract figures to visualize the workings of each assigner.
+In the following figures, the purple circles are elements of the stream, which are partitioned
+by some key (in this case *user 1*, *user 2* and *user 3*), and the x-axis shows the progress
+of time.
+
+### Global Windows
+
+Global windows are a way of specifying that we don't want to subdivide our elements into windows.
+Each element is assigned to a single per-key *global window*.
+This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise,
+no computation is ever going to be performed, as the global window does not have a natural end at
+which we could process the aggregated elements.
+
+<img src="non-windowed.svg" class="center" style="width: 80%;" />
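The sketch below illustrates why a global window needs a trigger. It is plain Java, not Flink's API. Each key has one unbounded buffer, and a hypothetical count-based rule, similar in spirit to Flink's `CountTrigger`, decides when that buffer is processed. The following are assumptions for illustration:

- the class name;
- using a sum as the window function;
- clearing the buffer after each firing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-key global windows plus a count-based trigger (not Flink's API).
final class GlobalWindowCountTrigger {
    private final int maxCount;
    // One "global window" per key: elements buffered since the last firing.
    private final Map<String, List<Integer>> windows = new HashMap<>();
    // Results produced each time the trigger fires, as "key:sum".
    final List<String> emitted = new ArrayList<>();

    GlobalWindowCountTrigger(int maxCount) {
        this.maxCount = maxCount;
    }

    void onElement(String key, int value) {
        List<Integer> window = windows.computeIfAbsent(key, k -> new ArrayList<>());
        window.add(value);
        // The global window never ends on its own; only this trigger condition causes processing.
        if (window.size() >= maxCount) {
            int sum = 0;
            for (int v : window) {
                sum += v;
            }
            emitted.add(key + ":" + sum);
            window.clear(); // purge after firing so the next batch starts empty
        }
    }

    public static void main(String[] args) {
        GlobalWindowCountTrigger op = new GlobalWindowCountTrigger(2);
        op.onElement("user1", 1);
        op.onElement("user2", 5);
        op.onElement("user1", 2); // second element for user1: the trigger fires
        op.onElement("user1", 4); // buffered until another user1 element arrives
        System.out.println(op.emitted); // [user1:3]
    }
}
```

Without the `if` check, `emitted` would stay empty forever, which is the situation the paragraph above describes.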
+
+### Tumbling Windows
+
+A *tumbling windows* assigner assigns elements to fixed-length, non-overlapping windows of a
+specified *window size*. For example, if you specify a window size of 5 minutes, the window
+function will get 5 minutes' worth of elements in each invocation.
-The general mechanism can define more powerful windows at the cost of more verbose syntax. For example,
-below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second,
-but the execution of the window function is triggered when 100 elements have been added to the
-window, and every time execution is triggered, 10 elements are retained in the window:
+<img src="tumbling-windows.svg" class="center" style="width: 80%;" />
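For the 5-minute example above, the single window an element falls into can be computed directly from its timestamp. This is a plain-Java sketch, not Flink's implementation. It assumes windows are aligned to timestamp 0 and are half-open, `[start, start + size)`.

```java
// Plain-Java sketch of tumbling-window assignment (not Flink's implementation).
final class TumblingWindows {
    // Start of the single window [start, start + size) that contains the timestamp,
    // assuming windows are aligned to timestamp 0.
    static long windowStart(long timestamp, long size) {
        // floorMod keeps the result correct for negative timestamps, too.
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L; // 5 minutes in milliseconds
        for (long ts : new long[] {0L, 299_999L, 300_000L, 420_000L}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Every timestamp maps to exactly one window, so windows never overlap. For example, timestamps 299999 and 300000 land in adjacent windows.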
+
+### Sliding Windows
+
+The *sliding windows* assigner assigns elements to windows of fixed length equal to *window size*,
+like the tumbling windows assigner, but in this case windows can overlap. The user-specified
+parameter *window slide* controls how often a new window is started, so windows overlap whenever
+the slide is smaller than the size. Because windows overlap, an element can be assigned to multiple windows.
+
+For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get 10
+minutes' worth of elements in each invocation of the window function, and it will be invoked for every
+5 minutes of data.
+
+<img src="sliding-windows.svg" class="center" style="width: 80%;" />
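A sliding assigner has to return every window that covers an element. The following plain-Java sketch is not Flink's implementation. It assumes half-open windows whose start times are aligned to multiples of the slide. With a size of 10 minutes and a slide of 5 minutes, each element lands in two windows.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding-window assignment (not Flink's implementation).
final class SlidingWindows {
    // Start times of all windows [start, start + size) that contain the timestamp,
    // assuming window starts are aligned to multiples of the slide.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest aligned window start at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        // 10-minute windows sliding by 5 minutes; element at minute 7.
        System.out.println(windowStarts(7 * minute, 10 * minute, 5 * minute)); // [300000, 0]
    }
}
```

When the slide equals the size, the loop runs once and the result reduces to the tumbling case.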
+
+### Session Windows
+
+The *session windows* assigner is ideal for cases where the window boundaries need to adjust to the
+incoming data. Both the *tumbling windows* and *sliding windows* assigners assign elements to windows
+that start at fixed time points and have a fixed *window size*. With session windows it is possible
+to have windows that start at individual points in time for each key and that end once there has
+been a certain period of inactivity. The configuration parameter is the *session gap*, which specifies
+how long to wait for new data before considering a session as closed.
+
+<img src="session-windows.svg" class="center" style="width: 80%;" />
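One way to picture session windows is to give every element its own window `[timestamp, timestamp + gap)` and merge windows that overlap. The plain-Java sketch below is not Flink's implementation and rests on these assumptions:

- the timestamps of a single key are already sorted (a real system must also cope with out-of-order data);
- a pause of at least `gap` starts a new session;
- the `Session` type is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of session-window merging for one key (not Flink's implementation).
final class SessionWindows {
    // Hypothetical half-open session [start, end).
    static final class Session {
        final long start;
        final long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Assumes timestamps are sorted ascending; each element extends its session by `gap`.
    static List<Session> sessions(long[] sortedTimestamps, long gap) {
        List<Session> result = new ArrayList<>();
        long start = 0;
        long end = 0;
        boolean open = false;
        for (long ts : sortedTimestamps) {
            if (open && ts < end) {
                // The element's window [ts, ts + gap) overlaps the current session: merge.
                end = Math.max(end, ts + gap);
            } else {
                // First element, or inactivity of at least `gap`: close the old session, open a new one.
                if (open) {
                    result.add(new Session(start, end));
                }
                start = ts;
                end = ts + gap;
                open = true;
            }
        }
        if (open) {
            result.add(new Session(start, end));
        }
        return result;
    }

    public static void main(String[] args) {
        long[] clicks = {1_000L, 2_000L, 10_000L, 11_000L, 30_000L};
        System.out.println(sessions(clicks, 5_000L));
        // [[1000, 7000), [10000, 16000), [30000, 35000)]
    }
}
```

The window boundaries here come from the data itself rather than from fixed points on the time axis. This is the property the paragraph above contrasts with tumbling and sliding windows.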
+
+### Specifying a Window Assigner
+
+The built-in window assigners (except `GlobalWindows`) come in two versions: one for processing-time
+windowing and one for event-time windowing. The processing-time assigners assign elements to
+windows based on the current clock of the worker machines, while the event-time assigners assign
+elements to windows based on the timestamps of the elements. Please have a look at
+[event time](/apis/streaming/event_time.html) to learn about the difference between processing time
+and event time and about how timestamps can be assigned to elements.
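The difference between the two versions can be sketched in plain Java. The window arithmetic is identical, and only the time value fed into it changes. This is not Flink's API, and the helper names are hypothetical. Processing time is modeled as an injected `LongSupplier` clock: a real worker would read the wall clock, for example `System.currentTimeMillis()`, while a test can pass a fixed value.

```java
import java.util.function.LongSupplier;

// Plain-Java sketch: same window arithmetic, different notion of time (not Flink's API).
final class TimeNotions {
    static long tumblingStart(long time, long size) {
        return time - Math.floorMod(time, size);
    }

    // Event time: use the timestamp carried by the element itself.
    static long eventTimeWindow(long elementTimestamp, long size) {
        return tumblingStart(elementTimestamp, size);
    }

    // Processing time: use the worker's clock at the moment the element is processed.
    static long processingTimeWindow(LongSupplier clock, long size) {
        return tumblingStart(clock.getAsLong(), size);
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long elementTimestamp = 3_000L;         // the event happened at t = 3 s ...
        LongSupplier lateClock = () -> 12_000L; // ... but is processed at t = 12 s
        System.out.println(eventTimeWindow(elementTimestamp, size)); // 0
        System.out.println(processingTimeWindow(lateClock, size));   // 10000
        // A real worker would read the wall clock instead.
        System.out.println(processingTimeWindow(System::currentTimeMillis, size));
    }
}
```

A delayed element therefore ends up in a different window depending on which notion of time is used.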
+
+The following code snippets show how each of the window assigners can be used in a program:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-keyedStream
- .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
- .trigger(CountTrigger.of(100))
- .evictor(CountEvictor.of(10));
+DataStream<T> input = ...;
+
+// tumbling event-time windows
+input
+ .keyBy(<key selector>)
+ .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+ .<windowed transformation>(<window function>);
+
+// sliding event-time windows
+input
+ .keyBy(<key selector>)
+ .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+ .<windowed transformation>(<window function>);
+
+// event-time session windows
+input
+ .keyBy(<key selector>)
+ .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>);
+
+// tumbling processing-time windows
+input
+ .keyBy(<key selector>)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+ .<windowed transformation>(<window function>);
+
+// sliding processing-time windows
+input
+ .keyBy(<key selector>)
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+ .<windowed transformation>(<window function>);
+
+// processing-time session windows
+input
+ .keyBy(<key selector>)
+ .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>);
+
+// global windows
+input
+ .keyBy(<key selector>)
+ .window(GlobalWindows.create())
+ .<windowed transformation>(<window function>);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-keyedStream
- .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
- .trigger(CountTrigger.of(100))
- .evictor(CountEvictor.of(10))
+val input: DataStream[T] = ...
+
+// tumbling event-time windows
+input
+ .keyBy(<key selector>)
+ .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+ .<windowed transformation>(<window function>)
+
+// sliding event-time windows
+input
+ .keyBy(<key selector>)
+ .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+ .<windowed transformation>(<window function>)
+
+// event-time session windows
+input
+ .keyBy(<key selector>)
+ .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>)
+
+// tumbling processing-time windows
+input
+ .keyBy(<key selector>)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+ .<windowed transformation>(<window function>)
+
+// sliding processing-time windows
+input
+ .keyBy(<key selector>)
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+ .<windowed transformation>(<window function>)
+
+// processing-time session windows
+input
+ .keyBy(<key selector>)
+ .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>)
+
+// global windows
+input
+ .keyBy(<key selector>)
+ .window(GlobalWindows.create())
+ .<windowed transformation>(<window function>)
{% endhighlight %}
</div>
</div>
-The general recipe for building a custom window is to specify (1) a `WindowAssigner`, (2) a `Trigger` (optionally),
-and (3) an `Evictor` (optionally).
+## Window Functions
-The `WindowAssigner` defines how incoming elements are assigned to windows. A window is a logical group of elements
-that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according
-to some notion of time described above within these values are part of the window).
+The *window function* is used to process the elements of each window (and key) once the system
+determines that a window is ready for processing (see [triggers](#triggers) for how the system
+determines when a window is ready).
-For example, the `SlidingEventTimeWindows`
-assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that
-time starts from 0 and is measured in milliseconds. Then, we have 6 windows
-that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming
-element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be
-assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your
-own window types by extending the `WindowAssigner` class.
+The window function can be one of `ReduceFunction`, `FoldFunction` or `WindowFunction`. The first
+two can be executed more efficiently because Flink can incrementally aggregate the elements for each
+window as they arrive. A `WindowFunction` gets an `Iterable` for all the elements contained in a
+window and additional meta information about the window to which the elements belong.
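The efficiency difference comes from what has to be stored per window. The plain-Java sketch below uses hypothetical classes, not Flink's `ReduceFunction` or `WindowFunction` interfaces:

- The incremental variant keeps a single running value.
- The buffered variant keeps every element. This lets it compute something that needs the whole window, here a median, together with the window's start and end.

A `FoldFunction` would behave like the incremental case, starting from an initial accumulator instead of the first element.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java sketch contrasting incremental and buffered window evaluation (not Flink's API).
final class IncrementalVsBuffered {

    // ReduceFunction-style: one running value per window, updated as each element arrives.
    static final class ReducingWindow {
        private final BinaryOperator<Integer> reducer;
        private Integer state; // null until the first element arrives

        ReducingWindow(BinaryOperator<Integer> reducer) {
            this.reducer = reducer;
        }

        void add(int value) {
            state = (state == null) ? value : reducer.apply(state, value);
        }

        Integer result() {
            return state;
        }
    }

    // WindowFunction-style: every element is kept until the window is processed.
    static final class BufferingWindow {
        private final long start;
        private final long end;
        private final List<Integer> buffer = new ArrayList<>();

        BufferingWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        void add(int value) {
            buffer.add(value);
        }

        int bufferedElements() {
            return buffer.size();
        }

        // Sees all elements plus window metadata; uses the upper median for even counts.
        String apply() {
            if (buffer.isEmpty()) {
                return "window [" + start + ", " + end + ") empty";
            }
            List<Integer> sorted = new ArrayList<>(buffer);
            Collections.sort(sorted);
            int median = sorted.get(sorted.size() / 2);
            return "window [" + start + ", " + end + ") count=" + sorted.size() + " median=" + median;
        }
    }

    public static void main(String[] args) {
        ReducingWindow sum = new ReducingWindow(Integer::sum);
        BufferingWindow median = new BufferingWindow(0L, 5_000L);
        for (int v : new int[] {3, 1, 2}) {
            sum.add(v);
            median.add(v);
        }
        System.out.println(sum.result());   // 6 (one stored value)
        System.out.println(median.apply()); // window [0, 5000) count=3 median=2 (three stored values)
    }
}
```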
-<div class="codetabs" markdown="1">
+A windowed transformation with a `WindowFunction` cannot be executed as efficiently as the other
+cases because Flink has to buffer all elements for a window internally before invoking the function.
--- End diff --
maybe emphasize all like `*all*`?
> Update Windowing Documentation
> ------------------------------
>
> Key: FLINK-4062
> URL: https://issues.apache.org/jira/browse/FLINK-4062
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Affects Versions: 1.1.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> The window documentation could be a bit more principled and also needs
> updating with the new allowed lateness setting.
> There is also essentially no documentation about how to write a custom
> trigger.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)