Repository: flink
Updated Branches:
  refs/heads/master c6fa05e56 -> 3df780902


[FLINK-9299] [docs] Fix errors in ProcessWindowFunction documentation Java 
examples

This closes #6001


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22531a87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22531a87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22531a87

Branch: refs/heads/master
Commit: 22531a87158a725804734b27e091ff6597686dd6
Parents: 9ddb978
Author: yanghua <yanghua1...@gmail.com>
Authored: Sun May 13 16:27:11 2018 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 15:12:50 2018 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22531a87/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md 
b/docs/dev/stream/operators/windows.md
index 649bfe8..ad2b516 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -504,7 +504,7 @@ private static class AverageAggregate
 
   @Override
   public Double getResult(Tuple2<Long, Long> accumulator) {
-    return accumulator.f0 / accumulator.f1;
+    return ((double) accumulator.f0) / accumulator.f1;
   }
 
   @Override
@@ -730,7 +730,7 @@ input
 
 /* ... */
 
-public class MyProcessWindowFunction implements 
ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
+public class MyProcessWindowFunction extends 
ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
 
   void process(String key, Context context, Iterable<Tuple<String, Long>> 
input, Collector<String> out) {
     long count = 0;
@@ -778,7 +778,7 @@ The example shows a `ProcessWindowFunction` that counts the 
elements in a window
 A `ProcessWindowFunction` can be combined with either a `ReduceFunction`, an 
`AggregateFunction`, or a `FoldFunction` to
 incrementally aggregate elements as they arrive in the window.
 When the window is closed, the `ProcessWindowFunction` will be provided with 
the aggregated result.
-This allows to incrementally compute windows while having access to the
+This allows it to incrementally compute windows while having access to the
 additional window meta information of the `ProcessWindowFunction`.
 
 <span class="label label-info">Note</span> You can also use the legacy 
`WindowFunction` instead of
@@ -797,7 +797,7 @@ DataStream<SensorReading> input = ...;
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
 
 // Function definitions
@@ -810,7 +810,7 @@ private static class MyReduceFunction implements 
ReduceFunction<SensorReading> {
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<SensorReading, Tuple2<Long, 
SensorReading>, String, TimeWindow> {
+    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, 
String, TimeWindow> {
 
   public void process(String key,
                     Context context,
@@ -830,7 +830,7 @@ val input: DataStream[SensorReading] = ...
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .reduce(
     (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 
else r1 },
     ( key: String,
@@ -856,11 +856,11 @@ the average.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DataStream<Tuple2<String, Long> input = ...;
+DataStream<Tuple2<String, Long>> input = ...;
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
 
 // Function definitions
@@ -883,7 +883,7 @@ private static class AverageAggregate
 
   @Override
   public Double getResult(Tuple2<Long, Long> accumulator) {
-    return accumulator.f0 / accumulator.f1;
+    return ((double) accumulator.f0) / accumulator.f1;
   }
 
   @Override
@@ -893,7 +893,7 @@ private static class AverageAggregate
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<Double, Tuple2<String, Double>, String, 
TimeWindow> {
+    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, 
TimeWindow> {
 
   public void process(String key,
                     Context context,
@@ -913,7 +913,7 @@ val input: DataStream[(String, Long)] = ...
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .aggregate(new AverageAggregate(), new MyProcessWindowFunction())
 
 // Function definitions
@@ -959,7 +959,7 @@ DataStream<SensorReading> input = ...;
 
 input
   .keyBy(<key selector>)
-  .timeWindow(<window assigner>)
+  .timeWindow(<duration>)
   .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new 
MyProcessWindowFunction())
 
 // Function definitions
@@ -975,7 +975,7 @@ private static class MyFoldFunction
 }
 
 private static class MyProcessWindowFunction
-    implements ProcessWindowFunction<Tuple3<String, Long, Integer>, 
Tuple3<String, Long, Integer>, String, TimeWindow> {
+    extends ProcessWindowFunction<Tuple3<String, Long, Integer>, 
Tuple3<String, Long, Integer>, String, TimeWindow> {
 
   public void process(String key,
                     Context context,
@@ -995,7 +995,7 @@ val input: DataStream[SensorReading] = ...
 
 input
  .keyBy(<key selector>)
- .timeWindow(<window assigner>)
+ .timeWindow(<duration>)
  .fold (
     ("", 0L, 0),
     (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },

Reply via email to