[ 
https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838204#comment-15838204
 ] 

ASF GitHub Bot commented on FLINK-5529:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3191#discussion_r97827094
  
    --- Diff: docs/dev/windows.md ---
    @@ -204,72 +221,120 @@ input
     {% highlight scala %}
     val input: DataStream[T] = ...
     
    -// tumbling event-time windows
    -input
    -    .keyBy(<key selector>)
    -    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    -    .<windowed transformation>(<window function>)
    -
     // sliding event-time windows
     input
         .keyBy(<key selector>)
         .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
         .<windowed transformation>(<window function>)
     
    -// event-time session windows
    +// sliding processing-time windows
     input
         .keyBy(<key selector>)
    -    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
         .<windowed transformation>(<window function>)
     
    -// tumbling processing-time windows
    +// sliding processing-time windows offset by -8 hours
     input
         .keyBy(<key selector>)
    -    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
         .<windowed transformation>(<window function>)
    +{% endhighlight %}
    +</div>
    +</div>
     
    -// sliding processing-time windows
    +Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
    +`Time.minutes(x)`, and so on.
    +
    +As shown in the last example, sliding window assigners also take an optional `offset` parameter
    +that can be used to change the alignment of windows. For example, without offsets hourly windows
    +sliding by 30 minutes are aligned with epoch, that is you will get windows such as
    +`1:00:00.000 - 1:59:59.999`, `1:30:00.000 - 2:29:59.999` and so on. If you want to change that
    +you can give an offset. With an offset of 15 minutes you would, for example, get
    +`1:15:00.000 - 2:14:59.999`, `1:45:00.000 - 2:44:59.999` etc.
    +An important use case for offsets is to adjust windows to timezones other than UTC-0.
    +For example, in China you would have to specify an offset of `Time.hours(-8)`.
    +
    +### Session Windows
    +
    +The *session windows* assigner groups elements by sessions of activity. Session windows do not overlap and
    +do not have a fixed start and end time in contrast to *tumbling windows* and *sliding windows*. Instead a
    +session window assigner closes a window when it does not receive elements for a certain period
    +of time, i.e., when a gap of inactivity occurred. A session window assigner is configured with the *session gap* which
    +defines how long the assigner waits until it closes the current session window and assigns following elements
    +to a new session window.
    +
    +<img src="{{ site.baseurl }}/fig/session-windows.svg" class="center" style="width: 80%;" />
    +
    +The following code snippets show how to use session windows.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<T> input = ...;
    +
    +// event-time session windows
     input
         .keyBy(<key selector>)
    -    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    -    .<windowed transformation>(<window function>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
     
     // processing-time session windows
     input
         .keyBy(<key selector>)
         .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[T] = ...
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
         .<windowed transformation>(<window function>)
     
    -// global windows
    +// processing-time session windows
     input
         .keyBy(<key selector>)
    -    .window(GlobalWindows.create())
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
     {% endhighlight %}
     </div>
     </div>
     
    -Note, how we can specify a time interval by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
    +Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
     `Time.minutes(x)`, and so on.
     
    -The time-based window assigners also take an optional `offset` parameter that can be used to
    -change the alignment of windows. For example, without offsets hourly windows are aligned
    -with epoch, that is you will get windows such as `1:00 - 1:59`, `2:00 - 2:59` and so on. If you
    -want to change that you can give an offset. With an offset of 15 minutes you would, for example,
    -get `1:15 - 2:14`, `2:15 - 3:14` etc. Another important use case for offsets is when you
    -want to have daily windows and live in a timezone other than UTC-0. For example, in China
    -you would have to specify an offset of `Time.hours(-8)`.
    +<span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
    +they are  evaluated differently than tumbling and sliding windows. Internally, a session window operator
    +creates a new window for each arriving record and merges windows together if their are closer to each other
    +than the defined gap.
    +In order to be mergable, a session window operator requires a mergable [Trigger](#triggers) and a mergable
    --- End diff --
    
    I think it should be "merging Trigger" and also "merging WindowFunction",
    since they themselves are not mergeable but just "merge aware" or
    "merge compatible".
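
For readers following the thread, the merging behaviour that the quoted paragraph describes can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink code: `SessionMergeSketch` and `mergeSessions` are hypothetical names, each element is assumed to start out in its own window `[t, t + gap)`, and windows that overlap or touch are assumed to merge (how the exact boundary case is treated is an assumption of this sketch).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; SessionMergeSketch is a hypothetical name, not Flink API.
class SessionMergeSketch {

    /**
     * Gives every timestamp its own window [t, t + gapMillis) and merges
     * windows that overlap or touch. Returns the sessions as {start, end} pairs.
     */
    static List<long[]> mergeSessions(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);

        List<long[]> sessions = new ArrayList<>();
        for (long t : sorted) {
            long start = t;
            long end = t + gapMillis;
            if (!sessions.isEmpty()) {
                long[] last = sessions.get(sessions.size() - 1);
                // Assumption: touching windows (start == last end) also merge.
                if (start <= last[1]) {
                    last[1] = Math.max(last[1], end);
                    continue;
                }
            }
            sessions.add(new long[] {start, end});
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Two bursts of activity separated by more than the 10 second gap.
        List<long[]> sessions =
                mergeSessions(new long[] {0, 4_000, 30_000, 31_000}, 10_000);
        for (long[] s : sessions) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a 10 second gap, the four timestamps above collapse into two sessions, `[0, 14000)` and `[30000, 41000)`. The sketch sorts its input up front, whereas a streaming operator would merge incrementally as records arrive, which is why the Trigger and WindowFunction have to cope with merges.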


> Improve / extends windowing documentation
> -----------------------------------------
>
>                 Key: FLINK-5529
>                 URL: https://issues.apache.org/jira/browse/FLINK-5529
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.3.0
>
>
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
>   stream
>      [.keyBy(...)]         <-  keyed versus non-keyed windows
>       .window(...)         <-  required: "assigner"
>      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
>      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
>      [.allowedLateness()]  <-  optional, else zero
>       .reduce/fold/apply() <-  required: "function"
> (1) Types of windows
>   - tumble
>   - slide
>   - session
>   - global
> (2) Pre-defined windows
>    timeWindow() (tumble, slide)
>    countWindow() (tumble, slide)
>      - mention that count windows are inherently
>        resource leaky unless limited key space
> (3) Window Functions
>   - apply: most basic, iterates over elements in window
>   
>   - aggregating: reduce and fold, can be used with "apply()" which will get one element
>   
>   - forward reference to state size section
> (4) Advanced Windows
>   - assigner
>     - simple
>     - merging
>   - trigger
>     - registering timers (processing time, event time)
>     - state in triggers
>   - life cycle of a window
>     - create
>     - state
>     - cleanup
>       - when is window contents purged
>       - when is state dropped
>       - when is metadata (like merging set) dropped
> (5) Late data
>   - picture
>   - fire vs fire_and_purge: late accumulates vs late resurrects (cf discarding mode)
>   
> (6) Evictors
>   - TBD
>   
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
>   --> num windows * num elements in window
>   --> example: tumbling is one copy, sliding(n,m) is n/m copies
>   --> per key
> Pre-aggregation:
>   - if reduce or fold is set -> one element per window (rather than num elements in window)
>   - evictor voids pre-aggregation from the perspective of state
> Special rules:
>   - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
>   - all elements through the same windows
>   - currently not parallel
>   - possibly parallel in the future when having pre-aggregation functions
>   - inherently (by definition) produce a result stream with parallelism one
>   - state similar to one key of keyed windows
> {code}
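
Both the `offset` alignment described in the diff and the "sliding(n,m) is n/m copies" rule from item (7) of the outline come down to the same arithmetic. The following plain-Java sketch works through it under stated assumptions: `SlidingAssignSketch` and its methods are hypothetical names rather than Flink API, all values are epoch milliseconds, and windows are taken to be half-open intervals `[start, start + size)`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; SlidingAssignSketch is a hypothetical name, not Flink API.
class SlidingAssignSketch {

    /** Start of the latest slide-aligned window that contains the timestamp. */
    static long lastWindowStart(long timestamp, long offset, long slide) {
        // floorMod keeps the remainder non-negative, so negative offsets work too.
        return timestamp - Math.floorMod(timestamp - offset, slide);
    }

    /** Start times of all windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide, long offset) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, offset, slide);
                start > timestamp - size;
                start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Hourly windows sliding by 30 minutes, offset by 15 minutes, element at 2:00.
        List<Long> starts = windowStarts(120 * minute, 60 * minute, 30 * minute, 15 * minute);
        for (long start : starts) {
            System.out.println("window starts at minute " + start / minute);
        }
    }
}
```

For an element at 2:00 with hourly windows sliding by 30 minutes and an offset of 15 minutes, the sketch yields the windows starting at 1:45 and 1:15, matching the example in the diff. The element lands in size / slide = 2 windows, which is the per-element copy count the state-size rule refers to.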



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
