[FLINK-2550] Rename reduceWindow to reduce on *WindowedStream, add Lambda Reduce

"Lambda Reduce" is the new reduce overload on the Scala *WindowedStream classes that takes a Scala lambda function of type (T, T) => T.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bac272c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bac272c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bac272c

Branch: refs/heads/master
Commit: 0bac272c8309f2e7567ba762076bd75eeb8ea83a
Parents: 8634dbb
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Oct 6 17:36:33 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  2 +-
 .../api/datastream/WindowedStream.java          |  2 +-
 .../windowing/AllWindowTranslationTest.java     |  6 ++--
 .../windowing/TimeWindowTranslationTest.java    |  4 +--
 .../windowing/WindowTranslationTest.java        |  6 ++--
 .../GroupedProcessingTimeWindowExample.java     |  2 +-
 .../streaming/api/scala/AllWindowedStream.scala | 31 ++++++++++++++++++--
 .../streaming/api/scala/WindowedStream.scala    | 31 ++++++++++++++++++--
 .../api/scala/AllWindowTranslationTest.scala    |  6 ++--
 .../api/scala/WindowTranslationTest.scala       |  6 ++--
 10 files changed, 75 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 78ba8ad..0cc1854 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -120,7 +120,7 @@ public class AllWindowedStream<T, W extends Window> {
         * @param function The reduce function.
         * @return The data stream that is the result of applying the reduce 
function to the window. 
         */
-       public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+       public DataStream<T> reduce(ReduceFunction<T> function) {
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 349651e..0ea9cad 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -131,7 +131,7 @@ public class WindowedStream<T, K, W extends Window> {
         * @param function The reduce function.
         * @return The data stream that is the result of applying the reduce 
function to the window. 
         */
-       public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+       public DataStream<T> reduce(ReduceFunction<T> function) {
                //clean the closure
                function = input.getExecutionEnvironment().clean(function);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 767b40c..09a7149 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -66,7 +66,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
 
                DataStream<Tuple2<String, Integer>> window1 = source
                                
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
-                               .reduceWindow(reducer);
+                               .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
@@ -103,7 +103,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window1 = source
                                
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
                                .trigger(CountTrigger.of(100))
-                               .reduceWindow(reducer);
+                               .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
@@ -149,7 +149,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window1 = source
                                
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
                                .evictor(CountEvictor.of(100))
-                               .reduceWindow(reducer);
+                               .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 76d7bfe..76c6f20 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -59,7 +59,7 @@ public class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(0)
                                .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
-                               .reduceWindow(reducer);
+                               .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
@@ -103,7 +103,7 @@ public class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window1 = source
                                .timeWindowAll(Time.of(1000, 
TimeUnit.MILLISECONDS),
                                                Time.of(100, 
TimeUnit.MILLISECONDS))
-                               .reduceWindow(reducer);
+                               .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 9dc6687..5124add 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -66,7 +66,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                                .keyBy(0)
                                
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS),
                                                Time.of(100, 
TimeUnit.MILLISECONDS)))
-                               .reduceWindow(reducer);
+                               .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
@@ -105,7 +105,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                                .keyBy(0)
                                
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
                                .trigger(CountTrigger.of(100))
-                               .reduceWindow(reducer);
+                               .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
@@ -153,7 +153,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                                .keyBy(0)
                                
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
                                .evictor(CountEvictor.of(100))
-                               .reduceWindow(reducer);
+                               .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 5d32b8e..982b73d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -79,7 +79,7 @@ public class GroupedProcessingTimeWindowExample {
                stream
                        .keyBy(0)
                        .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, 
MILLISECONDS))
-                       .reduceWindow(new SummingReducer())
+                       .reduce(new SummingReducer())
 
                        // alternative: use a apply function which does not 
pre-aggregate
 //                     .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, 
Long>())

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 4f36722..9054d95 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -73,6 +73,26 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
   // ------------------------------------------------------------------------
   //  Operations on the keyed windows
   // ------------------------------------------------------------------------
+
+  /**
+   * Applies a reduce function to the window. The window function is called 
for each evaluation
+   * of the window for each key individually. The output of the reduce 
function is interpreted
+   * as a regular non-windowed stream.
+   *
+   * This window will try and pre-aggregate data as much as the window 
policies permit. For example,
+   * tumbling time windows can perfectly pre-aggregate the data, meaning that 
only one element per
+   * key is stored. Sliding time windows will pre-aggregate on the granularity 
of the slide
+   * interval, so a few elements are stored per key (one per slide interval).
+   * Custom windows may not be able to pre-aggregate, or may need to store 
extra values in an
+   * aggregation tree.
+   *
+   * @param function The reduce function.
+   * @return The data stream that is the result of applying the reduce 
function to the window.
+   */
+  def reduce(function: ReduceFunction[T]): DataStream[T] = {
+    javaStream.reduce(clean(function))
+  }
+
   /**
    * Applies a reduce function to the window. The window function is called 
for each evaluation
    * of the window for each key individually. The output of the reduce 
function is interpreted
@@ -88,8 +108,15 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
    * @param function The reduce function.
    * @return The data stream that is the result of applying the reduce 
function to the window.
    */
-  def reduceWindow(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduceWindow(clean(function))
+  def reduce(function: (T, T) => T): DataStream[T] = {
+    if (function == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val cleanFun = clean(function)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index a688846..2d6806d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -76,6 +76,26 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
   // ------------------------------------------------------------------------
   //  Operations on the keyed windows
   // ------------------------------------------------------------------------
+
+  /**
+   * Applies a reduce function to the window. The window function is called 
for each evaluation
+   * of the window for each key individually. The output of the reduce 
function is interpreted
+   * as a regular non-windowed stream.
+   *
+   * This window will try and pre-aggregate data as much as the window 
policies permit. For example,
+   * tumbling time windows can perfectly pre-aggregate the data, meaning that 
only one element per
+   * key is stored. Sliding time windows will pre-aggregate on the granularity 
of the slide
+   * interval, so a few elements are stored per key (one per slide interval).
+   * Custom windows may not be able to pre-aggregate, or may need to store 
extra values in an
+   * aggregation tree.
+   *
+   * @param function The reduce function.
+   * @return The data stream that is the result of applying the reduce 
function to the window.
+   */
+  def reduce(function: ReduceFunction[T]): DataStream[T] = {
+    javaStream.reduce(clean(function))
+  }
+
   /**
    * Applies a reduce function to the window. The window function is called 
for each evaluation
    * of the window for each key individually. The output of the reduce 
function is interpreted
@@ -91,8 +111,15 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
    * @param function The reduce function.
    * @return The data stream that is the result of applying the reduce 
function to the window.
    */
-  def reduceWindow(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduceWindow(clean(function))
+  def reduce(function: (T, T) => T): DataStream[T] = {
+    if (function == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val cleanFun = clean(function)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 247256f..dece9f6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -58,7 +58,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .windowAll(SlidingProcessingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -100,7 +100,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -150,7 +150,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

http://git-wip-us.apache.org/repos/asf/flink/blob/0bac272c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index f1b05c6..fa9c0a9 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -55,7 +55,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .window(SlidingProcessingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
         .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -99,7 +99,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
@@ -152,7 +152,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
-      .reduceWindow(reducer)
+      .reduce(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
       .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

Reply via email to