Repository: flink
Updated Branches:
  refs/heads/master 7c11bd7f4 -> 6d2124ee2
[FLINK-7568] Add section about consecutive windows to window doc

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d2124ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d2124ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d2124ee

Branch: refs/heads/master
Commit: 6d2124ee2e9a1d629bc56b470f356faa90559e1c
Parents: 7d9e3bf
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Sep 1 15:29:21 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Sep 5 17:33:47 2017 +0200

----------------------------------------------------------------------
docs/dev/stream/operators/windows.md | 74 +++++++++++++++++++++++++++++++
1 file changed, 74 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6d2124ee/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 85a6630..012d531 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -1080,6 +1080,80 @@ as they may "bridge" the gap between two pre-existing, unmerged windows.

<span class="label label-info">Attention</span> You should be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them.
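The deduplication mentioned above can be sketched in plain Java, independent of any Flink API. This is a minimal sketch under two assumptions: each result element carries its key and the end timestamp of its window (hypothetical fields that you would have to encode yourself in your window function), and results are processed in the order they were emitted, so a late firing always arrives after the firing it updates. `LateFiringDedup`, `WindowResult`, and `latestPerWindow` are illustrative names, not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LateFiringDedup {

    // Hypothetical result element: assumes the window function encoded the key
    // and the window end timestamp into each result it emitted.
    static final class WindowResult {
        final String key;
        final long windowEnd;
        final long value;

        WindowResult(String key, long windowEnd, long value) {
            this.key = key;
            this.windowEnd = windowEnd;
            this.value = value;
        }
    }

    // Keeps only the most recent result per (key, window end): a later (late) firing
    // for the same window overwrites the earlier result ("last write wins").
    static Map<String, WindowResult> latestPerWindow(List<WindowResult> results) {
        Map<String, WindowResult> latest = new LinkedHashMap<>();
        for (WindowResult r : results) {
            latest.put(r.key + "@" + r.windowEnd, r);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<WindowResult> emitted = List.of(
                new WindowResult("a", 5000L, 3L),  // on-time firing for key "a", window [0, 5000)
                new WindowResult("b", 5000L, 1L),  // on-time firing for key "b", same window
                new WindowResult("a", 5000L, 4L)); // late firing: updated result for key "a"

        Map<String, WindowResult> deduped = latestPerWindow(emitted);
        System.out.println(deduped.get("a@5000").value); // prints 4
        System.out.println(deduped.size());              // prints 2
    }
}
```

Last-write-wins fits the description above, which treats late-firing output as an updated result of a previous computation rather than as an independent result.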
+## Working with window results
+
+The result of a windowed operation is again a `DataStream`. No information about the windowed
+operation is retained in the result elements, so if you want to keep meta-information about the
+window, you have to encode that information in the result elements manually in your
+`ProcessWindowFunction`. The only relevant information that is set on the result elements is the
+element *timestamp*. It is set to the maximum allowed timestamp of the processed window, which
+is *end timestamp - 1*, because the window-end timestamp is exclusive. This holds for both
+event-time windows and processing-time windows, i.e., after a windowed operation, elements always
+have a timestamp, but it can be either an event-time timestamp or a processing-time timestamp. For
+processing-time windows this has no special implications, but for event-time windows, together
+with the way watermarks interact with windows, it enables
+[consecutive windowed operations](#consecutive-windowed-operations) with the same window sizes. We
+will cover this after taking a look at how watermarks interact with windows.
+
+### Interaction of watermarks and windows
+
+Before continuing in this section, you might want to take a look at our section about
+[event time and watermarks]({{ site.baseurl }}/dev/event_time.html).
+
+When a watermark arrives at the window operator, it triggers two things:
+ - the watermark triggers computation of all windows whose maximum timestamp (which is
+ *end-timestamp - 1*) is smaller than the new watermark
+ - the watermark is forwarded (as is) to downstream operations
+
+Intuitively, a watermark "flushes" out any windows that would be considered late in downstream
+operations once they receive that watermark.
+
+### Consecutive windowed operations
+
+As mentioned before, the way the timestamp of windowed results is computed and the way watermarks
+interact with windows allow stringing together consecutive windowed operations.
+This can be useful when you want to perform two consecutive windowed operations with different
+keys but still want elements from the same upstream window to end up in the same downstream
+window. Consider this example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Integer> input = ...;
+
+// first windowed operation: one sum per key and 5-second event-time window
+// (Summer is a user-defined ReduceFunction)
+DataStream<Integer> resultsPerKey = input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .reduce(new Summer());
+
+// second windowed operation: top-k over all per-key sums of the same 5-second window
+// (TopKWindowFunction is a user-defined ProcessAllWindowFunction)
+DataStream<Integer> globalResults = resultsPerKey
+    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .process(new TopKWindowFunction());
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[Int] = ...
+
+// first windowed operation: one sum per key and 5-second event-time window
+// (Summer is a user-defined ReduceFunction)
+val resultsPerKey = input
+    .keyBy(<key selector>)
+    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .reduce(new Summer())
+
+// second windowed operation: top-k over all per-key sums of the same 5-second window
+// (TopKWindowFunction is a user-defined ProcessAllWindowFunction)
+val globalResults = resultsPerKey
+    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
+    .process(new TopKWindowFunction())
+{% endhighlight %}
+</div>
+</div>
+
+In this example, the results for time window `[0, 5)` from the first operation carry the timestamp
+4999 (the window end of 5000 ms, minus 1), so they also end up in time window `[0, 5)` in the
+subsequent windowed operation. This allows calculating a sum per key and then calculating the
+top-k elements within the same window in the second operation.
+
## Useful state size considerations

Windows can be defined over long periods of time (such as days, weeks, or months) and therefore accumulate very large state. There are a couple of rules to keep in mind when estimating the storage requirements of your windowing computation: