[
https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838200#comment-15838200
]
ASF GitHub Bot commented on FLINK-5529:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3191#discussion_r97823809
--- Diff: docs/dev/windows.md ---
@@ -204,72 +221,120 @@ input
{% highlight scala %}
val input: DataStream[T] = ...
-// tumbling event-time windows
-input
- .keyBy(<key selector>)
- .window(TumblingEventTimeWindows.of(Time.seconds(5)))
- .<windowed transformation>(<window function>)
-
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
-// event-time session windows
+// sliding processing-time windows
input
.keyBy(<key selector>)
- .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
-// tumbling processing-time windows
+// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
- .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+ .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
+{% endhighlight %}
+</div>
+</div>
-// sliding processing-time windows
+Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+`Time.minutes(x)`, and so on.
+
+As shown in the last example, sliding window assigners also take an optional `offset` parameter
+that can be used to change the alignment of windows. For example, without offsets, hourly windows
+sliding by 30 minutes are aligned with the epoch, that is, you will get windows such as
+`1:00:00.000 - 1:59:59.999`, `1:30:00.000 - 2:29:59.999` and so on. If you want to change that,
+you can give an offset. With an offset of 15 minutes you would, for example, get
+`1:15:00.000 - 2:14:59.999`, `1:45:00.000 - 2:44:59.999` etc.
+An important use case for offsets is to adjust windows to timezones other than UTC-0.
+For example, in China you would have to specify an offset of `Time.hours(-8)`.
+
+### Session Windows
+
+The *session windows* assigner groups elements by sessions of activity. Session windows do not overlap and
+do not have a fixed start and end time, in contrast to *tumbling windows* and *sliding windows*. Instead, a
+session window assigner closes a window when it does not receive elements for a certain period
+of time, i.e., when a gap of inactivity occurs. A session window assigner is configured with the *session gap*, which
+defines how long the assigner waits until it closes the current session window and assigns following elements
+to a new session window.
+
+<img src="{{ site.baseurl }}/fig/session-windows.svg" class="center" style="width: 80%;" />
+
+The following code snippets show how to use session windows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
+
+// event-time session windows
input
.keyBy(<key selector>)
- .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
- .<windowed transformation>(<window function>)
+ .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>);
// processing-time session windows
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
+
+// event-time session windows
+input
+ .keyBy(<key selector>)
+ .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
-// global windows
+// processing-time session windows
input
.keyBy(<key selector>)
- .window(GlobalWindows.create())
+ .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>)
{% endhighlight %}
</div>
</div>
-Note, how we can specify a time interval by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
`Time.minutes(x)`, and so on.
-The time-based window assigners also take an optional `offset` parameter that can be used to
-change the alignment of windows. For example, without offsets hourly windows are aligned
-with epoch, that is you will get windows such as `1:00 - 1:59`, `2:00 - 2:59` and so on. If you
-want to change that you can give an offset. With an offset of 15 minutes you would, for example,
-get `1:15 - 2:14`, `2:15 - 3:14` etc. Another important use case for offsets is when you
-want to have daily windows and live in a timezone other than UTC-0. For example, in China
-you would have to specify an offset of `Time.hours(-8)`.
+<span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
+they are evaluated differently than tumbling and sliding windows. Internally, a session window operator
+creates a new window for each arriving record and merges windows together if they are closer to each other
+than the defined gap.
+In order to be mergable, a session window operator requires a mergable [Trigger](#triggers) and a mergable
--- End diff --
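The merging behaviour described in the session-window note above can be illustrated with a small, Flink-independent Java sketch. It assumes timestamps in milliseconds, gives each record its own window `[ts, ts + gap)` as the note describes, and merges windows that overlap or touch; `SessionMergeSketch`, `Window` and `sessions` are hypothetical names, and this does not claim to reproduce the exact boundary rule of Flink's own session assigners.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Flink code: session windows built by creating one
// window per record and merging windows that overlap or touch.
final class SessionMergeSketch {

    // A time window [start, end), timestamps in milliseconds.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Assigns [ts, ts + gap) to every record, then merges in timestamp order.
    static List<Window> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Window> merged = new ArrayList<>();
        for (long ts : sorted) {
            Window next = new Window(ts, ts + gap);
            int last = merged.size() - 1;
            if (last >= 0 && next.start <= merged.get(last).end) {
                // Within the gap of the current session (overlapping or touching): extend it.
                Window cur = merged.get(last);
                merged.set(last, new Window(cur.start, Math.max(cur.end, next.end)));
            } else {
                // A gap of inactivity: start a new session.
                merged.add(next);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 4, 12, 30, 35};
        System.out.println(sessions(timestamps, 10)); // [[0, 22), [30, 45)]
    }
}
```

Records at 0, 4 and 12 ms end up in one session because each arrives within 10 ms of the previous one; the 18 ms pause before the record at 30 ms starts a new session.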
I think it should be `mergeable` instead of `mergable`; it appears several times in the text.
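The offset arithmetic from the sliding-window paragraph in the diff can be checked with a minimal, Flink-independent Java sketch. It assumes timestamps in milliseconds since the epoch and that a window start is aligned to the slide and then shifted by the offset, which we believe matches the arithmetic in Flink's `TimeWindow.getWindowStartWithOffset`; `SlidingOffsetSketch`, `windowStart`, `assign` and `hm` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: which sliding windows contain a
// timestamp when windows are aligned to the slide and shifted by an offset.
final class SlidingOffsetSketch {

    // Start of the latest window containing ts (aligned to slide, shifted by offset).
    static long windowStart(long ts, long offset, long slide) {
        return ts - Math.floorMod(ts - offset, slide);
    }

    // All windows [start, start + size) that contain ts, latest first.
    static List<long[]> assign(long ts, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        for (long start = windowStart(ts, offset, slide); start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    // Formats small epoch millisecond values as H:MM, for readable output only.
    static String hm(long millis) {
        long minutes = millis / 60_000;
        return String.format("%d:%02d", minutes / 60, minutes % 60);
    }

    public static void main(String[] args) {
        long minute = 60_000;
        long hour = 60 * minute;
        long ts = hour + 20 * minute; // a record at 1:20
        // Hourly windows sliding by 30 minutes, offset by 15 minutes.
        for (long[] w : assign(ts, hour, 30 * minute, 15 * minute)) {
            System.out.println(hm(w[0]) + " - " + hm(w[1])); // end is exclusive
        }
    }
}
```

For a record at 1:20 this prints `1:15 - 2:15` and `0:45 - 1:45` (ends exclusive), the same 15-minute-shifted alignment the doc text describes. `Math.floorMod` is used so the start stays aligned even if `ts - offset` is negative; for non-negative values it gives the same result as `ts - (ts - offset + slide) % slide`.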
> Improve / extends windowing documentation
> -----------------------------------------
>
> Key: FLINK-5529
> URL: https://issues.apache.org/jira/browse/FLINK-5529
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.3.0
>
>
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
> stream
> [.keyBy(...)] <- keyed versus non-keyed windows
> .window(...) <- required: "assigner"
> [.trigger(...)] <- optional: "trigger" (else default trigger)
> [.evictor(...)] <- optional: "evictor" (else no evictor)
> [.allowedLateness()] <- optional, else zero
> .reduce/fold/apply() <- required: "function"
> (1) Types of windows
> - tumble
> - slide
> - session
> - global
> (2) Pre-defined windows
> timeWindow() (tumble, slide)
> countWindow() (tumble, slide)
> - mention that count windows are inherently resource leaky unless the key space is limited
> (3) Window Functions
> - apply: most basic, iterates over elements in window
>
> - aggregating: reduce and fold, can be used with "apply()" which will get one element
>
> - forward reference to state size section
> (4) Advanced Windows
> - assigner
> - simple
> - merging
> - trigger
> - registering timers (processing time, event time)
> - state in triggers
> - life cycle of a window
> - create
> - state
> - cleanup
> - when is window contents purged
> - when is state dropped
> - when is metadata (like merging set) dropped
> (5) Late data
> - picture
> - fire vs fire_and_purge: late accumulates vs late resurrects (cf discarding mode)
>
> (6) Evictors
> - TBD
>
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
> --> num windows * num elements in window
> --> example: tumbling is one copy, sliding(n,m) is n/m copies
> --> per key
> Pre-aggregation:
> - if reduce or fold is set -> one element per window (rather than num elements in window)
> - evictor voids pre-aggregation from the perspective of state
> Special rules:
> - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
> - all elements through the same windows
> - currently not parallel
> - possible parallel in the future when having pre-aggregation functions
> - inherently (by definition) produce a result stream with parallelism one
> - state similar to one key of keyed windows
> {code}
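The rule in section (7), one copy per window an element is assigned to, can be checked numerically with a hypothetical Java sketch that has no Flink dependency. It assumes epoch-aligned windows with no offset and a slide that divides the window size, counts the copies each element contributes over its lifetime (not the peak state at any instant, which also depends on when windows fire and are purged), and ignores pre-aggregation; `WindowStateEstimate`, `windowsContaining` and `bufferedCopies` are hypothetical names.

```java
// Hypothetical sketch: copies stored per key without pre-aggregation,
// following the rule "each element has one copy per window it is assigned to".
final class WindowStateEstimate {

    // Counts aligned windows [start, start + size) that contain ts.
    static long windowsContaining(long ts, long size, long slide) {
        long count = 0;
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            count++;
        }
        return count;
    }

    // Sums the copies made for every element of one key.
    static long bufferedCopies(long[] timestamps, long size, long slide) {
        long total = 0;
        for (long ts : timestamps) {
            total += windowsContaining(ts, size, slide);
        }
        return total;
    }

    public static void main(String[] args) {
        long[] ts = new long[1000];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = i * 7L; // 1000 elements of a single key, arbitrary spacing
        }
        System.out.println(bufferedCopies(ts, 10_000, 10_000)); // 1000: tumbling, one copy each
        System.out.println(bufferedCopies(ts, 10_000, 5_000));  // 2000: sliding(10s, 5s), 2 copies each
        System.out.println(bufferedCopies(ts, 60_000, 1_000));  // 60000: sliding(60s, 1s), 60 copies each
    }
}
```

The last case shows why small slides are expensive without pre-aggregation: with `n/m = 60`, every element is held 60 times.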