[ 
https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838199#comment-15838199
 ] 

ASF GitHub Bot commented on FLINK-5529:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3191#discussion_r97824676
  
    --- Diff: docs/dev/windows.md ---
    @@ -23,133 +23,96 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    -Flink uses a concept called *windows* to divide a (potentially) infinite 
`DataStream` into finite
    -slices based on the timestamps of elements or other criteria. This 
division is required when working
    -with infinite streams of data and performing transformations that 
aggregate elements.
    -
    -<span class="label label-info">Info</span> We will mostly talk about 
*keyed windowing* here, i.e.
    -windows that are applied on a `KeyedStream`. Keyed windows have the 
advantage that elements are
    -subdivided based on both window and key before being given to
    -a user function. The work can thus be distributed across the cluster
    -because the elements for different keys can be processed independently. If 
you absolutely have to,
    -you can check out [non-keyed windowing](#non-keyed-windowing) where we 
describe how non-keyed
    -windows work.
    +Windows are at the heart of processing infinite streams. Windows split the 
stream into "buckets" of finite size, 
    +over which we can apply computations. This document focuses on how 
windowing is performed in Flink and how the 
    +programmer can benefit to the maximum from its offered functionality. 
     
    -* This will be replaced by the TOC
    -{:toc}
    +The general structure of a windowed Flink program is presented below. This 
is also going to serve as a roadmap for 
    +the rest of the page.
     
    -## Basics
    +    stream
    +           .keyBy(...)          <-  keyed versus non-keyed windows
    +           .window(...)         <-  required: "assigner"
    +          [.trigger(...)]       <-  optional: "trigger" (else default trigger)
    +          [.evictor(...)]       <-  optional: "evictor" (else no evictor)
    +          [.allowedLateness()]  <-  optional, else zero
    +           .reduce/fold/apply() <-  required: "function"
     
    -For a windowed transformation you must at least specify a *key*
    -(see [specifying keys]({{ site.baseurl 
}}/dev/api_concepts.html#specifying-keys)),
    -a *window assigner* and a *window function*. The *key* divides the 
infinite, non-keyed, stream
    -into logical keyed streams while the *window assigner* assigns elements to 
finite per-key windows.
    -Finally, the *window function* is used to process the elements of each 
window.
    +In the above, the commands in square brackets ([...]) are optional. This 
reveals that Flink allows you to customize your 
    +windowing logic in many different ways so that it best fits your needs. 
     
    -The basic structure of a windowed transformation is thus as follows:
    +* This will be replaced by the TOC
    +{:toc}
     
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<T> input = ...;
    +## Window Lifecycle
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .<windowed transformation>(<window function>);
    -{% endhighlight %}
    -</div>
    +In a nutshell, a window is **created** as soon as the first element that 
should belong to this window arrives, and the  
    +window is **completely removed** when the time (event or processing time) 
passes its end timestamp plus the user-specified 
    +`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink 
guarantees removal only for time-based 
    +windows and not for other types, *e.g.* global windows (see [Window 
Assigners](#window-assigners)). For example, with an 
    +event-time-based windowing strategy that creates non-overlapping (or 
tumbling) windows every 5 minutes and has an allowed 
    +lateness of 1 min, Flink will create a new window for the interval between 
`12:00` and `12:05` when the first element with 
    +a timestamp that falls into this interval arrives, and it will remove it 
when the watermark passes the `12:06`
    +timestamp. 
     
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val input: DataStream[T] = ...
    +In addition, each window will have a `Trigger` (see [Triggers](#triggers)) 
and a function (`WindowFunction`, `ReduceFunction` or 
    +`FoldFunction`) (see [Window Functions](#window-functions)) attached to 
it. The function will contain the computation to 
    +be applied to the contents of the window, while the `Trigger` specifies 
the conditions under which the window is 
    +considered ready for the function to be applied. A triggering policy might 
be something like "when the number of elements 
    +in the window is more than 4", or "when the watermark passes the end of 
the window". A trigger can also decide to 
    +purge a window's contents any time between its creation and removal. 
Purging in this case only refers to the elements 
    +in the window, and *not* the window metadata. This means that new data can 
still be added to that window.
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .<windowed transformation>(<window function>)
    -{% endhighlight %}
    -</div>
    -</div>
    +Apart from the above, you can specify an `Evictor` (see 
[Evictors](#evictors)) which will be able to remove  
    +elements from the window after the trigger fires and before and/or after 
the function is applied.
     
    -We will cover [window assigners](#window-assigners) in a separate section 
below.
    +In the following we go into more detail for each of the components above. 
We start with the required parts in the above 
    +snippet (see [Keyed vs Non-Keyed Windows](#keyed-vs-non-keyed-windows), 
[Window Assigner](#window-assigner), and 
    +[Window Function](#window-function)) before moving to the optional ones.
     
    -The window transformation can be one of `reduce()`, `fold()` or `apply()`. 
Which respectively
    -takes a `ReduceFunction`, `FoldFunction` or `WindowFunction`. We describe 
each of these ways
    -of specifying a windowed transformation in detail below: [window 
functions](#window-functions).
    +## Keyed vs Non-Keyed Windows
     
    -For more advanced use cases you can also specify a `Trigger` that 
determines when exactly a window
    -is being considered as *ready for processing*. These will be covered in 
more detail in
    -[triggers](#triggers).
    +The first thing to specify is whether your stream should be keyed or not. 
This has to be done before defining the window. 
    +Using the `keyBy(...)` will split your infinite stream into logical keyed 
streams. If `keyBy(...)` is not called, your 
    +stream is not keyed.
     
    -## Window Assigners
    +In the case of keyed streams, any attribute of your incoming events can be 
used as a key 
    +(more details [here]({{ site.baseurl 
}}/dev/api_concepts.html#specifying-keys)). Having a keyed stream will 
    +allow your windowed computation to be performed in parallel by multiple 
tasks, as each logical keyed stream can be processed 
    +independently from the rest. All elements referring to the same key will 
be sent to the same parallel task. 
     
    -The window assigner specifies how elements of the stream are divided into 
finite slices. Flink comes
    -with pre-implemented window assigners for the most typical use cases, 
namely *tumbling windows*,
    -*sliding windows*, *session windows* and *global windows*, but you can 
implement your own by
    -extending the `WindowAssigner` class. All the built-in window assigners, 
except for the global
    -windows one, assign elements to windows based on time, which can either be 
processing time or event
    -time. Please take a look at our section on [event time]({{ site.baseurl 
}}/dev/event_time.html) for more
    -information about how Flink deals with time.
    +In case of non-keyed streams, your original stream will not be split into 
multiple logical streams and all the windowing logic 
    +will be performed by a single task, *i.e.* with parallelism of 1.
     
    -Let's first look at how each of these window assigners works before 
looking at how they can be used
    -in a Flink program. We will be using abstract figures to visualize the 
workings of each assigner:
    -in the following, the purple circles are elements of the stream, they are 
partitioned
    -by some key (in this case *user 1*, *user 2* and *user 3*) and the x-axis 
shows the progress
    -of time.
    +## Window Assigners
     
    -### Global Windows
    +After specifying whether your stream is keyed or not, the next step is to 
define a *windowing strategy*. 
    --- End diff --
    
    I think we should stick to `window assigner` here because that's what we're 
describing. In my mind, the ensemble of window assigner, trigger (and evictor) 
is actually the `windowing strategy` since only those together define what 
happens in the end.
    
    What do you think? 


> Improve / extends windowing documentation
> -----------------------------------------
>
>                 Key: FLINK-5529
>                 URL: https://issues.apache.org/jira/browse/FLINK-5529
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.3.0
>
>
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
>   stream
>      [.keyBy(...)]         <-  keyed versus non-keyed windows
>       .window(...)         <-  required: "assigner"
>      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
>      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
>      [.allowedLateness()]  <-  optional, else zero
>       .reduce/fold/apply() <-  required: "function"
> (1) Types of windows
>   - tumble
>   - slide
>   - session
>   - global
> (2) Pre-defined windows
>    timeWindow() (tumble, slide)
>    countWindow() (tumble, slide)
>      - mention that count windows are inherently
>        resource leaky unless limited key space
> (3) Window Functions
>   - apply: most basic, iterates over elements in window
>   
>   - aggregating: reduce and fold, can be used with "apply()" which will get one element
>   
>   - forward reference to state size section
> (4) Advanced Windows
>   - assigner
>     - simple
>     - merging
>   - trigger
>     - registering timers (processing time, event time)
>     - state in triggers
>   - life cycle of a window
>     - create
>     - state
>     - cleanup
>       - when is window contents purged
>       - when is state dropped
>       - when is metadata (like merging set) dropped
> (5) Late data
>   - picture
>   - fire vs fire_and_purge: late accumulates vs late resurrects (cf discarding mode)
>   
> (6) Evictors
>   - TBD
>   
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
>   --> num windows * num elements in window
>   --> example: tumbling is one copy, sliding(n,m) is n/m copies
>   --> per key
> Pre-aggregation:
>   - if reduce or fold is set -> one element per window (rather than num elements in window)
>   - evictor voids pre-aggregation from the perspective of state
> Special rules:
>   - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
>   - all elements through the same windows
>   - currently not parallel
>   - possibly parallel in the future when having pre-aggregation functions
>   - inherently (by definition) produce a result stream with parallelism one
>   - state similar to one key of keyed windows
> {code}
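
The state-size rule in item (7) of the outline can be checked with a little arithmetic. The following is only a sketch and not part of Flink: the class and method names are hypothetical, and it assumes that window size and slide are given in the same unit and that the size is a multiple of the slide.

```java
// Hypothetical helper illustrating the "one copy per assigned window" rule.
class StateSizeSketch {

    // Number of windows each element is assigned to, which is also the
    // number of copies of that element kept in state.
    // Tumbling windows are the special case slide == windowSize.
    static long copiesPerElement(long windowSize, long slide) {
        if (windowSize <= 0 || slide <= 0 || windowSize % slide != 0) {
            throw new IllegalArgumentException(
                "sketch assumes positive values and size as a multiple of slide");
        }
        return windowSize / slide;
    }

    // Elements held in state for one key: num windows * num elements in window.
    // With reduce/fold pre-aggregation, each window holds a single element.
    static long storedElementsPerKey(long activeWindows,
                                     long elementsPerWindow,
                                     boolean preAggregated) {
        long perWindow = preAggregated ? 1 : elementsPerWindow;
        return activeWindows * perWindow;
    }

    public static void main(String[] args) {
        // Tumbling window of 5 minutes: one copy per element.
        System.out.println(copiesPerElement(5, 5));
        // Sliding window of 10 minutes with a 5 minute slide: n/m copies.
        System.out.println(copiesPerElement(10, 5));
        // Two active windows of 100 elements each, without and with pre-aggregation.
        System.out.println(storedElementsPerKey(2, 100, false));
        System.out.println(storedElementsPerKey(2, 100, true));
    }
}
```

Per the outline, an evictor voids pre-aggregation, so in this sketch that case corresponds to `preAggregated = false`.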



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
