Repository: flink Updated Branches: refs/heads/master 0214e8003 -> 404e37d21
[hotfix] Change order of reduce and fold in window documentation In the section about incremental aggregation with a window function the order of fold and reduce was different from the order of fold and reduce in the rest of the documentation. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa2f92c7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa2f92c7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa2f92c7 Branch: refs/heads/master Commit: aa2f92c73d2b2bfbd57f341597407ebcb44ca174 Parents: 0214e80 Author: Aljoscha Krettek <[email protected]> Authored: Mon Oct 16 13:25:15 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Oct 16 13:25:15 2017 +0200 ---------------------------------------------------------------------- docs/dev/stream/operators/windows.md | 112 +++++++++++++++--------------- 1 file changed, 56 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/aa2f92c7/docs/dev/stream/operators/windows.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index 0ecae0c..79a2dd6 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -506,13 +506,13 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl * Returns the window that is being evaluated. */ public abstract W window(); - + /** Returns the current processing time. */ public abstract long currentProcessingTime(); - + /** Returns the current event-time watermark. */ public abstract long currentWatermark(); - + /** * State accessor for per-key and per-window state. 
* @@ -520,7 +520,7 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl * by implementing {@link ProcessWindowFunction#clear(Context)}. */ public abstract KeyedStateStore windowState(); - + /** * State accessor for per-key global state. */ @@ -558,22 +558,22 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function * Returns the window that is being evaluated. */ def window: W - + /** * Returns the current processing time. */ def currentProcessingTime: Long - + /** * Returns the current event-time watermark. */ def currentWatermark: Long - + /** * State accessor for per-key and per-window state. */ def windowState: KeyedStateStore - + /** * State accessor for per-key global state. */ @@ -658,11 +658,11 @@ additional window meta information of the `ProcessWindowFunction`. <span class="label label-info">Note</span> You can also use the legacy `WindowFunction` instead of `ProcessWindowFunction` for incremental window aggregation. -#### Incremental Window Aggregation with FoldFunction +#### Incremental Window Aggregation with ReduceFunction -The following example shows how an incremental `FoldFunction` can be combined with -a `ProcessWindowFunction` to extract the number of events in the window and return also -the key and end time of the window. +The following example shows how an incremental `ReduceFunction` can be combined with +a `WindowFunction` to return the smallest event in a window along +with the start time of the window. 
<div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -672,29 +672,26 @@ DataStream<SensorReading> input = ...; input .keyBy(<key selector>) .timeWindow(<window assigner>) - .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction()) + .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); // Function definitions -private static class MyFoldFunction - implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > { +private static class MyReduceFunction implements ReduceFunction<SensorReading> { - public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) { - Integer cur = acc.getField(2); - acc.setField(2, cur + 1); - return acc; + public SensorReading reduce(SensorReading r1, SensorReading r2) { + return r1.value() > r2.value() ? r2 : r1; } } private static class MyProcessWindowFunction - implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> { + implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> { - public void process(String key, + public void apply(String key, Context context, - Iterable<Tuple3<String, Long, Integer>> counts, - Collector<Tuple3<String, Long, Integer>> out) { - Integer count = counts.iterator().next().getField(2); - out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count)); + Iterable<SensorReading> minReadings, + Collector<Tuple2<Long, SensorReading>> out) { + SensorReading min = minReadings.iterator().next(); + out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min)); } } @@ -706,18 +703,17 @@ private static class MyProcessWindowFunction val input: DataStream[SensorReading] = ... 
input - .keyBy(<key selector>) - .timeWindow(<window assigner>) - .fold ( - ("", 0L, 0), - (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) }, + .keyBy(<key selector>) + .timeWindow(<window assigner>) + .reduce( + (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 }, ( key: String, window: TimeWindow, - counts: Iterable[(String, Long, Int)], - out: Collector[(String, Long, Int)] ) => + minReadings: Iterable[SensorReading], + out: Collector[(Long, SensorReading)] ) => { - val count = counts.iterator.next() - out.collect((key, window.getEnd, count._3)) + val min = minReadings.iterator.next() + out.collect((window.getStart, min)) } ) @@ -725,11 +721,11 @@ input </div> </div> -#### Incremental Window Aggregation with ReduceFunction +#### Incremental Window Aggregation with FoldFunction -The following example shows how an incremental `ReduceFunction` can be combined with -a `WindowFunction` to return the smallest event in a window along -with the start time of the window. +The following example shows how an incremental `FoldFunction` can be combined with +a `ProcessWindowFunction` to extract the number of events in the window and return also +the key and end time of the window. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -739,26 +735,29 @@ DataStream<SensorReading> input = ...; input .keyBy(<key selector>) .timeWindow(<window assigner>) - .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); + .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction()) // Function definitions -private static class MyReduceFunction implements ReduceFunction<SensorReading> { +private static class MyFoldFunction + implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > { - public SensorReading reduce(SensorReading r1, SensorReading r2) { - return r1.value() > r2.value() ? 
r2 : r1; + public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) { + Integer cur = acc.getField(2); + acc.setField(2, cur + 1); + return acc; } } private static class MyProcessWindowFunction - implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> { + implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> { - public void apply(String key, + public void process(String key, Context context, - Iterable<SensorReading> minReadings, - Collector<Tuple2<Long, SensorReading>> out) { - SensorReading min = minReadings.iterator().next(); - out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min)); + Iterable<Tuple3<String, Long, Integer>> counts, + Collector<Tuple3<String, Long, Integer>> out) { + Integer count = counts.iterator().next().getField(2); + out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count)); } } @@ -770,17 +769,18 @@ private static class MyProcessWindowFunction val input: DataStream[SensorReading] = ... input - .keyBy(<key selector>) - .timeWindow(<window assigner>) - .reduce( - (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 }, + .keyBy(<key selector>) + .timeWindow(<window assigner>) + .fold ( + ("", 0L, 0), + (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) }, ( key: String, window: TimeWindow, - minReadings: Iterable[SensorReading], - out: Collector[(Long, SensorReading)] ) => + counts: Iterable[(String, Long, Int)], + out: Collector[(String, Long, Int)] ) => { - val min = minReadings.iterator.next() - out.collect((window.getStart, min)) + val count = counts.iterator.next() + out.collect((key, window.getEnd, count._3)) } )
