Repository: flink
Updated Branches:
  refs/heads/master 1f9c0cf85 -> 698e53e47


[FLINK-3869] Replace WindowedStream.apply() by reduce()/fold()

Before, there where these overloads for apply():
 - appply(ReduceFunction, WindowFunction)
 - apply(T initial, FoldFunction, WindowFunction)

These are now called reduce() and fold(). We keep the old methods and
deprecate them for compatibility.

This also fixes a problem with apply(T initial, FoldFunction,
WindowFunction) being to restrictive.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/698e53e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/698e53e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/698e53e4

Branch: refs/heads/master
Commit: 698e53e473fa0f78ed1e23716a50adc914563754
Parents: 1f9c0cf
Author: Yassine Marzougui <[email protected]>
Authored: Sun Nov 20 00:19:10 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Nov 22 17:41:07 2016 +0100

----------------------------------------------------------------------
 docs/dev/windows.md                             |  12 +-
 .../api/datastream/AllWindowedStream.java       | 226 ++++++++++++++++++-
 .../api/datastream/WindowedStream.java          | 219 +++++++++++++++++-
 .../windowing/FoldApplyAllWindowFunction.java   |  25 +-
 .../windowing/FoldApplyWindowFunction.java      |  22 +-
 .../operators/FoldApplyWindowFunctionTest.java  |   5 +-
 .../streaming/api/scala/AllWindowedStream.scala | 135 ++++++++++-
 .../streaming/api/scala/WindowedStream.scala    | 127 +++++++++++
 .../OnWindowedStream.scala                      |  13 +-
 .../api/scala/AllWindowTranslationTest.scala    |   4 +-
 .../streaming/api/scala/WindowFoldITCase.scala  |   4 +-
 .../api/scala/WindowReduceITCase.scala          |   4 +-
 .../api/scala/WindowTranslationTest.scala       |   4 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |   6 +-
 .../EventTimeWindowCheckpointingITCase.java     |   4 +-
 15 files changed, 748 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index 4bce07b..d6189d4 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -498,10 +498,6 @@ The following example shows how an incremental 
`FoldFunction` can be combined wi
 a `WindowFunction` to extract the number of events in the window and return 
also 
 the key and end time of the window. 
 
-Please note that the use of a `FoldFunction` in combination with 
`WindowFunction` is
-restricted in that the types of the `Iterable` and `Collector` arguments in
-`WindowFunction` must both correspond to the type of the accumulator in the 
`FoldFunction`.
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -510,7 +506,7 @@ DataStream<SensorReading> input = ...;
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .apply(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), 
new MyWindowFunction())
+  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new 
MyWindowFunction())
 
 // Function definitions
 
@@ -546,7 +542,7 @@ val input: DataStream[SensorReading] = ...
 input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
- .apply (
+ .fold (
     ("", 0L, 0), 
     (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
     ( key: String, 
@@ -577,7 +573,7 @@ DataStream<SensorReading> input = ...;
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .apply(new MyReduceFunction(), new MyWindowFunction());
+  .reduce(new MyReduceFunction(), new MyWindowFunction());
 
 // Function definitions
 
@@ -610,7 +606,7 @@ val input: DataStream[SensorReading] = ...
 input
   .keyBy(<key selector>)
   .timeWindow(<window assigner>)
-  .apply(
+  .reduce(
     (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 
else r1 },
     ( key: String, 
       window: TimeWindow, 

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index e77b5c8..ae71ce5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -175,7 +175,7 @@ public class AllWindowedStream<T, W extends Window> {
        public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) 
{
                if (function instanceof RichFunction) {
                        throw new UnsupportedOperationException("ReduceFunction 
of reduce can not be a RichFunction. " +
-                                       "Please use apply(ReduceFunction, 
WindowFunction) instead.");
+                                       "Please use reduce(ReduceFunction, 
WindowFunction) instead.");
                }
 
                //clean the closure
@@ -184,7 +184,100 @@ public class AllWindowedStream<T, W extends Window> {
                String callLocation = Utils.getCallLocationName();
                String udfName = "WindowedStream." + callLocation;
 
-               return apply(function, new PassThroughAllWindowFunction<W, 
T>());
+               return reduce(function, new PassThroughAllWindowFunction<W, 
T>());
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given reducer.
+        *
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+
+       public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, AllWindowFunction<T, R, W> function) {
+               TypeInformation<T> inType = input.getType();
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       function, AllWindowFunction.class, true, true, inType, 
null, false);
+
+               return reduce(reduceFunction, function, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given reducer.
+        *
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
+        * @param function The window function.
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, AllWindowFunction<T, R, W> function, TypeInformation<R> 
resultType) {
+               if (reduceFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("ReduceFunction 
of reduce can not be a RichFunction.");
+               }
+
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               reduceFunction = 
input.getExecutionEnvironment().clean(reduceFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "WindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, Byte> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator =
+                               new EvictingWindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalIterableAllWindowFunction<>(new 
ReduceApplyAllWindowFunction<>(reduceFunction, function)),
+                                       trigger,
+                                       evictor,
+                                       allowedLateness);
+
+               } else {
+                       ReducingStateDescriptor<T> stateDesc = new 
ReducingStateDescriptor<>("window-contents",
+                               reduceFunction,
+                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator =
+                               new WindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalSingleValueAllWindowFunction<>(function),
+                                       trigger,
+                                       allowedLateness);
+               }
+
+               return input.transform(opName, resultType, 
operator).forceNonParallel();
        }
 
        /**
@@ -197,8 +290,8 @@ public class AllWindowedStream<T, W extends Window> {
         */
        public <R> SingleOutputStreamOperator<R> fold(R initialValue, 
FoldFunction<T, R> function) {
                if (function instanceof RichFunction) {
-                       throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction. " +
-                                       "Please use apply(FoldFunction, 
WindowFunction) instead.");
+                       throw new UnsupportedOperationException("FoldFunction 
of fold can not be a RichFunction. " +
+                                       "Please use fold(FoldFunction, 
WindowFunction) instead.");
                }
 
                TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(function, input.getType(),
@@ -217,11 +310,115 @@ public class AllWindowedStream<T, W extends Window> {
         */
        public <R> SingleOutputStreamOperator<R> fold(R initialValue, 
FoldFunction<T, R> function, TypeInformation<R> resultType) {
                if (function instanceof RichFunction) {
-                       throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction. " +
-                                       "Please use apply(FoldFunction, 
WindowFunction) instead.");
+                       throw new UnsupportedOperationException("FoldFunction 
of fold can not be a RichFunction. " +
+                                       "Please use fold(FoldFunction, 
WindowFunction) instead.");
+               }
+
+               return fold(initialValue, function, new 
PassThroughAllWindowFunction<W, R>(), resultType, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, 
FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) {
+
+               TypeInformation<ACC> foldAccumulatorType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+                       Utils.getCallLocationName(), true);
+
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       function, AllWindowFunction.class, true, true, 
foldAccumulatorType, null, false);
+
+               return fold(initialValue, foldFunction, function, 
foldAccumulatorType, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param function The window function.
+        * @param foldAccumulatorType Type information for the result type of 
the fold function
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
+                       FoldFunction<T, ACC> foldFunction,
+                       AllWindowFunction<ACC, R, W> function,
+                       TypeInformation<ACC> foldAccumulatorType,
+                       TypeInformation<R> resultType) {
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
of fold can not be a RichFunction.");
+               }
+               if (windowAssigner instanceof MergingWindowAssigner) {
+                       throw new UnsupportedOperationException("Fold cannot be 
used with a merging WindowAssigner.");
+               }
+
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               foldFunction = 
input.getExecutionEnvironment().clean(foldFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "WindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, Byte> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator =
+                               new EvictingWindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalIterableAllWindowFunction<>(new 
FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, 
foldAccumulatorType)),
+                                       trigger,
+                                       evictor,
+                                       allowedLateness);
+
+               } else {
+                       FoldingStateDescriptor<T, ACC> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
+                               initialValue, foldFunction, 
foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator =
+                               new WindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalSingleValueAllWindowFunction<>(function),
+                                       trigger,
+                                       allowedLateness);
                }
 
-               return apply(initialValue, function, new 
PassThroughAllWindowFunction<W, R>(), resultType);
+               return input.transform(opName, resultType, 
operator).forceNonParallel();
        }
 
        /**
@@ -321,8 +518,10 @@ public class AllWindowedStream<T, W extends Window> {
         * @param reduceFunction The reduce function that is used for 
incremental aggregation.
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction)} 
instead.
         */
-
+       @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> 
reduceFunction, AllWindowFunction<T, R, W> function) {
                TypeInformation<T> inType = input.getType();
                TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
@@ -343,7 +542,10 @@ public class AllWindowedStream<T, W extends Window> {
         * @param function The window function.
         * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @deprecated Use {@link #reduce(ReduceFunction, AllWindowFunction, 
TypeInformation)} instead.
         */
+       @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> 
reduceFunction, AllWindowFunction<T, R, W> function, TypeInformation<R> 
resultType) {
                if (reduceFunction instanceof RichFunction) {
                        throw new UnsupportedOperationException("ReduceFunction 
of apply can not be a RichFunction.");
@@ -415,7 +617,10 @@ public class AllWindowedStream<T, W extends Window> {
         * @param foldFunction The fold function that is used for incremental 
aggregation.
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @deprecated Use {@link #fold(R, FoldFunction, AllWindowFunction)} 
instead.
         */
+       @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(R initialValue, 
FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
 
                TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -437,7 +642,10 @@ public class AllWindowedStream<T, W extends Window> {
         * @param function The window function.
         * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @deprecated Use {@link #fold(R, FoldFunction, AllWindowFunction, 
TypeInformation, TypeInformation)} instead.
         */
+       @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(R initialValue, 
FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, 
TypeInformation<R> resultType) {
                if (foldFunction instanceof RichFunction) {
                        throw new UnsupportedOperationException("ReduceFunction 
of apply can not be a RichFunction.");
@@ -474,7 +682,7 @@ public class AllWindowedStream<T, W extends Window> {
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                        stateDesc,
-                                       new 
InternalIterableAllWindowFunction<>(new 
FoldApplyAllWindowFunction<>(initialValue, foldFunction, function)),
+                                       new 
InternalIterableAllWindowFunction<>(new 
FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, resultType)),
                                        trigger,
                                        evictor,
                                        allowedLateness);

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 15ec5f1..ad7f371 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -185,7 +185,7 @@ public class WindowedStream<T, K, W extends Window> {
        public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) 
{
                if (function instanceof RichFunction) {
                        throw new UnsupportedOperationException("ReduceFunction 
of reduce can not be a RichFunction. " +
-                               "Please use apply(ReduceFunction, 
WindowFunction) instead.");
+                               "Please use reduce(ReduceFunction, 
WindowFunction) instead.");
                }
 
                //clean the closure
@@ -199,7 +199,99 @@ public class WindowedStream<T, K, W extends Window> {
                        return result;
                }
 
-               return apply(function, new PassThroughWindowFunction<K, W, 
T>());
+               return reduce(function, new PassThroughWindowFunction<K, W, 
T>());
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given reducer.
+        *
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function) {
+               TypeInformation<T> inType = input.getType();
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       function, WindowFunction.class, true, true, inType, 
null, false);
+
+               return reduce(reduceFunction, function, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given reducer.
+        *
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
+        * @param function The window function.
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> 
resultType) {
+               if (reduceFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("ReduceFunction 
of reduce can not be a RichFunction.");
+               }
+
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               reduceFunction = 
input.getExecutionEnvironment().clean(reduceFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "WindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, K> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator =
+                               new EvictingWindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalIterableWindowFunction<>(new 
ReduceApplyWindowFunction<>(reduceFunction, function)),
+                                       trigger,
+                                       evictor,
+                                       allowedLateness);
+
+               } else {
+                       ReducingStateDescriptor<T> stateDesc = new 
ReducingStateDescriptor<>("window-contents",
+                               reduceFunction,
+                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator =
+                               new WindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalSingleValueWindowFunction<>(function),
+                                       trigger,
+                                       allowedLateness);
+               }
+
+               return input.transform(opName, resultType, operator);
        }
 
        /**
@@ -213,7 +305,7 @@ public class WindowedStream<T, K, W extends Window> {
        public <R> SingleOutputStreamOperator<R> fold(R initialValue, 
FoldFunction<T, R> function) {
                if (function instanceof RichFunction) {
                        throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction. " +
-                               "Please use apply(FoldFunction, WindowFunction) 
instead.");
+                               "Please use fold(FoldFunction, WindowFunction) 
instead.");
                }
 
                TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(function, input.getType(),
@@ -233,10 +325,112 @@ public class WindowedStream<T, K, W extends Window> {
        public <R> SingleOutputStreamOperator<R> fold(R initialValue, 
FoldFunction<T, R> function, TypeInformation<R> resultType) {
                if (function instanceof RichFunction) {
                        throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction. " +
-                               "Please use apply(FoldFunction, WindowFunction) 
instead.");
+                               "Please use fold(FoldFunction, WindowFunction) 
instead.");
+               }
+
+               return fold(initialValue, function, new 
PassThroughWindowFunction<K, W, R>(), resultType, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, 
FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function) {
+
+               TypeInformation<ACC> foldAccumulatorType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+                       Utils.getCallLocationName(), true);
+
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       function, WindowFunction.class, true, true, 
getInputType(), null, false);
+
+               return fold(initialValue, foldFunction, function, 
foldAccumulatorType, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param function The window function.
+        * @param foldAccumulatorType Type information for the result type of 
the fold function
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
+                       FoldFunction<T, ACC> foldFunction,
+                       WindowFunction<ACC, R, K, W> function,
+                       TypeInformation<ACC> foldAccumulatorType,
+                       TypeInformation<R> resultType) {
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
of fold can not be a RichFunction.");
+               }
+               if (windowAssigner instanceof MergingWindowAssigner) {
+                       throw new UnsupportedOperationException("Fold cannot be 
used with a merging WindowAssigner.");
+               }
+
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               foldFunction = 
input.getExecutionEnvironment().clean(foldFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "WindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, K> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator = new EvictingWindowOperator<>(windowAssigner,
+                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               keySel,
+                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                               stateDesc,
+                               new InternalIterableWindowFunction<>(new 
FoldApplyWindowFunction<>(initialValue, foldFunction, function, 
foldAccumulatorType)),
+                               trigger,
+                               evictor,
+                               allowedLateness);
+
+               } else {
+                       FoldingStateDescriptor<T, ACC> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
+                               initialValue, foldFunction, 
foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator = new WindowOperator<>(windowAssigner,
+                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               keySel,
+                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                               stateDesc,
+                               new 
InternalSingleValueWindowFunction<>(function),
+                               trigger,
+                               allowedLateness);
                }
 
-               return apply(initialValue, function, new 
PassThroughWindowFunction<K, W, R>(), resultType);
+               return input.transform(opName, resultType, operator);
        }
 
        /**
@@ -342,8 +536,10 @@ public class WindowedStream<T, K, W extends Window> {
         * @param reduceFunction The reduce function that is used for 
incremental aggregation.
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @deprecated Use {@link #reduce(ReduceFunction, WindowFunction)} 
instead.
         */
-
+       @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function) {
                TypeInformation<T> inType = input.getType();
                TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
@@ -364,7 +560,10 @@ public class WindowedStream<T, K, W extends Window> {
         * @param function The window function.
         * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @deprecated Use {@link #reduce(ReduceFunction, WindowFunction, 
TypeInformation)} instead.
         */
+       @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> 
resultType) {
                if (reduceFunction instanceof RichFunction) {
                        throw new UnsupportedOperationException("ReduceFunction 
of apply can not be a RichFunction.");
@@ -436,7 +635,10 @@ public class WindowedStream<T, K, W extends Window> {
         * @param foldFunction The fold function that is used for incremental 
aggregation.
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @deprecated Use {@link #fold(R, FoldFunction, WindowFunction)} 
instead.
         */
+       @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(R initialValue, 
FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function) {
 
                TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -458,7 +660,10 @@ public class WindowedStream<T, K, W extends Window> {
         * @param function The window function.
         * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @deprecated Use {@link #fold(R, FoldFunction, WindowFunction, 
TypeInformation, TypeInformation)} instead.
         */
+       @Deprecated
        public <R> SingleOutputStreamOperator<R> apply(R initialValue, 
FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, 
TypeInformation<R> resultType) {
                if (foldFunction instanceof RichFunction) {
                        throw new UnsupportedOperationException("FoldFunction 
of apply can not be a RichFunction.");
@@ -494,7 +699,7 @@ public class WindowedStream<T, K, W extends Window> {
                                keySel,
                                
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                stateDesc,
-                               new InternalIterableWindowFunction<>(new 
FoldApplyWindowFunction<>(initialValue, foldFunction, function)),
+                               new InternalIterableWindowFunction<>(new 
FoldApplyWindowFunction<>(initialValue, foldFunction, function, resultType)),
                                trigger,
                                evictor,
                                allowedLateness);

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
index a5bc0a1..0efffb9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
@@ -36,20 +36,25 @@ import java.io.IOException;
 import java.util.Collections;
 
 @Internal
-public class FoldApplyAllWindowFunction<W extends Window, T, ACC>
-       extends WrappingFunction<AllWindowFunction<ACC, ACC, W>>
-       implements AllWindowFunction<T, ACC, W>, OutputTypeConfigurable<ACC> {
+public class FoldApplyAllWindowFunction<W extends Window, T, ACC, R>
+       extends WrappingFunction<AllWindowFunction<ACC, R, W>>
+       implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> {
 
        private static final long serialVersionUID = 1L;
 
        private final FoldFunction<T, ACC> foldFunction;
 
        private byte[] serializedInitialValue;
+       private transient TypeInformation<ACC> accTypeInformation;
        private TypeSerializer<ACC> accSerializer;
        private transient ACC initialValue;
 
-       public FoldApplyAllWindowFunction(ACC initialValue, FoldFunction<T, 
ACC> foldFunction, AllWindowFunction<ACC, ACC, W> windowFunction) {
+       public FoldApplyAllWindowFunction(ACC initialValue,
+                       FoldFunction<T, ACC> foldFunction,
+                       AllWindowFunction<ACC, R, W> windowFunction,
+                       TypeInformation<ACC> accTypeInformation) {
                super(windowFunction);
+               this.accTypeInformation = accTypeInformation;
                this.foldFunction = foldFunction;
                this.initialValue = initialValue;
        }
@@ -58,6 +63,11 @@ public class FoldApplyAllWindowFunction<W extends Window, T, 
ACC>
        public void open(Configuration configuration) throws Exception {
                super.open(configuration);
 
+               if (accSerializer == null) {
+                       throw new RuntimeException("No serializer set for the 
fold accumulator type. " +
+                               "Probably the setOutputType method was not 
called.");
+               }
+
                if (serializedInitialValue == null) {
                        throw new RuntimeException("No initial value was 
serialized for the fold " +
                                "window function. Probably the setOutputType 
method was not called.");
@@ -69,7 +79,7 @@ public class FoldApplyAllWindowFunction<W extends Window, T, 
ACC>
        }
 
        @Override
-       public void apply(W window, Iterable<T> values, Collector<ACC> out) 
throws Exception {
+       public void apply(W window, Iterable<T> values, Collector<R> out) 
throws Exception {
                ACC result = accSerializer.copy(initialValue);
 
                for (T val: values) {
@@ -80,8 +90,9 @@ public class FoldApplyAllWindowFunction<W extends Window, T, 
ACC>
        }
 
        @Override
-       public void setOutputType(TypeInformation<ACC> outTypeInfo, 
ExecutionConfig executionConfig) {
-               accSerializer = outTypeInfo.createSerializer(executionConfig);
+       public void setOutputType(TypeInformation<R> outTypeInfo, 
ExecutionConfig executionConfig) {
+               // out type is not used, just use this for the execution config
+               accSerializer = 
accTypeInformation.createSerializer(executionConfig);
 
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
index 756a683..9e916f1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
@@ -36,20 +36,22 @@ import java.io.IOException;
 import java.util.Collections;
 
 @Internal
-public class FoldApplyWindowFunction<K, W extends Window, T, ACC>
-       extends WrappingFunction<WindowFunction<ACC, ACC, K, W>>
-       implements WindowFunction<T, ACC, K, W>, OutputTypeConfigurable<ACC> {
+public class FoldApplyWindowFunction<K, W extends Window, T, ACC, R>
+       extends WrappingFunction<WindowFunction<ACC, R, K, W>>
+       implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> {
 
        private static final long serialVersionUID = 1L;
 
        private final FoldFunction<T, ACC> foldFunction;
 
        private byte[] serializedInitialValue;
+       private transient TypeInformation<ACC> accTypeInformation;
        private TypeSerializer<ACC> accSerializer;
        private transient ACC initialValue;
 
-       public FoldApplyWindowFunction(ACC initialValue, FoldFunction<T, ACC> 
foldFunction, WindowFunction<ACC, ACC, K, W> windowFunction) {
+       public FoldApplyWindowFunction(ACC initialValue, FoldFunction<T, ACC> 
foldFunction, WindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> 
accTypeInformation) {
                super(windowFunction);
+               this.accTypeInformation = accTypeInformation;
                this.foldFunction = foldFunction;
                this.initialValue = initialValue;
        }
@@ -58,6 +60,11 @@ public class FoldApplyWindowFunction<K, W extends Window, T, 
ACC>
        public void open(Configuration configuration) throws Exception {
                super.open(configuration);
 
+               if (accSerializer == null) {
+                       throw new RuntimeException("No serializer set for the 
fold accumulator type. " +
+                               "Probably the setOutputType method was not 
called.");
+               }
+
                if (serializedInitialValue == null) {
                        throw new RuntimeException("No initial value was 
serialized for the fold " +
                                "window function. Probably the setOutputType 
method was not called.");
@@ -69,7 +76,7 @@ public class FoldApplyWindowFunction<K, W extends Window, T, 
ACC>
        }
 
        @Override
-       public void apply(K key, W window, Iterable<T> values, Collector<ACC> 
out) throws Exception {
+       public void apply(K key, W window, Iterable<T> values, Collector<R> 
out) throws Exception {
                ACC result = accSerializer.copy(initialValue);
 
                for (T val: values) {
@@ -80,8 +87,9 @@ public class FoldApplyWindowFunction<K, W extends Window, T, 
ACC>
        }
 
        @Override
-       public void setOutputType(TypeInformation<ACC> outTypeInfo, 
ExecutionConfig executionConfig) {
-               accSerializer = outTypeInfo.createSerializer(executionConfig);
+       public void setOutputType(TypeInformation<R> outTypeInfo, 
ExecutionConfig executionConfig) {
+               // out type is not used, just use this for the execution config
+               accSerializer = 
accTypeInformation.createSerializer(executionConfig);
 
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index 0b0ab9e..91ec427 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -56,7 +56,7 @@ public class FoldApplyWindowFunctionTest {
 
                int initValue = 1;
 
-               FoldApplyWindowFunction<Integer, TimeWindow, Integer, Integer> 
foldWindowFunction = new FoldApplyWindowFunction<>(
+               FoldApplyWindowFunction<Integer, TimeWindow, Integer, Integer, 
Integer> foldWindowFunction = new FoldApplyWindowFunction<>(
                        initValue,
                        new FoldFunction<Integer, Integer>() {
                                private static final long serialVersionUID = 
-4849549768529720587L;
@@ -76,7 +76,8 @@ public class FoldApplyWindowFunctionTest {
                                                out.collect(in);
                                        }
                                }
-                       }
+                       },
+                       BasicTypeInfo.INT_TYPE_INFO
                );
 
                AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 47c13c9..83104e8 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -140,6 +140,62 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
   }
 
   /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  def reduce[R: TypeInformation](
+      preAggregator: ReduceFunction[T],
+      windowFunction: AllWindowFunction[T, R, W]): DataStream[R] = {
+
+    val cleanedReducer = clean(preAggregator)
+    val cleanedWindowFunction = clean(windowFunction)
+
+    val applyFunction = new ScalaAllWindowFunctionWrapper[T, R, 
W](cleanedWindowFunction)
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(cleanedReducer, applyFunction, returnType))
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  def reduce[R: TypeInformation](
+      preAggregator: (T, T) => T,
+      windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = 
{
+
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (windowFunction == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val reducer = new ScalaReduceFunction[T](cleanReducer)
+    val applyFunction = new ScalaAllWindowFunction[T, R, 
W](cleanWindowFunction)
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(reducer, applyFunction, returnType))
+  }
+
+  /**
    * Applies the given fold function to each window. The window function is 
called for each
    * evaluation of the window for each key individually. The output of the 
reduce function is
    * interpreted as a regular non-windowed stream.
@@ -178,6 +234,71 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
   }
 
   /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation folder.
+    *
+    * @param initialValue Initial value of the fold
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  def fold[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC,
+      preAggregator: FoldFunction[T, ACC],
+      windowFunction: AllWindowFunction[ACC, R, W]): DataStream[R] = {
+
+    val cleanFolder = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val applyFunction = new ScalaAllWindowFunctionWrapper[ACC, R, 
W](cleanWindowFunction)
+
+    asScalaStream(javaStream.fold(
+      initialValue,
+      cleanFolder,
+      applyFunction,
+      implicitly[TypeInformation[ACC]],
+      implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation folder.
+    *
+    * @param initialValue Initial value of the fold
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  def fold[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC,
+      preAggregator: (ACC, T) => ACC,
+      windowFunction: (W, Iterable[ACC], Collector[R]) => Unit): DataStream[R] 
= {
+
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (windowFunction == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanFolder = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val folder = new ScalaFoldFunction[T, ACC](cleanFolder)
+    val applyFunction = new ScalaAllWindowFunction[ACC, R, 
W](cleanWindowFunction)
+
+    val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.fold(initialValue, folder, applyFunction, 
accType, returnType))
+  }
+
+  /**
    * Applies the given window function to each window. The window function is 
called for each
    * evaluation of the window for each key individually. The output of the 
window function is
    * interpreted as a regular non-windowed stream.
@@ -227,7 +348,9 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
    * @param preAggregator The reduce function that is used for pre-aggregation
    * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window 
function to the window.
+   * @deprecated Use [[reduce(ReduceFunction, AllWindowFunction)]] instead.
    */
+  @deprecated
   def apply[R: TypeInformation](
       preAggregator: ReduceFunction[T],
       windowFunction: AllWindowFunction[T, R, W]): DataStream[R] = {
@@ -251,7 +374,9 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
    * @param preAggregator The reduce function that is used for pre-aggregation
    * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window 
function to the window.
+   * @deprecated Use [[reduce(ReduceFunction, AllWindowFunction)]] instead.
    */
+  @deprecated
   def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
       windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = 
{
@@ -284,7 +409,9 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     * @param preAggregator The reduce function that is used for pre-aggregation
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window 
function to the window.
+    * @deprecated Use [[fold(R, FoldFunction, AllWindowFunction)]] instead.
     */
+  @deprecated
   def apply[R: TypeInformation](
       initialValue: R,
       preAggregator: FoldFunction[T, R],
@@ -313,12 +440,14 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     * @param preAggregator The reduce function that is used for pre-aggregation
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window 
function to the window.
+    * @deprecated Use [[fold(R, FoldFunction, AllWindowFunction]] instead.
     */
+  @deprecated
   def apply[R: TypeInformation](
       initialValue: R,
       preAggregator: (R, T) => R,
       windowFunction: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = 
{
-    
+
     if (preAggregator == null) {
       throw new NullPointerException("Reduce function must not be null.")
     }
@@ -328,10 +457,10 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
 
     val cleanFolder = clean(preAggregator)
     val cleanWindowFunction = clean(windowFunction)
-    
+
     val folder = new ScalaFoldFunction[T, R](cleanFolder)
     val applyFunction = new ScalaAllWindowFunction[R, R, 
W](cleanWindowFunction)
-    
+
     val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
     asScalaStream(javaStream.apply(initialValue, folder, applyFunction, 
returnType))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 6a10ff6..76d9cda 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -142,6 +142,61 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
   }
 
   /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param function The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  def reduce[R: TypeInformation](
+  preAggregator: ReduceFunction[T],
+  function: WindowFunction[T, R, K, W]): DataStream[R] = {
+
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(function)
+
+    val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, 
W](cleanedWindowFunction)
+
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.reduce(cleanedPreAggregator, applyFunction, 
resultType))
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+    *
+    * @param preAggregator The reduce function that is used for pre-aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  def reduce[R: TypeInformation](
+      preAggregator: (T, T) => T,
+      windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): 
DataStream[R] = {
+
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (windowFunction == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val reducer = new ScalaReduceFunction[T](cleanReducer)
+    val applyFunction = new ScalaWindowFunction[T, R, K, 
W](cleanWindowFunction)
+
+    asScalaStream(javaStream.reduce(reducer, applyFunction, 
implicitly[TypeInformation[R]]))
+  }
+
+  /**
    * Applies the given fold function to each window. The window function is 
called for each
    * evaluation of the window for each key individually. The output of the 
reduce function is
    * interpreted as a regular non-windowed stream.
@@ -179,6 +234,70 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
   }
 
   /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is incrementally aggregated using the given fold function.
+    *
+    * @param initialValue The initial value of the fold
+    * @param foldFunction The fold function that is used for incremental 
aggregation
+    * @param function The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  def fold[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC,
+      foldFunction: FoldFunction[T, ACC],
+      function: WindowFunction[ACC, R, K, W]): DataStream[R] = {
+
+    val cleanedFunction = clean(function)
+    val cleanedFoldFunction = clean(foldFunction)
+
+    val applyFunction = new ScalaWindowFunctionWrapper[ACC, R, K, 
W](cleanedFunction)
+
+    asScalaStream(javaStream.fold(
+      initialValue,
+      cleanedFoldFunction,
+      applyFunction,
+      implicitly[TypeInformation[ACC]],
+      implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is 
called for each
+    * evaluation of the window for each key individually. The output of the 
window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is incrementally aggregated using the given fold function.
+    *
+    * @param foldFunction The fold function that is used for incremental 
aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window 
function to the window.
+    */
+  def fold[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC,
+      foldFunction: (ACC, T) => ACC,
+      windowFunction: (K, W, Iterable[ACC], Collector[R]) => Unit): 
DataStream[R] = {
+
+    if (foldFunction == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    if (windowFunction == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanFolder = clean(foldFunction)
+    val cleanWindowFunction = clean(windowFunction)
+
+    val folder = new ScalaFoldFunction[T, ACC](cleanFolder)
+    val applyFunction = new ScalaWindowFunction[ACC, R, K, 
W](cleanWindowFunction)
+
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+    asScalaStream(javaStream.fold(initialValue, folder, applyFunction, 
accType, resultType))
+  }
+
+  /**
    * Applies the given window function to each window. The window function is 
called for each
    * evaluation of the window for each key individually. The output of the 
window function is
    * interpreted as a regular non-windowed stream.
@@ -230,7 +349,9 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
    * @param preAggregator The reduce function that is used for pre-aggregation
    * @param function The window function.
    * @return The data stream that is the result of applying the window 
function to the window.
+   * @deprecated Use [[reduce(ReduceFunction, WindowFunction)]] instead.
    */
+  @deprecated
   def apply[R: TypeInformation](
       preAggregator: ReduceFunction[T],
       function: WindowFunction[T, R, K, W]): DataStream[R] = {
@@ -254,7 +375,9 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
    * @param preAggregator The reduce function that is used for pre-aggregation
    * @param windowFunction The window function.
    * @return The data stream that is the result of applying the window 
function to the window.
+   * @deprecated Use [[reduce(ReduceFunction, WindowFunction)]] instead.
    */
+  @deprecated
   def apply[R: TypeInformation](
       preAggregator: (T, T) => T,
       windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): 
DataStream[R] = {
@@ -286,7 +409,9 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     * @param foldFunction The fold function that is used for incremental 
aggregation
     * @param function The window function.
     * @return The data stream that is the result of applying the window 
function to the window.
+    * @deprecated Use [[fold(R, FoldFunction, WindowFunction)]] instead.
     */
+  @deprecated
   def apply[R: TypeInformation](
       initialValue: R,
       foldFunction: FoldFunction[T, R],
@@ -314,7 +439,9 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     * @param foldFunction The fold function that is used for incremental 
aggregation
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window 
function to the window.
+    * @deprecated Use [[fold(R, FoldFunction, WindowFunction)]] instead.
     */
+  @deprecated
   def apply[R: TypeInformation](
       initialValue: R,
       foldFunction: (R, T) => R,

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
index f7a5923..27cf3db 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
 
 /**
   * Wraps a joined data stream, allowing to use anonymous partial functions to
@@ -77,13 +78,13 @@ class OnWindowedStream[T, K, W <: Window](stream: 
WindowedStream[T, K, W]) {
     * @return The data stream that is the result of applying the window 
function to the window.
     */
   @PublicEvolving
-  def applyWith[R: TypeInformation](
-      initialValue: R)(
-      foldFunction: (R, T) => R,
-      windowFunction: (K, W, Stream[R]) => TraversableOnce[R])
+  def applyWith[ACC: TypeInformation, R: TypeInformation](
+      initialValue: ACC)(
+      foldFunction: (ACC, T) => ACC,
+      windowFunction: (K, W, Stream[ACC]) => TraversableOnce[R])
     : DataStream[R] =
-    stream.apply(initialValue, foldFunction, {
-      (key, window, items, out) =>
+    stream.fold(initialValue, foldFunction, {
+      (key: K, window: W, items: Iterable[ACC], out: Collector[R]) =>
         windowFunction(key, window, items.toStream).foreach(out.collect)
     })
 

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 6d90239..6273e54 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -206,7 +206,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+      .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
@@ -231,7 +231,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .keyBy(0)
       .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+      .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index 6a6a956..83697ce 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -127,7 +127,7 @@ class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
     source1
       .keyBy(0)
       .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-      .apply(
+      .fold(
         ("R:", 0),
         foldFunc,
         new CheckingIdentityRichWindowFunction[(String, Int), Tuple, 
TimeWindow]())
@@ -230,7 +230,7 @@ class WindowFoldITCase extends 
StreamingMultipleProgramsTestBase {
 
     source1
       .windowAll(TumblingEventTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
-      .apply(
+      .fold(
         ("R:", 0),
         foldFunc,
         new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index 7c414fa..9666266 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -128,7 +128,7 @@ class WindowReduceITCase extends 
StreamingMultipleProgramsTestBase {
     source1
       .keyBy(0)
       .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-      .apply(
+      .reduce(
         reduceFunc,
         new CheckingIdentityRichWindowFunction[(String, Int), Tuple, 
TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {
@@ -230,7 +230,7 @@ class WindowReduceITCase extends 
StreamingMultipleProgramsTestBase {
 
     source1
       .windowAll(TumblingEventTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
-      .apply(
+      .reduce(
         reduceFunc,
         new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
       .addSink(new SinkFunction[(String, Int)]() {

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 60e61f0..c67c215 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -203,7 +203,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+      .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,
@@ -228,7 +228,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
       .keyBy(0)
       .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
-      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+      .reduce(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
         def apply(
                    tuple: Tuple,
                    window: TimeWindow,

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 4c873a3..5d17608 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -232,7 +232,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                                        NUM_ELEMENTS_PER_KEY / 
3))
                                        .rebalance()
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(
+                                       .reduce(
                                                        new 
ReduceFunction<Tuple2<Long, IntType>>() {
 
                                                                @Override
@@ -304,7 +304,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                                        NUM_ELEMENTS_PER_KEY / 
3))
                                        .rebalance()
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new Tuple4<>(0L, 0L, 0L, new 
IntType(0)),
+                                       .fold(new Tuple4<>(0L, 0L, 0L, new 
IntType(0)),
                                                        new 
FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() {
                                                                @Override
                                                                public 
Tuple4<Long, Long, Long, IntType> fold(Tuple4<Long, Long, Long, IntType> 
accumulator,
@@ -377,7 +377,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                        .rebalance()
                                        .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS),
                                                        Time.of(WINDOW_SLIDE, 
MILLISECONDS))
-                                       .apply(
+                                       .reduce(
                                                        new 
ReduceFunction<Tuple2<Long, IntType>>() {
 
                                                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/698e53e4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 4f28d8c..50079d1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -362,7 +362,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(
+                                       .reduce(
                                                        new 
ReduceFunction<Tuple2<Long, IntType>>() {
 
                                                                @Override
@@ -435,7 +435,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-                                       .apply(
+                                       .reduce(
                                                        new 
ReduceFunction<Tuple2<Long, IntType>>() {
 
                                                                @Override

Reply via email to