Repository: flink Updated Branches: refs/heads/master c6fa05e56 -> 3df780902
[FLINK-9299] [docs] Fix errors in ProcessWindowFunction documentation Java examples This closes #6001 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22531a87 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22531a87 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22531a87 Branch: refs/heads/master Commit: 22531a87158a725804734b27e091ff6597686dd6 Parents: 9ddb978 Author: yanghua <yanghua1...@gmail.com> Authored: Sun May 13 16:27:11 2018 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Thu May 17 15:12:50 2018 +0200 ---------------------------------------------------------------------- docs/dev/stream/operators/windows.md | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/22531a87/docs/dev/stream/operators/windows.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index 649bfe8..ad2b516 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -504,7 +504,7 @@ private static class AverageAggregate @Override public Double getResult(Tuple2<Long, Long> accumulator) { - return accumulator.f0 / accumulator.f1; + return ((double) accumulator.f0) / accumulator.f1; } @Override @@ -730,7 +730,7 @@ input /* ... */ -public class MyProcessWindowFunction implements ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> { +public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> { void process(String key, Context context, Iterable<Tuple<String, Long>> input, Collector<String> out) { long count = 0; @@ -778,7 +778,7 @@ The example shows a `ProcessWindowFunction` that counts the elements in a window A `ProcessWindowFunction` can be combined with either a `ReduceFunction`, an `AggregateFunction`, or a `FoldFunction` to incrementally aggregate elements as they arrive in the window. When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result. -This allows to incrementally compute windows while having access to the +This allows it to incrementally compute windows while having access to the additional window meta information of the `ProcessWindowFunction`. <span class="label label-info">Note</span> You can also the legacy `WindowFunction` instead of @@ -797,7 +797,7 @@ DataStream<SensorReading> input = ...; input .keyBy(<key selector>) - .timeWindow(<window assigner>) + .timeWindow(<duration>) .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); // Function definitions @@ -810,7 +810,7 @@ private static class MyReduceFunction implements ReduceFunction<SensorReading> { } private static class MyProcessWindowFunction - implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> { + extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> { public void process(String key, Context context, @@ -830,7 +830,7 @@ val input: DataStream[SensorReading] = ... input .keyBy(<key selector>) - .timeWindow(<window assigner>) + .timeWindow(<duration>) .reduce( (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 }, ( key: String, @@ -856,11 +856,11 @@ the average. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -DataStream<Tuple2<String, Long> input = ...; +DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) - .timeWindow(<window assigner>) + .timeWindow(<duration>) .aggregate(new AverageAggregate(), new MyProcessWindowFunction()); // Function definitions @@ -883,7 +883,7 @@ private static class AverageAggregate @Override public Double getResult(Tuple2<Long, Long> accumulator) { - return accumulator.f0 / accumulator.f1; + return ((double) accumulator.f0) / accumulator.f1; } @Override @@ -893,7 +893,7 @@ private static class AverageAggregate } private static class MyProcessWindowFunction - implements ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> { + extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> { public void process(String key, Context context, @@ -913,7 +913,7 @@ val input: DataStream[(String, Long)] = ... input .keyBy(<key selector>) - .timeWindow(<window assigner>) + .timeWindow(<duration>) .aggregate(new AverageAggregate(), new MyProcessWindowFunction()) // Function definitions @@ -959,7 +959,7 @@ DataStream<SensorReading> input = ...; input .keyBy(<key selector>) - .timeWindow(<window assigner>) + .timeWindow(<duration>) .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction()) // Function definitions @@ -975,7 +975,7 @@ private static class MyFoldFunction } private static class MyProcessWindowFunction - implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> { + extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> { public void process(String key, Context context, @@ -995,7 +995,7 @@ val input: DataStream[SensorReading] = ... input .keyBy(<key selector>) - .timeWindow(<window assigner>) + .timeWindow(<duration>) .fold ( ("", 0L, 0), (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },