Repository: flink
Updated Branches:
  refs/heads/master 913dc8df2 -> a11c1c640


http://git-wip-us.apache.org/repos/asf/flink/blob/a11c1c64/docs/apis/streaming/windows.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/windows.md b/docs/apis/streaming/windows.md
index 90ad0de..fba17fc 100644
--- a/docs/apis/streaming/windows.md
+++ b/docs/apis/streaming/windows.md
@@ -24,1023 +24,616 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite
+slices based on the timestamps of elements or other criteria. This division is required when working
+with infinite streams of data and performing transformations that aggregate elements.
+
+<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, i.e.
+windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
+subdivided based on both window and key before being given to
+a user function. The work can thus be distributed across the cluster
+because the elements for different keys can be processed independently. If you absolutely have to,
+you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
+windows work.
+
 * This will be replaced by the TOC
 {:toc}
 
-## Windows on Keyed Data Streams
-
-Flink offers a variety of methods for defining windows on a `KeyedStream`. All 
of these group elements *per key*,
-i.e., each window will contain elements with the same key value.
+## Basics
 
-### Basic Window Constructs
+For a windowed transformation you must at least specify a *key*
+(see [specifying keys](/apis/common/index.html#specifying-keys)),
+a *window assigner* and a *window function*. The *key* divides the infinite, non-keyed stream
+into logical keyed streams while the *window assigner* assigns elements to finite per-key windows.
+Finally, the *window function* is used to process the elements of each window.
 
-Flink offers a general window mechanism that provides flexibility, as well as 
a number of pre-defined windows
-for common use cases. See first if your use case can be served by the 
pre-defined windows below before moving
-to defining your own windows.
+The basic structure of a windowed transformation is thus as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
 
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
-          grouped according to their timestamp in groups of 5 second duration, 
and every element belongs to exactly one window.
-         The notion of time is specified by the selected TimeCharacteristic 
(see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
-    {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5));
-    {% endhighlight %}
-          </p>
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-          <td>
-            <p>
-             Defines a window of 5 seconds, that "slides" by 1 second. This 
means that elements are
-             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
-             one window (since windows overlap by at most 4 seconds)
-             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="{{ site.baseurl 
}}/apis/streaming/event_time.html">time</a>).
-      {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
-      {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 1000 elements, that "tumbles". This means that 
elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element belongs to exactly one window.
-    {% highlight java %}
-keyedStream.countWindow(1000);
-    {% endhighlight %}
-        </p>
-        </td>
-      </tr>
-      <tr>
-      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-      <td>
-        <p>
-          Defines a window of 1000 elements, that "slides" every 100 elements. 
This means that elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element can belong to more than one window (as windows 
overlap by at most 900 elements).
-  {% highlight java %}
-keyedStream.countWindow(1000, 100)
-  {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
 
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
-          grouped according to their timestamp in groups of 5 second duration, 
and every element belongs to exactly one window.
-          The notion of time is specified by the selected TimeCharacteristic 
(see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
-    {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5))
-    {% endhighlight %}
-          </p>
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-          <td>
-            <p>
-             Defines a window of 5 seconds, that "slides" by 1 second. This 
means that elements are
-             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
-             one window (since windows overlap by at most 4 seconds)
-             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="{{ site.baseurl 
}}/apis/streaming/event_time.html">time</a>).
-      {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
-      {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 1000 elements, that "tumbles". This means that 
elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element belongs to exactly one window.
-    {% highlight scala %}
-keyedStream.countWindow(1000)
-    {% endhighlight %}
-        </p>
-        </td>
-      </tr>
-      <tr>
-      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-      <td>
-        <p>
-          Defines a window of 1000 elements, that "slides" every 100 elements. 
This means that elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element can belong to more than one window (as windows 
overlap by at most 900 elements).
-  {% highlight scala %}
-keyedStream.countWindow(1000, 100)
-  {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .<windowed transformation>(<window function>)
+{% endhighlight %}
 </div>
 </div>
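To make the three ingredients concrete, here is a minimal plain-Java sketch (no Flink dependency) that mimics `keyBy`, `window` and `reduce` over an in-memory list. The class `WindowPipelineSketch`, its `Event` type and the `"key@windowStart"` result format are hypothetical illustrations, not Flink API. The sketch ignores triggers and treats every window as complete, and the tumbling assigner used in the usage note assumes non-negative millisecond timestamps.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical, in-memory simulation of input.keyBy(...).window(...).reduce(...).
// This is not Flink code: there is no distribution, no state backend and no triggers.
final class WindowPipelineSketch {

    // Hypothetical stream element: a key, an event timestamp (ms) and a value.
    static final class Event {
        final String key;
        final long timestamp;
        final long value;

        Event(String key, long timestamp, long value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // keySelector    ~ the key: splits the stream into logical per-key streams
    // windowAssigner ~ the window assigner: maps an element to its window (start time)
    // windowFunction ~ the window function: combines the values of one key's window
    static Map<String, Long> run(List<Event> input,
                                 Function<Event, String> keySelector,
                                 Function<Event, Long> windowAssigner,
                                 BinaryOperator<Long> windowFunction) {
        // One result per (key, window) pair, encoded as "key@windowStart".
        Map<String, Long> results = new TreeMap<>();
        for (Event e : input) {
            String slot = keySelector.apply(e) + "@" + windowAssigner.apply(e);
            results.merge(slot, e.value, windowFunction);
        }
        return results;
    }
}
```

For example, `run(events, e -> e.key, e -> e.timestamp - (e.timestamp % 5000L), Long::sum)` sums the values of each key in 5-second tumbling windows.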
 
-### Advanced Window Constructs
+We will cover [window assigners](#window-assigners) in a separate section below.
+
+The windowed transformation can be one of `reduce()`, `fold()` or `apply()`, which take a
+`ReduceFunction`, `FoldFunction` or `WindowFunction`, respectively. We describe each of these ways
+of specifying a windowed transformation in detail in [window functions](#window-functions).
+
+For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
+is considered *ready for processing*. Triggers are covered in more detail in
+[triggers](#triggers).
+
+## Window Assigners
+
+The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
+with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*,
+*sliding windows*, *session windows* and *global windows*, but you can implement your own by
+extending the `WindowAssigner` class. All the built-in window assigners, except for global
+windows, assign elements to windows based on time, which can either be processing time or event
+time. Please take a look at our section on [event time](/apis/streaming/event_time.html) for more
+information about how Flink deals with time.
+
+Let's first look at how each of these window assigners works before looking at how they can be used
+in a Flink program. We will use abstract figures to visualize the workings of each assigner.
+In these figures, the purple circles are elements of the stream, which are partitioned
+by some key (in this case *user 1*, *user 2* and *user 3*), and the x-axis shows the progress
+of time.
+
+### Global Windows
+
+Global windows are a way of specifying that we don't want to subdivide our elements into windows.
+Each element is assigned to one single per-key *global window*.
+This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise,
+no computation is ever going to be performed, as the global window does not have a natural end at
+which we could process the aggregated elements.
+
+<img src="non-windowed.svg" class="center" style="width: 80%;" />
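The following plain-Java sketch illustrates why global windows need a trigger. It keeps one unbounded buffer per key and only emits it when a hypothetical count-based condition (`fireEvery`) is met; with `fireEvery <= 0`, which models having no trigger at all, nothing is ever emitted. `GlobalWindowSketch` and its parameters are illustrative assumptions, not Flink's `GlobalWindows` or `Trigger` API, and keeping the buffered elements after each firing is a simplification chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical simulation of per-key global windows with an optional count-based trigger.
final class GlobalWindowSketch {
    private final Map<String, List<Long>> windows = new HashMap<>(); // one global window per key
    private final Map<String, Integer> counts = new HashMap<>();     // elements since last firing
    private final int fireEvery;                                     // <= 0 means "no trigger"

    GlobalWindowSketch(int fireEvery) {
        this.fireEvery = fireEvery;
    }

    // Adds an element to its key's global window. Returns a snapshot of the window
    // contents if the trigger fires, and an empty Optional otherwise.
    Optional<List<Long>> add(String key, long value) {
        List<Long> window = windows.computeIfAbsent(key, k -> new ArrayList<>());
        window.add(value);
        if (fireEvery <= 0) {
            return Optional.empty(); // the global window has no natural end, so it never fires
        }
        int count = counts.merge(key, 1, Integer::sum);
        if (count < fireEvery) {
            return Optional.empty();
        }
        counts.put(key, 0); // reset the counter; the buffered elements are kept
        return Optional.of(new ArrayList<>(window));
    }
}
```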
+
+### Tumbling Windows
+
+The *tumbling windows* assigner assigns elements to fixed-length, non-overlapping windows of a
+specified *window size*. For example, if you specify a window size of 5 minutes, the window
+function will get 5 minutes worth of elements in each invocation.
 
-The general mechanism can define more powerful windows at the cost of more 
verbose syntax. For example,
-below is a window definition where windows hold elements of the last 5 seconds 
and slides every 1 second,
-but the execution of the window function is triggered when 100 elements have 
been added to the
-window, and every time execution is triggered, 10 elements are retained in the 
window:
+<img src="tumbling-windows.svg" class="center" style="width: 80%;" />
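For tumbling windows the assignment is simple arithmetic: an element belongs to the window `[start, start + size)` whose start is its timestamp rounded down to a multiple of the window size. The plain-Java sketch below assumes millisecond timestamps and windows aligned to 0 with no offset; `TumblingWindowSketch` is a hypothetical helper, not part of Flink.

```java
// Hypothetical helper: which tumbling window [start, end) does a timestamp fall into?
// Assumes millisecond timestamps and windows aligned to 0 (no offset).
final class TumblingWindowSketch {

    static long windowStart(long timestamp, long size) {
        // floorMod (rather than %) also rounds negative timestamps down correctly.
        return timestamp - Math.floorMod(timestamp, size);
    }

    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size; // exclusive end
    }
}
```

With a 5-minute window (`300_000` ms), timestamps `0` through `299_999` all map to the window starting at `0`, and `420_000` (minute 7) maps to `[300_000, 600_000)`.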
+
+### Sliding Windows
+
+The *sliding windows* assigner assigns elements to windows of fixed length equal to *window size*,
+like the tumbling windows assigner, but in this case windows can overlap. An additional
+user-specified parameter, the *window slide*, controls how frequently a new window is started.
+If the slide is smaller than the window size, consecutive windows overlap by *window size* minus
+*window slide*, and an element can be assigned to multiple windows.
+
+For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get 10
+minutes worth of elements in each invocation of the window function and it will be invoked for every
+5 minutes of data.
+
+<img src="sliding-windows.svg" class="center" style="width: 80%;" />
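Sliding-window assignment can be sketched the same way: find the latest window start at or before the timestamp (a multiple of the slide), then step backwards by the slide while the window still covers the timestamp. This plain-Java sketch assumes millisecond timestamps and windows aligned to 0 with no offset; `SlidingWindowSketch` is a hypothetical helper, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: all sliding windows [start, start + size) that contain a timestamp.
// Assumes millisecond timestamps and windows aligned to 0 (no offset).
final class SlidingWindowSketch {

    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Earlier windows still contain the timestamp while start > timestamp - size.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With the 10-minute/5-minute example above, an element at minute 7 (`420_000` ms) lands in the windows starting at minute 5 and minute 0, i.e. in *window size* / *window slide* = 2 windows.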
+
+### Session Windows
+
+The *session windows* assigner is ideal for cases where the window boundaries need to adjust to the
+incoming data. Both the *tumbling windows* and *sliding windows* assigners assign elements to windows
+that start at fixed time points and have a fixed *window size*. With session windows it is possible
+to have windows that start at individual points in time for each key and that end once there has
+been a certain period of inactivity. The configuration parameter is the *session gap*, which specifies
+how long to wait for new data before considering a session closed.
+
+<img src="session-windows.svg" class="center" style="width: 80%;" />
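One way to picture session windows for a single key: every element opens a window `[timestamp, timestamp + gap)`, and windows that overlap are merged into one session. The plain-Java sketch below does this for a batch of timestamps. `SessionWindowSketch` is hypothetical, and the rule that an element arriving exactly `gap` after the previous one starts a new session is an assumption of this sketch rather than a statement about Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: groups one key's timestamps into sessions separated by gaps of inactivity.
final class SessionWindowSketch {

    // Returns the sessions as {start, end} pairs, where end is exclusive.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone(); // do not modify the caller's array
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && ts < current[1]) {
                current[1] = ts + gap;                 // within the gap: extend the session
            } else {
                result.add(new long[] {ts, ts + gap}); // gap exceeded: start a new session
            }
        }
        return result;
    }
}
```

With a gap of 5 seconds, timestamps 1s, 3s, 4s and 20s yield two sessions: `[1000, 9000)` and `[20000, 25000)`.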
+
+### Specifying a Window Assigner
+
+The built-in window assigners (except `GlobalWindows`) come in two versions: one for processing-time
+windowing and one for event-time windowing. The processing-time assigners assign elements to
+windows based on the current clock of the worker machines, while the event-time assigners assign
+elements to windows based on the timestamps of the elements. Please have a look at
+[event time](/apis/streaming/event_time.html) to learn about the difference between processing time
+and event time and about how timestamps can be assigned to elements.
+
+The following code snippets show how each of the window assigners can be used in a program:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-keyedStream
-    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
-    .trigger(CountTrigger.of(100))
-    .evictor(CountEvictor.of(10));
+DataStream<T> input = ...;
+
+// tumbling event-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .<windowed transformation>(<window function>);
+
+// sliding event-time windows
+input
+    .keyBy(<key selector>)
+    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    .<windowed transformation>(<window function>);
+
+// event-time session windows
+input
+    .keyBy(<key selector>)
+    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>);
+
+// tumbling processing-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+    .<windowed transformation>(<window function>);
+
+// sliding processing-time windows
+input
+    .keyBy(<key selector>)
+    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    .<windowed transformation>(<window function>);
+
+// processing-time session windows
+input
+    .keyBy(<key selector>)
+    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>);
+
+// global windows
+input
+    .keyBy(<key selector>)
+    .window(GlobalWindows.create())
+    .<windowed transformation>(<window function>);
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-keyedStream
-    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
-    .trigger(CountTrigger.of(100))
-    .evictor(CountEvictor.of(10))
+val input: DataStream[T] = ...
+
+// tumbling event-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .<windowed transformation>(<window function>)
+
+// sliding event-time windows
+input
+    .keyBy(<key selector>)
+    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    .<windowed transformation>(<window function>)
+
+// event-time session windows
+input
+    .keyBy(<key selector>)
+    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>)
+
+// tumbling processing-time windows
+input
+    .keyBy(<key selector>)
+    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+    .<windowed transformation>(<window function>)
+
+// sliding processing-time windows
+input
+    .keyBy(<key selector>)
+    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+    .<windowed transformation>(<window function>)
+
+// processing-time session windows
+input
+    .keyBy(<key selector>)
+    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>)
+
+// global windows
+input
+    .keyBy(<key selector>)
+    .window(GlobalWindows.create())
+    .<windowed transformation>(<window function>)
 {% endhighlight %}
 </div>
 </div>
 
-The general recipe for building a custom window is to specify (1) a 
`WindowAssigner`, (2) a `Trigger` (optionally),
-and (3) an `Evictor` (optionally).
+Note how we specify a time interval using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+`Time.minutes(x)`, and so on.
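The difference between the two families of assigners can be sketched in plain Java: an event-time assigner derives the window from the element's own timestamp, while a processing-time assigner ignores it and reads the worker's clock. In this hypothetical `TimeCharacteristicSketch`, `java.time.Duration` stands in for Flink's `Time` class and the clock is injected as a `LongSupplier` so that the behavior can be tested; both are assumptions of the sketch, not Flink API.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical contrast between event-time and processing-time tumbling assignment.
final class TimeCharacteristicSketch {

    // Event time: the window depends only on the element's timestamp (ms).
    static long eventTimeWindowStart(long elementTimestamp, Duration size) {
        long sizeMs = size.toMillis();
        return elementTimestamp - Math.floorMod(elementTimestamp, sizeMs);
    }

    // Processing time: the element's timestamp is not consulted; the local clock decides.
    static long processingTimeWindowStart(LongSupplier clock, Duration size) {
        long now = clock.getAsLong();
        long sizeMs = size.toMillis();
        return now - Math.floorMod(now, sizeMs);
    }
}
```

In a real program the clock would be something like `System::currentTimeMillis`, so the same element can land in different processing-time windows depending on when it is processed, whereas its event-time window is fixed by its timestamp.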
 
-The `WindowAssigner` defines how incoming elements are assigned to windows. A 
window is a logical group of elements
-that has a begin-value, and an end-value corresponding to a begin-time and 
end-time. Elements with timestamp (according
-to some notion of time described above within these values are part of the 
window).
+## Window Functions
 
-For example, the `SlidingEventTimeWindows`
-assigner in the code above defines a window of size 5 seconds, and a slide of 
1 second. Assume that
-time starts from 0 and is measured in milliseconds. Then, we have 6 windows
-that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], 
and [5000, 10000]. Each incoming
-element is assigned to the windows according to its timestamp. For example, an 
element with timestamp 2000 will be
-assigned to the first three windows. Flink comes bundled with window assigners 
that cover the most common use cases. You can write your
-own window types by extending the `WindowAssigner` class.
+The *window function* is used to process the elements of each window (and key) once the system
+determines that a window is ready for processing (see [triggers](#triggers) for how the system
+determines when a window is ready).
 
-<div class="codetabs" markdown="1">
+The window function can be one of `ReduceFunction`, `FoldFunction` or `WindowFunction`. The first
+two can be executed more efficiently because Flink can incrementally aggregate the elements for each
+window as they arrive. A `WindowFunction` gets an `Iterable` for all the elements contained in a
+window and additional meta information about the window to which the elements belong.
+
+A windowed transformation with a `WindowFunction` cannot be executed as efficiently as the other
+cases because Flink has to buffer *all* elements for a window internally before invoking the function.
+This can be mitigated by combining a `WindowFunction` with a `ReduceFunction` or `FoldFunction` to
+get both incremental aggregation of window elements and the additional information that the
+`WindowFunction` receives. We will look at examples for each of these variants.
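The efficiency difference comes down to how much state a window holds. The plain-Java sketch below contrasts an incrementally reduced window (one running value) with a buffered one (every element kept until firing), and shows the combined variant where the pre-aggregated value is handed to a final function together with window metadata (here only the window end). `WindowStateSketch` and its nested classes are hypothetical and only model the idea; they are not Flink's internal window operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical model of per-window state for incremental vs. buffered window functions.
final class WindowStateSketch {

    // ReduceFunction-style: the window only stores one running aggregate.
    static final class Incremental<T> {
        private final BinaryOperator<T> reducer;
        private T acc; // null until the first element arrives

        Incremental(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T element) {
            acc = (acc == null) ? element : reducer.apply(acc, element);
        }

        int storedElements() {
            return acc == null ? 0 : 1;
        }

        T result() {
            return acc;
        }

        // Combined variant: the pre-aggregated value plus window metadata go to a final function.
        <R> R fire(long windowEnd, BiFunction<Long, T, R> finalFunction) {
            return finalFunction.apply(windowEnd, acc);
        }
    }

    // WindowFunction-style: every element is buffered until the window fires.
    static final class Buffered<T, R> {
        private final List<T> buffer = new ArrayList<>();
        private final Function<Iterable<T>, R> windowFunction;

        Buffered(Function<Iterable<T>, R> windowFunction) {
            this.windowFunction = windowFunction;
        }

        void add(T element) {
            buffer.add(element);
        }

        int storedElements() {
            return buffer.size();
        }

        R fire() {
            return windowFunction.apply(buffer);
        }
    }
}
```

After adding the numbers 1 to 100, both variants compute the sum 5050, but the incremental one holds a single value while the buffered one holds all 100 elements.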
+
+### ReduceFunction
 
+A reduce function specifies how two elements of the input can be combined to form one element of the
+same type. Flink can use this to incrementally aggregate the elements in a window.
+
+A `ReduceFunction` can be used in a program like this:
+
+<div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Global window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-           All incoming elements of a given key are assigned to the same 
window.
-           The window does not contain a default trigger, hence it will never 
be triggered
-           if a trigger is not explicitly specified.
-          </p>
-    {% highlight java %}
-stream.window(GlobalWindows.create());
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-        <td><strong>Tumbling event-time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to a window of a certain size (1 
second below) based on
-            their timestamp. Windows do not overlap, i.e., each element is 
assigned to exactly one window.
-            This assigner comes with a default trigger that fires for a window 
when a
-            watermark with value higher than its end-value is received.
-          </p>
-      {% highlight java %}
-stream.window(TumblingEventTimeWindows.of(Time.seconds(1)));
-      {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-        <td><strong>Sliding event-time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to a window of a certain size (5 
seconds below) based on
-            their timestamp. Windows "slide" by the provided value (1 second 
in the example), and hence
-            overlap. This assigner comes with a default trigger that fires for 
a window when a
-                 watermark with value higher than its end-value is received.
-          </p>
-    {% highlight java %}
-stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Tumbling processing time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
-          <td>
-            <p>
-              Incoming elements are assigned to a window of a certain size (1 
second below) based on
-              the current processing time. Windows do not overlap, i.e., each 
element is assigned to exactly one window.
-              This assigner comes with a default trigger that fires for a 
window a window when the current
-              processing time exceeds its end-value.
-            </p>
-      {% highlight java %}
-stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
-      {% endhighlight %}
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Sliding processing time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to a window of a certain size (5 
seconds below) based on
-            their timestamp. Windows "slide" by the provided value (1 second 
in the example), and hence
-            overlap. This assigner comes with a default trigger that fires for 
a window a window when the current
-            processing time exceeds its end-value.
-          </p>
-    {% highlight java %}
-stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), 
Time.seconds(1)));
-    {% endhighlight %}
-        </td>
-      </tr>
-          <tr>
-        <td><strong>Event-time Session windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to sessions based on a session gap 
interval (5 seconds in the example below).
-            Elements whose timestamp differs by more than the session gap are 
assigned to different sessions. If there are
-            consecutive elements which are less than the session gap apart 
then these will also be put into the same session, i.e. elements
-            can be connected into a session by intermediate elements.
-          </p>
-    {% highlight scala %}
-keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
-    {% endhighlight %}
-        </td>
-      </tr>
-       <tr>
-        <td><strong>Processing time Session windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
-        <td>
-          <p>
-           This is similar to event-time session windows but works on the 
current processing
-           time instead of the timestamp of elements
-          </p>
-    {% highlight scala %}
-keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
-    {% endhighlight %}
-        </td>
-      </tr>
-  </tbody>
-</table>
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
+      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
+        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
+      }
+    });
+{% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Global window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            All incoming elements of a given key are assigned to the same 
window.
-           The window does not contain a default trigger, hence it will never 
be triggered
-           if a trigger is not explicitly specified.
-          </p>
-    {% highlight scala %}
-stream.window(GlobalWindows.create)
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Tumbling event-time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
-          <td>
-            <p>
-             Incoming elements are assigned to a window of a certain size (1 
second below) based on
-            their timestamp. Windows do not overlap, i.e., each element is 
assigned to exactly one window.
-            This assigner comes with a default trigger that fires for a window 
when a
-            watermark with value higher than its end-value is received.
-            </p>
-      {% highlight scala %}
-stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
-      {% endhighlight %}
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Sliding event-time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to a window of a certain size (5 
seconds below) based on
-            their timestamp. Windows "slide" by the provided value (1 second 
in the example), and hence
-            overlap. This assigner comes with a default trigger that fires for 
a window when a
-            watermark with value higher than its end-value is received.
-          </p>
-    {% highlight scala %}
-stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Tumbling processing time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
-          <td>
-            <p>
-              Incoming elements are assigned to a window of a certain size (1 
second below) based on
-              the current processing time. Windows do not overlap, i.e., each 
element is assigned to exactly one window.
-              This assigner comes with a default trigger that fires for a 
window a window when the current
-              processing time exceeds its end-value.
-
-            </p>
-      {% highlight scala %}
-stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
-      {% endhighlight %}
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Sliding processing time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to a window of a certain size (5 
seconds below) based on
-            their timestamp. Windows "slide" by the provided value (1 second 
in the example), and hence
-            overlap. This assigner comes with a default trigger that fires for 
a window a window when the current
-            processing time exceeds its end-value.
-          </p>
-    {% highlight scala %}
-stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), 
Time.seconds(1)))
-    {% endhighlight %}
-        </td>
-      </tr>
-         <tr>
-        <td><strong>Event-time Session windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to sessions based on a session gap 
interval (5 seconds in the example below).
-            Elements whose timestamp differs by more than the session gap are 
assigned to different sessions. If there are
-            consecutive elements which are less than the session gap apart 
then these will also be put into the same session, i.e. elements
-            can be connected into a session by intermediate elements.
-          </p>
-    {% highlight scala %}
-keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
-    {% endhighlight %}
-        </td>
-      </tr>
-       <tr>
-        <td><strong>Processing time Session windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
-        <td>
-          <p>
-           This is similar to event-time session windows but works on the 
current processing
-           time instead of the timestamp of elements
-          </p>
-    {% highlight scala %}
-keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
-    {% endhighlight %}
-        </td>
-      </tr>
-  </tbody>
-</table>
-</div>
+{% highlight scala %}
+val input: DataStream[(String, Long)] = ...
 
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
+{% endhighlight %}
+</div>
 </div>
 
-The `Trigger` specifies when the function that comes after the window clause 
(e.g., `sum`, `count`) is evaluated ("fires")
-for each window. If a trigger is not specified, a default trigger for each 
window type is used (that is part of the
-definition of the `WindowAssigner`). Flink comes bundled with a set of 
triggers if the ones that windows use by
-default do not fit the application. You can write your own trigger by 
implementing the `Trigger` interface. Note that
-specifying a trigger will override the default trigger of the window assigner.
+A `ReduceFunction` specifies how two elements from the input can be combined to produce
+an output element. This example will sum up the second field of the tuple for all elements
+in a window.
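Outside of Flink, the same reduction can be sketched in plain Java, with `Map.Entry<String, Long>` standing in for `Tuple2<String, Long>`. The sketch applies the reducer pairwise to the elements of one key's window in arrival order, the way incremental aggregation consumes them. `ReduceSketch` and `SUM_SECOND_FIELD` are hypothetical names, not Flink API.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java counterpart of the ReduceFunction example above.
final class ReduceSketch {

    // Keep the first field (the key, like f0) and sum the second field (like f1).
    static final BinaryOperator<Map.Entry<String, Long>> SUM_SECOND_FIELD =
        (v1, v2) -> new SimpleImmutableEntry<>(v1.getKey(), v1.getValue() + v2.getValue());

    // Applies the reducer as each element of one window (one key) arrives.
    static Map.Entry<String, Long> reduceWindow(List<Map.Entry<String, Long>> windowElements) {
        Map.Entry<String, Long> acc = null; // no aggregate before the first element
        for (Map.Entry<String, Long> element : windowElements) {
            acc = (acc == null) ? element : SUM_SECOND_FIELD.apply(acc, element);
        }
        return acc;
    }
}
```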
 
-<div class="codetabs" markdown="1">
+### FoldFunction
+
+A fold function can be specified like this:
 
+<div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-  <tr>
-    <td><strong>Processing time trigger</strong></td>
-    <td>
-      <p>
-        A window is fired when the current processing time exceeds its 
end-value.
-        The elements on the triggered window are henceforth discarded.
-      </p>
 {% highlight java %}
-windowedStream.trigger(ProcessingTimeTrigger.create());
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
+       public String fold(String acc, Tuple2<String, Long> value) {
+         return acc + value.f1;
+       }
+    });
 {% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Watermark trigger</strong></td>
-    <td>
-      <p>
-        A window is fired when a watermark with value that exceeds the 
window's end-value has been received.
-        The elements on the triggered window are henceforth discarded.
-      </p>
-{% highlight java %}
-windowedStream.trigger(EventTimeTrigger.create());
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Continuous processing time trigger</strong></td>
-    <td>
-      <p>
-        A window is periodically considered for being fired (every 5 seconds 
in the example).
-        The window is actually fired only when the current processing time 
exceeds its end-value.
-        The elements on the triggered window are retained.
-      </p>
-{% highlight java %}
-windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Continuous watermark time trigger</strong></td>
-    <td>
-      <p>
-        A window is periodically considered for being fired (every 5 seconds 
in the example).
-        A window is actually fired when a watermark with value that exceeds 
the window's end-value has been received.
-        The elements on the triggered window are retained.
-      </p>
-{% highlight java %}
-windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[(String, Long)] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .fold("") { (acc, v) => acc + v._2 }
 {% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Count trigger</strong></td>
-    <td>
-      <p>
-        A window is fired when it has more than a certain number of elements 
(1000 below).
-        The elements of the triggered window are retained.
-      </p>
+</div>
+</div>
+
+A `FoldFunction` specifies how elements from the input will be added to an initial
+accumulator value (`""`, the empty string, in our example). This example will compute
+a concatenation of all the `Long` fields of the input. Unlike with a `ReduceFunction`, the
+result type (here `String`) can differ from the input type.
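The folding example can be modelled the same way in plain Java. In the sketch below (no Flink dependency; class and method names are hypothetical), a `BiFunction` plays the role of the `FoldFunction`, the accumulator starts at the initial value, and each element's `Long` field is appended to it. `Map.Entry<String, Long>` again stands in for `Tuple2<String, Long>`.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of incremental folding: the per-window state is a single accumulator
// that starts at an initial value. Illustration only, not Flink's implementation.
final class FoldWindowSketch {

    // Plays the role of the FoldFunction: append the Long field of each element to the accumulator.
    static final BiFunction<String, Map.Entry<String, Long>, String> CONCAT_LONG_FIELD =
            (acc, value) -> acc + value.getValue();

    // Folds one window's elements in arrival order, starting from initialValue (e.g. "").
    static String foldWindow(String initialValue, List<Map.Entry<String, Long>> windowElements) {
        String acc = initialValue;
        for (Map.Entry<String, Long> element : windowElements) {
            acc = CONCAT_LONG_FIELD.apply(acc, element);
        }
        return acc;
    }
}
```

Folding `("a", 1)`, `("a", 20)`, `("a", 3)` from `""` yields `"1203"`; an empty window yields the initial value unchanged.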
+
+### WindowFunction - The Generic Case
+
+Using a `WindowFunction` provides the most flexibility, at the cost of performance. The reason for this
+is that elements cannot be incrementally aggregated for a window and instead need to be buffered
+internally until the window is considered ready for processing. A `WindowFunction` gets an
+`Iterable` containing all the elements of the window being processed. The signature of
+`WindowFunction` is this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
-windowedStream.trigger(CountTrigger.of(1000));
+public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
+
+  /**
+   * Evaluates the window and outputs none or several elements.
+   *
+   * @param key The key for which this window is evaluated.
+   * @param window The window that is being evaluated.
+   * @param input The elements in the window being evaluated.
+   * @param out A collector for emitting elements.
+   *
+   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+   */
+  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
+}
 {% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Purging trigger</strong></td>
-    <td>
-      <p>
-        Takes any trigger as an argument and forces the triggered window 
elements to be
-        "purged" (discarded) after triggering.
-      </p>
-{% highlight java %}
-windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+
+  /**
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key The key for which this window is evaluated.
+    * @param window The window that is being evaluated.
+    * @param input The elements in the window being evaluated.
+    * @param out A collector for emitting elements.
+    *
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
+}
 {% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Delta trigger</strong></td>
-    <td>
-      <p>
-        A window is periodically considered for being fired (every 5000 
milliseconds in the example).
-        A window is actually fired when the value of the last added element 
exceeds the value of
-        the first element inserted in the window according to a 
`DeltaFunction`.
-      </p>
+</div>
+</div>
+
+Here we show an example that uses a `WindowFunction` to count the elements in a window. We use a
+`WindowFunction` because we want to access information about the window itself and emit it along
+with the count. This is very inefficient, however, because all elements are buffered only to be
+counted; in practice, counting should be implemented with a `ReduceFunction`. Below, we will see an
+example of how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental
+aggregation and the added information of a `WindowFunction`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
-windowedStream.trigger(new DeltaTrigger.of(5000.0, new DeltaFunction<Double>() 
{
-    @Override
-    public double getDelta (Double old, Double new) {
-        return (new - old > 0.01);
+DataStream<Tuple2<String, Long>> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .apply(new MyWindowFunction());
+
+/* ... */
+
+public class MyWindowFunction implements WindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
+
+  @Override
+  public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
+    long count = 0;
+    for (Tuple2<String, Long> in: input) {
+      count++;
     }
+    out.collect("Window: " + window + " count: " + count);
+  }
+}
+
 {% endhighlight %}
-    </td>
-  </tr>
- </tbody>
-</table>
 </div>
 
-
 <div data-lang="scala" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-  <tr>
-    <td><strong>Processing time trigger</strong></td>
-    <td>
-      <p>
-        A window is fired when the current processing time exceeds its 
end-value.
-        The elements on the triggered window are henceforth discarded.
-      </p>
-{% highlight scala %}
-windowedStream.trigger(ProcessingTimeTrigger.create);
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Watermark trigger</strong></td>
-    <td>
-      <p>
-        A window is fired when a watermark with value that exceeds the 
window's end-value has been received.
-        The elements on the triggered window are henceforth discarded.
-      </p>
-{% highlight scala %}
-windowedStream.trigger(EventTimeTrigger.create);
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Continuous processing time trigger</strong></td>
-    <td>
-      <p>
-        A window is periodically considered for being fired (every 5 seconds 
in the example).
-        The window is actually fired only when the current processing time 
exceeds its end-value.
-        The elements on the triggered window are retained.
-      </p>
-{% highlight scala %}
-windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Continuous watermark time trigger</strong></td>
-    <td>
-      <p>
-        A window is periodically considered for being fired (every 5 seconds 
in the example).
-        A window is actually fired when a watermark with value that exceeds 
the window's end-value has been received.
-        The elements on the triggered window are retained.
-      </p>
-{% highlight scala %}
-windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Count trigger</strong></td>
-    <td>
-      <p>
-        A window is fired when it has more than a certain number of elements 
(1000 below).
-        The elements of the triggered window are retained.
-      </p>
 {% highlight scala %}
-windowedStream.trigger(CountTrigger.of(1000));
+val input: DataStream[(String, Long)] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .apply(new MyWindowFunction())
+
+/* ... */
+
+class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
+
+  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
+    var count = 0L
+    for (in <- input) {
+      count = count + 1
+    }
+    out.collect(s"Window $window count: $count")
+  }
+}
 {% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Purging trigger</strong></td>
-    <td>
-      <p>
-        Takes any trigger as an argument and forces the triggered window 
elements to be
-        "purged" (discarded) after triggering.
-      </p>
-{% highlight scala %}
-windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
+</div>
+</div>
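Why is this inefficient? The plain-Java model below (no Flink dependency; all names hypothetical) shows what a window has to hold when its function needs the full `Iterable`: every element is stored until the window fires, so state grows with the number of elements, whereas the reduce and fold models above keep a single value. The window bounds are plain `long` fields here, standing in for a `TimeWindow`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of a window whose function needs all elements at once.
// Illustration only; not Flink's window operator or state backend.
final class BufferedWindowSketch {

    private final long start; // window start (inclusive), stands in for TimeWindow
    private final long end;   // window end (exclusive), stands in for TimeWindow
    private final List<Map.Entry<String, Long>> buffer = new ArrayList<>();

    BufferedWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Every element is kept: state grows linearly with the number of elements in the window.
    void add(Map.Entry<String, Long> element) {
        buffer.add(element);
    }

    int bufferedElements() {
        return buffer.size();
    }

    // Called when the window fires; mirrors MyWindowFunction by iterating and counting.
    String fire(String key) {
        long count = 0;
        for (Map.Entry<String, Long> ignored : buffer) {
            count++;
        }
        return "Window [" + start + ", " + end + ") key " + key + " count: " + count;
    }
}
```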
+
+### WindowFunction with Incremental Aggregation
+
+A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction`. When doing
+this, the `ReduceFunction`/`FoldFunction` will be used to incrementally aggregate elements as they
+arrive while the `WindowFunction` will be provided with the aggregated result when the window is
+ready for processing. This gives you the benefit of incremental window computation while still
+having access to the additional meta information that a `WindowFunction` provides.
+
+The following example shows how incremental aggregation functions can be combined with
+a `WindowFunction`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long>> input = ...;
+
+// for folding incremental computation
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
+
+// for reducing incremental computation
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .apply(new MyReduceFunction(), new MyWindowFunction());
 {% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Delta trigger</strong></td>
-    <td>
-      <p>
-        A window is periodically considered for being fired (every 5000 
milliseconds in the example).
-        A window is actually fired when the value of the last added element 
exceeds the value of
-        the first element inserted in the window according to a 
`DeltaFunction`.
-      </p>
+</div>
+
+<div data-lang="scala" markdown="1">
 {% highlight scala %}
-windowedStream.trigger(DeltaTrigger.of(5000.0, { (old,new) => new - old > 0.01 
}))
+val input: DataStream[(String, Long)] = ...
+
+// for folding incremental computation
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction())
+
+// for reducing incremental computation
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .apply(new MyReduceFunction(), new MyWindowFunction())
 {% endhighlight %}
-    </td>
-  </tr>
- </tbody>
-</table>
 </div>
-
 </div>
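The following plain-Java model (no Flink dependency; `IncrementalWindowSketch` and `WindowFn` are hypothetical names) illustrates the combination: a reducer keeps one value per window, and when the window fires the window function receives the window's metadata plus an `Iterable` holding only that aggregate. To count elements with a reducer, the usage below feeds one `1` per element and sums them; this assumes the elements have already been mapped to `Long` values, because a reduce must keep the input type.

```java
import java.util.Collections;
import java.util.function.BinaryOperator;

// Hypothetical model: incremental aggregation plus a window function that sees only the aggregate.
// Illustration only; not Flink's WindowedStream.apply(reduceFunction, windowFunction) implementation.
final class IncrementalWindowSketch {

    // Stand-in for a WindowFunction: key, window bounds, and the (pre-aggregated) window contents.
    interface WindowFn<IN, OUT> {
        OUT apply(String key, long start, long end, Iterable<IN> input);
    }

    private final BinaryOperator<Long> reducer;
    private Long aggregate; // the only per-window state: one value instead of a buffer

    IncrementalWindowSketch(BinaryOperator<Long> reducer) {
        this.reducer = reducer;
    }

    // Called for each arriving element: combine it into the running aggregate immediately.
    void add(long value) {
        aggregate = (aggregate == null) ? value : reducer.apply(aggregate, value);
    }

    // Called when the window fires: the window function gets exactly one element, the aggregate.
    <OUT> OUT fire(String key, long start, long end, WindowFn<Long, OUT> windowFn) {
        return windowFn.apply(key, start, end, Collections.singletonList(aggregate));
    }
}
```

For example, `new IncrementalWindowSketch(Long::sum)` fed four times with `add(1L)` produces `"Window [0, 5000) key a count: 4"` when fired with a formatting lambda for key `"a"` and bounds `0` and `5000`.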
 
-After the trigger fires, and before the function (e.g., `sum`, `count`) is 
applied to the window contents, an
-optional `Evictor` removes some elements from the beginning of the window 
before the remaining elements
-are passed on to the function. Flink comes bundled with a set of evictors You 
can write your own evictor by
-implementing the `Evictor` interface.
+## Dealing with Late Data
 
-<div class="codetabs" markdown="1">
+When working with event-time windowing, it can happen that elements arrive late, i.e., the
+watermark that Flink uses to keep track of the progress of event time is already past the
+end timestamp of a window to which an element belongs. Please
+see [event time](/apis/streaming/event_time.html) and especially
+[late elements](/apis/streaming/event_time.html#late-elements) for a more thorough discussion of
+how Flink deals with event time.
+
+You can specify how a windowed transformation should deal with late elements and how much lateness
+is allowed. The parameter for this is called *allowed lateness*. It specifies by how much time
+elements can be late. Elements that arrive within the allowed lateness are still put into windows
+and are considered when computing window results. Elements that arrive after the allowed lateness
+are dropped. Flink will also make sure that any state held by the windowing operation is garbage
+collected once the watermark passes the end of a window plus the allowed lateness.
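The rule above can be written down as a small predicate. The sketch below is a simplified, hypothetical model, not Flink code: it assumes a window is described by its last contained timestamp (for example `9999` for a window covering `[0, 10000)` milliseconds), that the allowed lateness is non-negative, and that an element is dropped once the watermark has reached that last timestamp plus the allowed lateness. Exact boundary handling in Flink may differ. It also saturates instead of overflowing, which matters for windows that end at `Long.MAX_VALUE`, such as the global window.

```java
// Simplified, hypothetical model of the allowed-lateness rule for event-time windows.
final class LatenessSketch {

    // Point in event time after which the window's state may be garbage collected.
    // Assumes allowedLateness >= 0; saturates at Long.MAX_VALUE instead of overflowing.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        if (windowMaxTimestamp > Long.MAX_VALUE - allowedLateness) {
            return Long.MAX_VALUE;
        }
        return windowMaxTimestamp + allowedLateness;
    }

    // An element for this window is dropped once the watermark has reached the cleanup time.
    // Before that it is still added to the window, even if the watermark is already past the window end.
    static boolean isDropped(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }
}
```

With a window ending at `9999` and an allowed lateness of `1000`, an element arriving at watermark `10500` is late but still accepted, while one arriving at watermark `10999` is dropped.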
 
+You can specify an allowed lateness like this:
+
+<div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-  <tr>
-      <td><strong>Time evictor</strong></td>
-      <td>
-        <p>
-         Evict all elements from the beginning of the window, so that elements 
from end-value - 1 second
-         until end-value are retained (the resulting window size is 1 second).
-        </p>
-  {% highlight java %}
-triggeredStream.evictor(TimeEvictor.of(Time.seconds(1)));
-  {% endhighlight %}
-      </td>
-    </tr>
-   <tr>
-       <td><strong>Count evictor</strong></td>
-       <td>
-         <p>
-          Retain 1000 elements from the end of the window backwards, evicting 
all others.
-         </p>
-   {% highlight java %}
-triggeredStream.evictor(CountEvictor.of(1000));
-   {% endhighlight %}
-       </td>
-     </tr>
-    <tr>
-        <td><strong>Delta evictor</strong></td>
-        <td>
-          <p>
-            Starting from the beginning of the window, evict elements until an 
element with
-            value lower than the value of the last element is found (by a 
threshold and a
-            DeltaFunction).
-          </p>
-    {% highlight java %}
-triggeredStream.evictor(DeltaEvictor.of(5000, new DeltaFunction<Double>() {
-  public double getDelta (Double oldValue, Double newValue) {
-      return newValue - oldValue;
-  }
-}));
-    {% endhighlight %}
-        </td>
-      </tr>
- </tbody>
-</table>
+{% highlight java %}
+DataStream<T> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .allowedLateness(<time>)
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-  <tr>
-      <td><strong>Time evictor</strong></td>
-      <td>
-        <p>
-         Evict all elements from the beginning of the window, so that elements 
from end-value - 1 second
-         until end-value are retained (the resulting window size is 1 second).
-        </p>
-  {% highlight scala %}
-triggeredStream.evictor(TimeEvictor.of(Time.seconds(1)));
-  {% endhighlight %}
-      </td>
-    </tr>
-   <tr>
-       <td><strong>Count evictor</strong></td>
-       <td>
-         <p>
-          Retain 1000 elements from the end of the window backwards, evicting 
all others.
-         </p>
-   {% highlight scala %}
-triggeredStream.evictor(CountEvictor.of(1000));
-   {% endhighlight %}
-       </td>
-     </tr>
-    <tr>
-        <td><strong>Delta evictor</strong></td>
-        <td>
-          <p>
-            Starting from the beginning of the window, evict elements until an 
element with
-            value lower than the value of the last element is found (by a 
threshold and a
-            DeltaFunction).
-          </p>
-    {% highlight scala %}
-windowedStream.evictor(DeltaEvictor.of(5000.0, { (old,new) => new - old > 0.01 
}))
-    {% endhighlight %}
-        </td>
-      </tr>
- </tbody>
-</table>
-</div>
+{% highlight scala %}
+val input: DataStream[T] = ...
 
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .allowedLateness(<time>)
+    .<windowed transformation>(<window function>)
+{% endhighlight %}
+</div>
 </div>
 
-### Recipes for Building Windows
-
-The mechanism of window assigner, trigger, and evictor is very powerful, and 
it allows you to define
-many different kinds of windows. Flink's basic window constructs are, in fact, 
syntactic
-sugar on top of the general mechanism. Below is how some common types of 
windows can be
-constructed using the general mechanism
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 35%">Window type</th>
-      <th class="text-center">Definition</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td>
-         <strong>Tumbling count window</strong><br>
-    {% highlight java %}
-stream.countWindow(1000)
-    {% endhighlight %}
-       </td>
-        <td>
-    {% highlight java %}
-stream.window(GlobalWindows.create())
-  .trigger(PurgingTrigger.of(CountTrigger.of(size)))
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-        <td>
-         <strong>Sliding count window</strong><br>
-    {% highlight java %}
-stream.countWindow(1000, 100)
-    {% endhighlight %}
-       </td>
-        <td>
-    {% highlight java %}
-stream.window(GlobalWindows.create())
-  .evictor(CountEvictor.of(1000))
-  .trigger(CountTrigger.of(100))
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-        <td>
-         <strong>Tumbling event time window</strong><br>
-    {% highlight java %}
-stream.timeWindow(Time.seconds(5))
-    {% endhighlight %}
-       </td>
-        <td>
-    {% highlight java %}
-stream.window(TumblingEventTimeWindows.of(Time.seconds(5))
-  .trigger(EventTimeTrigger.create())
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-        <td>
-         <strong>Sliding event time window</strong><br>
-    {% highlight java %}
-stream.timeWindow(Time.seconds(5), Time.seconds(1))
-    {% endhighlight %}
-       </td>
-        <td>
-    {% highlight java %}
-stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
-  .trigger(EventTimeTrigger.create())
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-        <td>
-         <strong>Tumbling processing time window</strong><br>
-    {% highlight java %}
-stream.timeWindow(Time.seconds(5))
-    {% endhighlight %}
-       </td>
-        <td>
-    {% highlight java %}
-stream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))
-  .trigger(ProcessingTimeTrigger.create())
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-        <td>
-         <strong>Sliding processing time window</strong><br>
-    {% highlight java %}
-stream.timeWindow(Time.seconds(5), Time.seconds(1))
-    {% endhighlight %}
-       </td>
-        <td>
-    {% highlight java %}
-stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), 
Time.seconds(1)))
-  .trigger(ProcessingTimeTrigger.create())
-    {% endhighlight %}
-        </td>
-      </tr>
-  </tbody>
-</table>
-
-
-## Windows on Unkeyed Data Streams
-
-You can also define windows on regular (non-keyed) data streams using the 
`windowAll` transformation. These
-windowed data streams have all the capabilities of keyed windowed data 
streams, but are evaluated at a single
-task (and hence at a single computing node). The syntax for defining triggers 
and evictors is exactly the
-same:
+<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner, no
+data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
+
+## Triggers
+
+A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready to be
+processed by the *window function*. The trigger observes how elements are added to windows
+and can also keep track of the progress of processing time and event time. Once a trigger
+determines that a window is ready for processing, it fires. This is the signal for the
+window operation to take the elements that are currently in the window and pass them along to
+the window function to produce output for the firing window.
+
+Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
+appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
+default trigger. This trigger simply fires once the watermark passes the end of a window.
+
+You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
+whole specification of the windowed transformation would then look like this:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-nonKeyedStream
-    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
-    .trigger(CountTrigger.of(100))
-    .evictor(CountEvictor.of(10));
+DataStream<T> input = ...;
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .trigger(<trigger>)
+    .<windowed transformation>(<window function>);
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-nonKeyedStream
-    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
-    .trigger(CountTrigger.of(100))
-    .evictor(CountEvictor.of(10))
+val input: DataStream[T] = ...
+
+input
+    .keyBy(<key selector>)
+    .window(<window assigner>)
+    .trigger(<trigger>)
+    .<windowed transformation>(<window function>)
 {% endhighlight %}
 </div>
 </div>
 
-Basic window definitions are also available for windows on non-keyed streams:
+Flink comes with a few triggers out of the box: there is the already mentioned `EventTimeTrigger` that
+fires based on the progress of event time as measured by the watermark; the `ProcessingTimeTrigger`,
+which does the same but based on processing time; and the `CountTrigger`, which fires once the number
+of elements in a window reaches the given limit.
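A count-based trigger boils down to a per-window counter and a threshold check. The plain-Java sketch below models only that decision logic; the class, the `Decision` enum and the string window ids are hypothetical and are not Flink's `Trigger` API. In this model the counter is reset after each firing, so the window fires again after the next batch of elements; it does not discard the window's elements.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a count-based trigger: fires when a window's element count reaches a limit.
// Models the decision logic only; not Flink's Trigger/TriggerResult API.
final class CountTriggerSketch {

    enum Decision { CONTINUE, FIRE }

    private final long maxCount;
    private final Map<String, Long> countPerWindow = new HashMap<>(); // counters keyed by a window id

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called for every element added to the given window.
    Decision onElement(String windowId) {
        long count = countPerWindow.merge(windowId, 1L, Long::sum);
        if (count >= maxCount) {
            countPerWindow.put(windowId, 0L); // start counting again towards the next firing
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }
}
```

Each window keeps its own counter, so elements for window `w2` do not move window `w1` closer to firing.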
+
+<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
+are overriding the default trigger of a `WindowAssigner`. For example, if you specify a
+`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
+progress of time but only by count. Right now, you have to write your own custom trigger if
+you want to react based on both time and count.
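The decision logic of such a custom "count or time" trigger could look like the plain-Java model below. It is a hypothetical sketch, not an implementation of Flink's `Trigger` class: a real custom trigger would implement the callbacks defined in `Trigger.java` and register event-time timers, whereas here `onElement` and `onWatermark` are simply called by hand. The window is assumed to be described by its last contained timestamp.

```java
// Hypothetical model of a custom trigger that fires on a count limit OR when event time
// reaches the end of the window. Decision logic only; not Flink's Trigger API.
final class CountOrEventTimeTriggerSketch {

    enum Decision { CONTINUE, FIRE }

    private final long maxCount;
    private final long windowMaxTimestamp; // last event-time timestamp belonging to the window
    private long count = 0;
    private boolean firedForTime = false;

    CountOrEventTimeTriggerSketch(long maxCount, long windowMaxTimestamp) {
        this.maxCount = maxCount;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    // Count part: fire each time the window has received maxCount new elements.
    Decision onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    // Time part: fire once when the watermark reaches the end of the window.
    Decision onWatermark(long watermark) {
        if (!firedForTime && watermark >= windowMaxTimestamp) {
            firedForTime = true;
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }
}
```

With `maxCount = 2` and a window ending at `9999`, the second element causes an early firing, and the watermark reaching `9999` causes the final, time-based firing.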
+
+The internal `Trigger` API is still considered experimental, but you can check out the code
+if you want to write your own custom trigger:
+{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger.java" %}.
+
+## Non-keyed Windowing
+
+You can also leave out the `keyBy()` when specifying a windowed transformation, in which case you call
+`windowAll()` instead of `window()`. This means, however, that Flink cannot process windows for
+different keys in parallel, essentially turning the transformation into a non-parallel operation.
+
+<span class="label label-danger">Warning</span> As mentioned in the introduction, non-keyed
+windows have the disadvantage that work cannot be distributed in the cluster because
+windows cannot be computed independently per key. This can have severe performance implications.
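The following plain-Java model illustrates why keys enable parallelism. It routes each element to one of several parallel instances using a simple hash-modulo on the key; this routing is an assumption for illustration and is not Flink's actual key-to-subtask assignment. Without a key, every element ends up at a single instance, no matter how high the parallelism is. All names are hypothetical.

```java
import java.util.List;

// Hypothetical model of how keyed vs. non-keyed windowing spreads load over parallel instances.
// Uses simple hash-modulo routing for illustration; Flink's real key assignment differs.
final class PartitioningSketch {

    // Keyed: all elements with the same key go to the same instance, different keys can spread out.
    static int instanceForKey(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts how many elements each parallel instance receives.
    // Non-keyed windowing has nothing to partition on, so everything goes to instance 0.
    static int[] load(List<String> keys, int parallelism, boolean keyed) {
        int[] perInstance = new int[parallelism];
        for (String key : keys) {
            int target = keyed ? instanceForKey(key, parallelism) : 0;
            perInstance[target]++;
        }
        return perInstance;
    }
}
```

For the keys `a`, `b`, `c`, `d` and four instances, the keyed variant puts one element on each instance, while the non-keyed variant puts all four on instance 0.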
+
+
+The basic structure of a non-keyed windowed transformation is as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
 
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Tumbling time window all</strong><br>DataStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
-          grouped according to their timestamp in groups of 5 second duration, 
and every element belongs to exactly one window.
-          The notion of time used is controlled by the 
StreamExecutionEnvironment.
-    {% highlight java %}
-nonKeyedStream.timeWindowAll(Time.seconds(5));
-    {% endhighlight %}
-          </p>
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Sliding time window all</strong><br>DataStream &rarr; 
WindowedStream</td>
-          <td>
-            <p>
-             Defines a window of 5 seconds, that "slides" by 1 second. This 
means that elements are
-             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
-             one window (since windows overlap by at least 4 seconds)
-             The notion of time used is controlled by the 
StreamExecutionEnvironment.
-      {% highlight java %}
-nonKeyedStream.timeWindowAll(Time.seconds(5), Time.seconds(1));
-      {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Tumbling count window all</strong><br>DataStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 1000 elements, that "tumbles". This means that 
elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element belongs to exactly one window.
-    {% highlight java %}
-nonKeyedStream.countWindowAll(1000)
-    {% endhighlight %}
-        </p>
-        </td>
-      </tr>
-      <tr>
-      <td><strong>Sliding count window all</strong><br>DataStream &rarr; 
WindowedStream</td>
-      <td>
-        <p>
-          Defines a window of 1000 elements, that "slides" every 100 elements. 
This means that elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element can belong to more than one window (as windows 
overlap by at least 900 elements).
-  {% highlight java %}
-nonKeyedStream.countWindowAll(1000, 100)
-  {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
+input
+    .windowAll(<window assigner>)
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
 
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Tumbling time window all</strong><br>DataStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
-          grouped according to their timestamp in groups of 5 second duration, 
and every element belongs to exactly one window.
-          The notion of time used is controlled by the 
StreamExecutionEnvironment.
-    {% highlight scala %}
-nonKeyedStream.timeWindowAll(Time.seconds(5));
-    {% endhighlight %}
-          </p>
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Sliding time window all</strong><br>DataStream &rarr; 
WindowedStream</td>
-          <td>
-            <p>
-             Defines a window of 5 seconds, that "slides" by 1 second. This 
means that elements are
-             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
-             one window (since windows overlap by at least 4 seconds)
-             The notion of time used is controlled by the 
StreamExecutionEnvironment.
-      {% highlight scala %}
-nonKeyedStream.timeWindowAll(Time.seconds(5), Time.seconds(1));
-      {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Tumbling count window all</strong><br>DataStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 1000 elements, that "tumbles". This means that 
elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element belongs to exactly one window.
-    {% highlight scala %}
-nonKeyedStream.countWindowAll(1000)
-    {% endhighlight %}
-        </p>
-        </td>
-      </tr>
-      <tr>
-      <td><strong>Sliding count window all</strong><br>DataStream &rarr; 
WindowedStream</td>
-      <td>
-        <p>
-          Defines a window of 1000 elements, that "slides" every 100 elements. 
This means that elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element can belong to more than one window (as windows 
overlap by at least 900 elements).
-  {% highlight scala %}
-nonKeyedStream.countWindowAll(1000, 100)
-  {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
+input
+    .windowAll(<window assigner>)
+    .<windowed transformation>(<window function>)
+{% endhighlight %}
 </div>
 </div>
