[FLINK-4997] Add doc for ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f047e13 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f047e13 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f047e13 Branch: refs/heads/master Commit: 4f047e13518ad2eb493903179e38eb174a37994c Parents: 86dff0e Author: Aljoscha Krettek <[email protected]> Authored: Thu Feb 9 11:56:46 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Feb 17 17:15:51 2017 +0100 ---------------------------------------------------------------------- docs/dev/windows.md | 105 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4f047e13/docs/dev/windows.md ---------------------------------------------------------------------- diff --git a/docs/dev/windows.md b/docs/dev/windows.md index f8be08f..73f348e 100644 --- a/docs/dev/windows.md +++ b/docs/dev/windows.md @@ -565,6 +565,108 @@ The example shows a `WindowFunction` to count the elements in a window. In addit <span class="label label-danger">Attention</span> Note that using `WindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental aggregation and the added information of a `WindowFunction`. +### ProcessWindowFunction + +In places where a `WindowFunction` can be used you can also use a `ProcessWindowFunction`. This +is very similar to `WindowFunction`, except that the interface allows to query more information +about the context in which the window evaluation happens. + +This is the `ProcessWindowFunction` interface: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param context The context in which the window is being evaluated. + * @param elements The elements in the window being evaluated. + * @param out A collector for emitting elements. + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + public abstract void process( + KEY key, + Context context, + Iterable<IN> elements, + Collector<OUT> out) throws Exception; + + /** + * The context holding window metadata + */ + public abstract class Context { + /** + * @return The window that is being evaluated. + */ + public abstract W window(); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param context The context in which the window is being evaluated. + * @param elements The elements in the window being evaluated. + * @param out A collector for emitting elements. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + @throws[Exception] + def process( + key: KEY, + context: Context, + elements: Iterable[IN], + out: Collector[OUT]) + + /** + * The context holding window metadata + */ + abstract class Context { + /** + * @return The window that is being evaluated. + */ + def window: W + } +} +{% endhighlight %} +</div> +</div> + +It can be used like this: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +DataStream<Tuple2<String, Long>> input = ...; + +input + .keyBy(<key selector>) + .window(<window assigner>) + .process(new MyProcessWindowFunction()); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val input: DataStream[(String, Long)] = ... + +input + .keyBy(<key selector>) + .window(<window assigner>) + .process(new MyProcessWindowFunction()) +{% endhighlight %} +</div> +</div> + ### WindowFunction with Incremental Aggregation A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to @@ -573,6 +675,9 @@ When the window is closed, the `WindowFunction` will be provided with the aggreg This allows to incrementally compute windows while having access to the additional window meta information of the `WindowFunction`. +<span class="label label-info">Note</span> You can also `ProcessWindowFunction` instead of +`WindowFunction` for incremental window aggregation. + #### Incremental Window Aggregation with FoldFunction The following example shows how an incremental `FoldFunction` can be combined with
