This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 396bdd1 [hotfix] [docs] Fix ProcessWindowFunction code snippets. 396bdd1 is described below commit 396bdd1459850c294403b7de2c6e4e120ff7f9e8 Author: Fabian Hueske <fhue...@apache.org> AuthorDate: Thu Aug 9 09:56:12 2018 +0200 [hotfix] [docs] Fix ProcessWindowFunction code snippets. This closes #6527. --- docs/dev/stream/operators/windows.md | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index e5ec009..bebc5dd 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -724,17 +724,19 @@ A `ProcessWindowFunction` can be defined and used like this: DataStream<Tuple2<String, Long>> input = ...; input - .keyBy(<key selector>) - .window(<window assigner>) - .process(new MyProcessWindowFunction()); + .keyBy(t -> t.f0) + .timeWindow(Time.minutes(5)) + .process(new MyProcessWindowFunction()); /* ... */ -public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> { +public class MyProcessWindowFunction + extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> { - void process(String key, Context context, Iterable<Tuple<String, Long>> input, Collector<String> out) { + @Override + public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) { long count = 0; - for (Tuple<String, Long> in: input) { + for (Tuple2<String, Long> in: input) { count++; } out.collect("Window: " + context.window() + "count: " + count); @@ -749,9 +751,9 @@ public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple<String, val input: DataStream[(String, Long)] = ... input - .keyBy(<key selector>) - .window(<window assigner>) - .process(new MyProcessWindowFunction()) + .keyBy(_._1) + .timeWindow(Time.minutes(5)) + .process(new MyProcessWindowFunction()) /* ... */