Repository: flink
Updated Branches:
  refs/heads/master 7c11bd7f4 -> 6d2124ee2


[FLINK-7568] Add section about consecutive windows to window doc


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d2124ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d2124ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d2124ee

Branch: refs/heads/master
Commit: 6d2124ee2e9a1d629bc56b470f356faa90559e1c
Parents: 7d9e3bf
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Sep 1 15:29:21 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Sep 5 17:33:47 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 74 +++++++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d2124ee/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 85a6630..012d531 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -1080,6 +1080,80 @@ as they may "bridge" the gap between two pre-existing, unmerged windows.
 
 <span class="label label-info">Attention</span> You should be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them.
 
+## Working with window results
+
+The result of a windowed operation is again a `DataStream`. No information about the windowed
+operation is retained in the result elements, so if you want to keep meta-information about the
+window you have to manually encode that information in the result elements in your
+`ProcessWindowFunction`. The only relevant information that is set on the result elements is the
+element *timestamp*. This is set to the maximum allowed timestamp of the processed window, which
+is *end timestamp - 1*, since the window-end timestamp is exclusive. Note that this is true for both
+event-time windows and processing-time windows, i.e., after a windowed operation elements always
+have a timestamp, but this can be an event-time timestamp or a processing-time timestamp. For
+processing-time windows this has no special implications, but for event-time windows this, together
+with how watermarks interact with windows, enables
+[consecutive windowed operations](#consecutive-windowed-operations) with the same window sizes. We
+will cover this after taking a look at how watermarks interact with windows.
+
+### Interaction of watermarks and windows
+
+Before continuing in this section you might want to take a look at our section about
+[event time and watermarks]({{ site.baseurl }}/dev/event_time.html).
+
+When a watermark arrives at the window operator, this triggers two things:
+ - the watermark triggers computation of all windows where the maximum timestamp (which is
+ *end timestamp - 1*) is smaller than the new watermark
+ - the watermark is forwarded (as is) to downstream operations
+
+Intuitively, a watermark "flushes" out any windows that would be considered late in downstream
+operations once they receive that watermark.
+
+### Consecutive windowed operations
+
+As mentioned before, the way the timestamp of windowed results is computed and how watermarks
+interact with windows allows stringing together consecutive windowed operations. This can be useful
+when you want to do two consecutive windowed operations with different keys but
+still want elements from the same upstream window to end up in the same downstream window. Consider
+this example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Integer> input = ...;
+
+DataStream<Integer> resultsPerKey = input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .reduce(new Summer());
+
+DataStream<Integer> globalResults = resultsPerKey
+    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .process(new TopKWindowFunction());
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[Int] = ...
+
+val resultsPerKey = input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .reduce(new Summer())
+
+val globalResults = resultsPerKey
+    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .process(new TopKWindowFunction())
+{% endhighlight %}
+</div>
+</div>
+
+In this example, the results for time window `[0, 5)` from the first operation will also end up in
+time window `[0, 5)` in the subsequent windowed operation. This allows calculating a sum per key
+and then calculating the top-k elements within the same window in the second operation.
+
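The alignment described in this section can be checked with plain arithmetic. The following is a minimal sketch, not Flink code: the class `ConsecutiveWindowSketch` and its helper methods are hypothetical names introduced for illustration, and it assumes tumbling windows with no offset and non-negative millisecond timestamps. It shows that a result stamped with *end timestamp - 1* is assigned to a downstream window with the same bounds as the upstream window that produced it.

```java
// Hypothetical illustration only; these helpers are not part of the Flink API.
class ConsecutiveWindowSketch {

    // Start of the tumbling window that contains `timestamp`
    // (assumes no window offset and a non-negative timestamp).
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    // Timestamp set on a window result: the window end is exclusive,
    // so the maximum timestamp inside the window is end - 1.
    static long resultTimestamp(long windowStart, long sizeMillis) {
        return windowStart + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = 5000L;             // 5 second tumbling windows
        long elementTimestamp = 1234L; // an input element in window [0, 5000)

        long upstreamStart = windowStart(elementTimestamp, size);
        long emitted = resultTimestamp(upstreamStart, size);
        long downstreamStart = windowStart(emitted, size);

        // prints "0 4999 0": the result lands in [0, 5000) again
        System.out.println(upstreamStart + " " + emitted + " " + downstreamStart);
    }
}
```

If the result carried the exclusive end timestamp (5000) instead, the same arithmetic would place it in the next window `[5000, 10000)`, which is why the *- 1* matters for consecutive windows of equal size.
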
 ## Useful state size considerations
 
 Windows can be defined over long periods of time (such as days, weeks, or months) and therefore accumulate very large state. There are a couple of rules to keep in mind when estimating the storage requirements of your windowing computation:
