[FLINK-2550] Rename reduceWindow to reduce on *WindowedStream, add Lambda Reduce
Lambda Reduce is the reduce method that takes a Scala Lambda function. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bac272c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bac272c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bac272c Branch: refs/heads/master Commit: 0bac272c8309f2e7567ba762076bd75eeb8ea83a Parents: 8634dbb Author: Aljoscha Krettek <[email protected]> Authored: Tue Oct 6 17:36:33 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 7 22:08:25 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/AllWindowedStream.java | 2 +- .../api/datastream/WindowedStream.java | 2 +- .../windowing/AllWindowTranslationTest.java | 6 ++-- .../windowing/TimeWindowTranslationTest.java | 4 +-- .../windowing/WindowTranslationTest.java | 6 ++-- .../GroupedProcessingTimeWindowExample.java | 2 +- .../streaming/api/scala/AllWindowedStream.scala | 31 ++++++++++++++++++-- .../streaming/api/scala/WindowedStream.scala | 31 ++++++++++++++++++-- .../api/scala/AllWindowTranslationTest.scala | 6 ++-- .../api/scala/WindowTranslationTest.scala | 6 ++-- 10 files changed, 75 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 78ba8ad..0cc1854 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -120,7 +120,7 @@ public class AllWindowedStream<T, W extends Window> { * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. */ - public DataStream<T> reduceWindow(ReduceFunction<T> function) { + public DataStream<T> reduce(ReduceFunction<T> function) { //clean the closure function = input.getExecutionEnvironment().clean(function); http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 349651e..0ea9cad 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -131,7 +131,7 @@ public class WindowedStream<T, K, W extends Window> { * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. */ - public DataStream<T> reduceWindow(ReduceFunction<T> function) { + public DataStream<T> reduce(ReduceFunction<T> function) { //clean the closure function = input.getExecutionEnvironment().clean(function); http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java index 767b40c..09a7149 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java @@ -66,7 +66,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window1 = source .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .reduceWindow(reducer); + .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); @@ -103,7 +103,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window1 = source .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) - .reduceWindow(reducer); + .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); @@ -149,7 +149,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window1 = source .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) - .reduceWindow(reducer); + .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index 76d7bfe..76c6f20 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -59,7 +59,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window1 = source .keyBy(0) .timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS)) - .reduceWindow(reducer); + .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); @@ -103,7 +103,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase DataStream<Tuple2<String, Integer>> window1 = source .timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS)) - .reduceWindow(reducer); + .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 9dc6687..5124add 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -66,7 +66,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .reduceWindow(reducer); + .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); @@ -105,7 +105,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) - .reduceWindow(reducer); + .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); @@ -153,7 +153,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(CountEvictor.of(100)) - .reduceWindow(reducer); + .reduce(reducer); OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index 5d32b8e..982b73d 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -79,7 +79,7 @@ public class GroupedProcessingTimeWindowExample { stream .keyBy(0) .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) - .reduceWindow(new SummingReducer()) + .reduce(new SummingReducer()) // alternative: use a apply function which does not pre-aggregate // .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>()) http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 4f36722..9054d95 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -73,6 +73,26 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { // ------------------------------------------------------------------------ // Operations on the keyed windows // ------------------------------------------------------------------------ + + /** + * Applies a reduce function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the reduce function is interpreted + * as a regular non-windowed stream. + * + * This window will try and pre-aggregate data as much as the window policies permit. For example, + * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per + * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide + * interval, so a few elements are stored per key (one per slide interval). + * Custom windows may not be able to pre-aggregate, or may need to store extra values in an + * aggregation tree. + * + * @param function The reduce function. + * @return The data stream that is the result of applying the reduce function to the window. + */ + def reduce(function: ReduceFunction[T]): DataStream[T] = { + javaStream.reduce(clean(function)) + } + /** * Applies a reduce function to the window. The window function is called for each evaluation * of the window for each key individually. The output of the reduce function is interpreted @@ -88,8 +108,15 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. */ - def reduceWindow(function: ReduceFunction[T]): DataStream[T] = { - javaStream.reduceWindow(clean(function)) + def reduce(function: (T, T) => T): DataStream[T] = { + if (function == null) { + throw new NullPointerException("Reduce function must not be null.") + } + val cleanFun = clean(function) + val reducer = new ReduceFunction[T] { + def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } + } + reduce(reducer) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index a688846..2d6806d 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -76,6 +76,26 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { // ------------------------------------------------------------------------ // Operations on the keyed windows // ------------------------------------------------------------------------ + + /** + * Applies a reduce function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the reduce function is interpreted + * as a regular non-windowed stream. + * + * This window will try and pre-aggregate data as much as the window policies permit. For example, + * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per + * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide + * interval, so a few elements are stored per key (one per slide interval). + * Custom windows may not be able to pre-aggregate, or may need to store extra values in an + * aggregation tree. + * + * @param function The reduce function. + * @return The data stream that is the result of applying the reduce function to the window. + */ + def reduce(function: ReduceFunction[T]): DataStream[T] = { + javaStream.reduce(clean(function)) + } + /** * Applies a reduce function to the window. The window function is called for each evaluation * of the window for each key individually. The output of the reduce function is interpreted @@ -91,8 +111,15 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. */ - def reduceWindow(function: ReduceFunction[T]): DataStream[T] = { - javaStream.reduceWindow(clean(function)) + def reduce(function: (T, T) => T): DataStream[T] = { + if (function == null) { + throw new NullPointerException("Reduce function must not be null.") + } + val cleanFun = clean(function) + val reducer = new ReduceFunction[T] { + def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } + } + reduce(reducer) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index 247256f..dece9f6 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -58,7 +58,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .windowAll(SlidingProcessingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .reduceWindow(reducer) + .reduce(reducer) val transform1 = window1.getJavaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] @@ -100,7 +100,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) - .reduceWindow(reducer) + .reduce(reducer) val transform1 = window1.getJavaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] @@ -150,7 +150,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) - .reduceWindow(reducer) + .reduce(reducer) val transform1 = window1.getJavaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index f1b05c6..fa9c0a9 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -55,7 +55,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .window(SlidingProcessingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .reduceWindow(reducer) + .reduce(reducer) val transform1 = window1.getJavaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] @@ -99,7 +99,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) - .reduceWindow(reducer) + .reduce(reducer) val transform1 = window1.getJavaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] @@ -152,7 +152,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS))) - .reduceWindow(reducer) + .reduce(reducer) val transform1 = window1.getJavaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
