[FLINK-4997] [streaming] Introduce ProcessWindowFunction

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dcb2dcd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dcb2dcd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dcb2dcd

Branch: refs/heads/master
Commit: 1dcb2dcd8969941988a4fc7e5488e9272dfd507e
Parents: 82db667
Author: Ventura Del Monte <[email protected]>
Authored: Wed Nov 23 18:00:23 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          | 381 ++++++++++++++---
 .../FoldApplyProcessWindowFunction.java         | 120 ++++++
 .../windowing/ProcessWindowFunction.java        |  61 +++
 .../ReduceApplyProcessWindowFunction.java       |  80 ++++
 .../windowing/RichProcessWindowFunction.java    |  85 ++++
 .../windowing/AccumulatingKeyedTimePanes.java   |  12 +-
 ...ccumulatingProcessingTimeWindowOperator.java |  16 +-
 .../InternalIterableProcessWindowFunction.java  |  63 +++
 ...nternalSingleValueProcessWindowFunction.java |  66 +++
 .../FoldApplyProcessWindowFunctionTest.java     | 155 +++++++
 .../operators/FoldApplyWindowFunctionTest.java  |  28 +-
 .../functions/InternalWindowFunctionTest.java   | 101 ++++-
 ...AlignedProcessingTimeWindowOperatorTest.java | 419 ++++++++++++++++++-
 .../operators/windowing/WindowOperatorTest.java | 177 ++++++++
 .../streaming/runtime/WindowFoldITCase.java     |  78 ++++
 15 files changed, 1738 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 04da04d..45eaae5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AggregateFunction;
@@ -39,8 +40,11 @@ import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import 
org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -60,9 +64,12 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import 
org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -360,6 +367,98 @@ public class WindowedStream<T, K, W extends Window> {
                return input.transform(opName, resultType, operator);
        }
 
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>Arriving data is incrementally aggregated using the given reducer.
+        *
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                               function, ProcessWindowFunction.class, true, 
true, input.getType(), null, false);
+
+               return reduce(reduceFunction, function, resultType);
+       }
+
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>Arriving data is incrementally aggregated using the given reducer.
+        *
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
+        * @param function The window function.
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @Internal
+       public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> 
resultType) {
+               if (reduceFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("ReduceFunction 
of apply can not be a RichFunction.");
+               }
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               reduceFunction = 
input.getExecutionEnvironment().clean(reduceFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "WindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, K> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator =
+                                       new 
EvictingWindowOperator<>(windowAssigner,
+                                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                                       keySel,
+                                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                                       stateDesc,
+                                                       new 
InternalIterableProcessWindowFunction<>(new 
ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
+                                                       trigger,
+                                                       evictor,
+                                                       allowedLateness);
+
+               } else {
+                       ReducingStateDescriptor<T> stateDesc = new 
ReducingStateDescriptor<>("window-contents",
+                                       reduceFunction,
+                                       
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator =
+                                       new WindowOperator<>(windowAssigner,
+                                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                                       keySel,
+                                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                                       stateDesc,
+                                                       new 
InternalSingleValueProcessWindowFunction<>(function),
+                                                       trigger,
+                                                       allowedLateness);
+               }
+
+               return input.transform(opName, resultType, operator);
+       }
+
        // 
------------------------------------------------------------------------
        //  Fold Function
        // 
------------------------------------------------------------------------
@@ -510,6 +609,117 @@ public class WindowedStream<T, K, W extends Window> {
                return input.transform(opName, resultType, operator);
        }
 
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param windowFunction The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, 
FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> 
windowFunction) {
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction.");
+               }
+
+               TypeInformation<ACC> foldResultType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+                               Utils.getCallLocationName(), true);
+
+               TypeInformation<R> windowResultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                               windowFunction, ProcessWindowFunction.class, 
true, true, foldResultType, Utils.getCallLocationName(), false);
+
+               return fold(initialValue, foldFunction, windowFunction, 
foldResultType, windowResultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue the initial value to be passed to the first 
invocation of the fold function
+        * @param foldFunction The fold function.
+        * @param foldResultType The result type of the fold function.
+        * @param windowFunction The process window function.
+        * @param windowResultType The process window function result type.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @Internal
+       public <R, ACC> SingleOutputStreamOperator<R> fold(
+                       ACC initialValue,
+                       FoldFunction<T, ACC> foldFunction,
+                       ProcessWindowFunction<ACC, R, K, W> windowFunction,
+                       TypeInformation<ACC> foldResultType,
+                       TypeInformation<R> windowResultType) {
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction.");
+               }
+               if (windowAssigner instanceof MergingWindowAssigner) {
+                       throw new UnsupportedOperationException("Fold cannot be 
used with a merging WindowAssigner.");
+               }
+
+               //clean the closures
+               windowFunction = 
input.getExecutionEnvironment().clean(windowFunction);
+               foldFunction = 
input.getExecutionEnvironment().clean(foldFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "WindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, K> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator =
+                                       new 
EvictingWindowOperator<>(windowAssigner,
+                                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                                       keySel,
+                                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                                       stateDesc,
+                                                       new 
InternalIterableProcessWindowFunction<>(new 
FoldApplyProcessWindowFunction<>(initialValue, foldFunction, windowFunction, 
foldResultType)),
+                                                       trigger,
+                                                       evictor,
+                                                       allowedLateness);
+
+               } else {
+                       FoldingStateDescriptor<T, ACC> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
+                                       initialValue,
+                                       foldFunction,
+                                       
foldResultType.createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator =
+                                       new WindowOperator<>(windowAssigner,
+                                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                                       keySel,
+                                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                                       stateDesc,
+                                                       new 
InternalSingleValueProcessWindowFunction<>(windowFunction),
+                                                       trigger,
+                                                       allowedLateness);
+               }
+
+               return input.transform(opName, windowResultType, operator);
+       }
+
        // 
------------------------------------------------------------------------
        //  Aggregation Function
        // 
------------------------------------------------------------------------
@@ -733,11 +943,53 @@ public class WindowedStream<T, K, W extends Window> {
         * @return The data stream that is the result of applying the window 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, 
W> function, TypeInformation<R> resultType) {
-
-               //clean the closure
+               String callLocation = Utils.getCallLocationName();
                function = input.getExecutionEnvironment().clean(function);
+               return apply(new InternalIterableWindowFunction<>(function), 
resultType, callLocation);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Note that this function requires that all data in the windows is 
buffered until the window
+        * is evaluated, as the function provides no means of incremental 
aggregation.
+        *
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <R> SingleOutputStreamOperator<R> 
process(ProcessWindowFunction<T, R, K, W> function) {
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       function, ProcessWindowFunction.class, true, true, 
getInputType(), null, false);
+
+               return process(function, resultType);
+       }
 
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Note that this function requires that all data in the windows is 
buffered until the window
+        * is evaluated, as the function provides no means of incremental 
aggregation.
+        *
+        * @param function The window function.
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @Internal
+       public <R> SingleOutputStreamOperator<R> 
process(ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> 
resultType) {
                String callLocation = Utils.getCallLocationName();
+               function = input.getExecutionEnvironment().clean(function);
+               return apply(new 
InternalIterableProcessWindowFunction<>(function), resultType, callLocation);
+       }
+
+       private <R> SingleOutputStreamOperator<R> 
apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> 
resultType, String callLocation) {
+
                String udfName = "WindowedStream." + callLocation;
 
                SingleOutputStreamOperator<R> result = 
createFastTimeOperatorIfValid(function, resultType, udfName);
@@ -767,7 +1019,7 @@ public class WindowedStream<T, K, W extends Window> {
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                        stateDesc,
-                                       new 
InternalIterableWindowFunction<>(function),
+                                       function,
                                        trigger,
                                        evictor,
                                        allowedLateness);
@@ -784,7 +1036,7 @@ public class WindowedStream<T, K, W extends Window> {
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                        stateDesc,
-                                       new 
InternalIterableWindowFunction<>(function),
+                                       function,
                                        trigger,
                                        allowedLateness,
                                        legacyWindowOpType);
@@ -1211,7 +1463,7 @@ public class WindowedStream<T, K, W extends Window> {
        }
 
        private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
-                       Function function,
+                       ReduceFunction<?> function,
                        TypeInformation<R> resultType,
                        String functionName) {
 
@@ -1222,30 +1474,18 @@ public class WindowedStream<T, K, W extends Window> {
 
                        String opName = "Fast " + timeWindows + " of " + 
functionName;
 
-                       if (function instanceof ReduceFunction) {
-                               @SuppressWarnings("unchecked")
-                               ReduceFunction<T> reducer = (ReduceFunction<T>) 
function;
-
-                               @SuppressWarnings("unchecked")
-                               OneInputStreamOperator<T, R> op = 
(OneInputStreamOperator<T, R>)
-                                               new 
AggregatingProcessingTimeWindowOperator<>(
-                                                               reducer, 
input.getKeySelector(),
-                                                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-                                                               windowLength, 
windowSlide);
-                               return input.transform(opName, resultType, op);
-                       }
-                       else if (function instanceof WindowFunction) {
-                               @SuppressWarnings("unchecked")
-                               WindowFunction<T, R, K, TimeWindow> wf = 
(WindowFunction<T, R, K, TimeWindow>) function;
-
-                               OneInputStreamOperator<T, R> op = new 
AccumulatingProcessingTimeWindowOperator<>(
-                                               wf, input.getKeySelector(),
-                                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-                                               windowLength, windowSlide);
-                               return input.transform(opName, resultType, op);
-                       }
+                       @SuppressWarnings("unchecked")
+                       ReduceFunction<T> reducer = (ReduceFunction<T>) 
function;
+
+                       @SuppressWarnings("unchecked")
+                       OneInputStreamOperator<T, R> op = 
(OneInputStreamOperator<T, R>)
+                                       new 
AggregatingProcessingTimeWindowOperator<>(
+                                                       reducer, 
input.getKeySelector(),
+                                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                                       
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+                                                       windowLength, 
windowSlide);
+                       return input.transform(opName, resultType, op);
+
                } else if (windowAssigner.getClass() == 
TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == 
null) {
                        TumblingAlignedProcessingTimeWindows timeWindows = 
(TumblingAlignedProcessingTimeWindows) windowAssigner;
                        final long windowLength = timeWindows.getSize();
@@ -1253,36 +1493,69 @@ public class WindowedStream<T, K, W extends Window> {
 
                        String opName = "Fast " + timeWindows + " of " + 
functionName;
 
-                       if (function instanceof ReduceFunction) {
-                               @SuppressWarnings("unchecked")
-                               ReduceFunction<T> reducer = (ReduceFunction<T>) 
function;
-
-                               @SuppressWarnings("unchecked")
-                               OneInputStreamOperator<T, R> op = 
(OneInputStreamOperator<T, R>)
-                                               new 
AggregatingProcessingTimeWindowOperator<>(
-                                                               reducer,
-                                                               
input.getKeySelector(),
-                                                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-                                                               windowLength, 
windowSlide);
-                               return input.transform(opName, resultType, op);
-                       }
-                       else if (function instanceof WindowFunction) {
-                               @SuppressWarnings("unchecked")
-                               WindowFunction<T, R, K, TimeWindow> wf = 
(WindowFunction<T, R, K, TimeWindow>) function;
-
-                               OneInputStreamOperator<T, R> op = new 
AccumulatingProcessingTimeWindowOperator<>(
-                                               wf, input.getKeySelector(),
-                                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-                                               windowLength, windowSlide);
-                               return input.transform(opName, resultType, op);
-                       }
+                       @SuppressWarnings("unchecked")
+                       ReduceFunction<T> reducer = (ReduceFunction<T>) 
function;
+
+                       @SuppressWarnings("unchecked")
+                       OneInputStreamOperator<T, R> op = 
(OneInputStreamOperator<T, R>)
+                                       new 
AggregatingProcessingTimeWindowOperator<>(
+                                                       reducer,
+                                                       input.getKeySelector(),
+                                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                                       
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+                                                       windowLength, 
windowSlide);
+                       return input.transform(opName, resultType, op);
+               }
+
+               return null;
+       }
+
+       private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
+                       InternalWindowFunction<Iterable<T>, R, K, W> function,
+                       TypeInformation<R> resultType,
+                       String functionName) {
+
+               if (windowAssigner.getClass() == 
SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == 
null) {
+                       SlidingAlignedProcessingTimeWindows timeWindows = 
(SlidingAlignedProcessingTimeWindows) windowAssigner;
+                       final long windowLength = timeWindows.getSize();
+                       final long windowSlide = timeWindows.getSlide();
+
+                       String opName = "Fast " + timeWindows + " of " + 
functionName;
+
+                       @SuppressWarnings("unchecked")
+                       InternalWindowFunction<Iterable<T>, R, K, TimeWindow> 
timeWindowFunction =
+                                       (InternalWindowFunction<Iterable<T>, R, 
K, TimeWindow>) function;
+
+                       OneInputStreamOperator<T, R> op = new 
AccumulatingProcessingTimeWindowOperator<>(
+                                       timeWindowFunction, 
input.getKeySelector(),
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       windowLength, windowSlide);
+                       return input.transform(opName, resultType, op);
+               } else if (windowAssigner.getClass() == 
TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == 
null) {
+                       TumblingAlignedProcessingTimeWindows timeWindows = 
(TumblingAlignedProcessingTimeWindows) windowAssigner;
+                       final long windowLength = timeWindows.getSize();
+                       final long windowSlide = timeWindows.getSize();
+
+                       String opName = "Fast " + timeWindows + " of " + 
functionName;
+
+                       @SuppressWarnings("unchecked")
+                       InternalWindowFunction<Iterable<T>, R, K, TimeWindow> 
timeWindowFunction =
+                                       (InternalWindowFunction<Iterable<T>, R, 
K, TimeWindow>) function;
+
+
+                       OneInputStreamOperator<T, R> op = new 
AccumulatingProcessingTimeWindowOperator<>(
+                                       timeWindowFunction, 
input.getKeySelector(),
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       
input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       windowLength, windowSlide);
+                       return input.transform(opName, resultType, op);
                }
 
                return null;
        }
 
+
        public StreamExecutionEnvironment getExecutionEnvironment() {
                return input.getExecutionEnvironment();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
new file mode 100644
index 0000000..e1bc759
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+@Internal
+public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
+       extends RichProcessWindowFunction<T, R, K, W>
+       implements OutputTypeConfigurable<R> {
+       // Folds all elements of a window into one ACC value and passes that value to the wrapped ProcessWindowFunction.
+       private static final long serialVersionUID = 1L;
+
+       private final FoldFunction<T, ACC> foldFunction;
+       private final ProcessWindowFunction<ACC, R, K, W> windowFunction;
+
+       private byte[] serializedInitialValue; // written by setOutputType(), read back in open()
+       private TypeSerializer<ACC> accSerializer; // created in setOutputType() from accTypeInformation
+       private final TypeInformation<ACC> accTypeInformation;
+       private transient ACC initialValue; // transient: re-created in open() from serializedInitialValue
+
+       public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, 
ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, 
TypeInformation<ACC> accTypeInformation) {
+               this.windowFunction = windowFunction;
+               this.foldFunction = foldFunction;
+               this.initialValue = initialValue;
+               this.accTypeInformation = accTypeInformation;
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               FunctionUtils.openFunction(this.windowFunction, configuration);
+
+               if (serializedInitialValue == null) { // only setOutputType() assigns this field
+                       throw new RuntimeException("No initial value was 
serialized for the fold " +
+                               "window function. Probably the setOutputType 
method was not called.");
+               }
+
+               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
+               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
+               initialValue = accSerializer.deserialize(in); // restore the transient initial value from its bytes
+       }
+
+       @Override
+       public void close() throws Exception {
+               FunctionUtils.closeFunction(this.windowFunction);
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               super.setRuntimeContext(t);
+               // the wrapped function is handed the same runtime context as this wrapper
+               FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+       }
+
+       @Override
+       public void process(K key, final Context context, Iterable<T> values, 
Collector<R> out) throws Exception {
+               ACC result = accSerializer.copy(initialValue); // every call starts from a copy of the initial value
+
+               for (T val : values) {
+                       result = foldFunction.fold(result, val);
+               }
+
+               windowFunction.process(key, windowFunction.new Context() { // Context is an inner class of the wrapped function
+                       @Override
+                       public W window() {
+                               return context.window(); // expose the window of the outer context
+                       }
+               }, Collections.singletonList(result), out); // the wrapped function sees one element: the fold result
+       }
+
+       @Override
+       public void setOutputType(TypeInformation<R> outTypeInfo, 
ExecutionConfig executionConfig) {
+               accSerializer = 
accTypeInformation.createSerializer(executionConfig);
+               // keep the initial value as bytes, since the initialValue field itself is transient
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+
+               try {
+                       accSerializer.serialize(initialValue, out);
+               } catch (IOException ioe) {
+                       throw new RuntimeException("Unable to serialize initial 
value of type " +
+                               initialValue.getClass().getSimpleName() + " of 
fold window function.", ioe);
+               }
+
+               serializedInitialValue = baos.toByteArray();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
new file mode 100644
index 0000000..9c48e24
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base abstract class for functions that are evaluated over keyed (grouped) 
windows using a context
+ * for retrieving extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be 
applied on.
+ */
+@PublicEvolving
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> 
implements Function {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Evaluates the window and outputs none or several elements.
+        *
+        * @param key The key for which this window is evaluated.
+        * @param context The context in which the window is being evaluated.
+        * @param elements The elements in the window being evaluated.
+        * @param out A collector for emitting elements.
+        *
+        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
+        */
+       public abstract void process(KEY key, Context context, Iterable<IN> 
elements, Collector<OUT> out) throws Exception;
+
+       /**
+        * The context holding window metadata.
+        */
+       public abstract class Context { // inner (non-static) class: created as someFunction.new Context() { ... }
+               /**
+                * @return The window that is being evaluated.
+                */
+               public abstract W window();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
new file mode 100644
index 0000000..9ea1fdf
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
+       extends RichProcessWindowFunction<T, R, K, W> {
+       // Reduces all elements of a window to one value and passes that value to the wrapped ProcessWindowFunction.
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+       private final ProcessWindowFunction<T, R, K, W> windowFunction;
+
+       public ReduceApplyProcessWindowFunction(ReduceFunction<T> 
reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
+               this.windowFunction = windowFunction;
+               this.reduceFunction = reduceFunction;
+       }
+
+       @Override
+       public void process(K k, final Context context, Iterable<T> input, 
Collector<R> out) throws Exception {
+
+               T curr = null; // stays null when the input iterable yields no elements
+               for (T val: input) {
+                       if (curr == null) { // nothing accumulated yet: take the element as-is
+                               curr = val;
+                       } else {
+                               curr = reduceFunction.reduce(curr, val);
+                       }
+               }
+               windowFunction.process(k, windowFunction.new Context() { // Context is an inner class of the wrapped function
+                       @Override
+                       public W window() {
+                               return context.window(); // expose the window of the outer context
+                       }
+               }, Collections.singletonList(curr), out); // one-element list; the element is null if input was empty
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               FunctionUtils.openFunction(this.windowFunction, configuration);
+       }
+
+       @Override
+       public void close() throws Exception {
+               FunctionUtils.closeFunction(this.windowFunction);
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               super.setRuntimeContext(t);
+               // the wrapped function is handed the same runtime context as this wrapper
+               FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
new file mode 100644
index 0000000..ac55bc6
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Base rich abstract class for functions that are evaluated over keyed 
(grouped) windows using a context
+ * for passing extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be 
applied on.
+ */
+@PublicEvolving
+public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window>
+               extends ProcessWindowFunction<IN, OUT, KEY, W>
+               implements RichFunction {
+
+       private static final long serialVersionUID = 1L;
+
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Runtime context access
+       // 
--------------------------------------------------------------------------------------------
+
+       private transient RuntimeContext runtimeContext; // assigned only through setRuntimeContext(); null until then
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               this.runtimeContext = t;
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               if (this.runtimeContext != null) {
+                       return this.runtimeContext;
+               } else { // setRuntimeContext() has not been called (or was called with null)
+                       throw new IllegalStateException("The runtime context 
has not been initialized.");
+               }
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               if (this.runtimeContext == null) {
+                       throw new IllegalStateException("The runtime context 
has not been initialized.");
+               } else if (this.runtimeContext instanceof 
IterationRuntimeContext) {
+                       return (IterationRuntimeContext) this.runtimeContext;
+               } else { // a context is set, but it is not an IterationRuntimeContext
+                       throw new IllegalStateException("This stub is not part 
of an iteration step function.");
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Default life cycle methods
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration parameters) throws Exception {} // no-op default
+
+       @Override
+       public void close() throws Exception {} // no-op default
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index a252ece..87c5aca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -20,8 +20,8 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -36,7 +36,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = 
getListFactory();
 
-       private final WindowFunction<Type, Result, Key, Window> function;
+       private final InternalWindowFunction<Iterable<Type>, Result, Key, 
Window> function;
 
        /**
         * IMPORTANT: This value needs to start at one, so it is fresher than 
the value that new entries have (zero) */
@@ -44,7 +44,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
 
        // 
------------------------------------------------------------------------
        
-       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
WindowFunction<Type, Result, Key, Window> function) {
+       public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, 
InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
                this.keySelector = keySelector;
                this.function = function;
        }
@@ -59,7 +59,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
        }
 
        @Override
-       public void evaluateWindow(Collector<Result> out, TimeWindow window, 
+       public void evaluateWindow(Collector<Result> out, final TimeWindow 
window,
                                                                
AbstractStreamOperator<Result> operator) throws Exception
        {
                if (previousPanes.isEmpty()) {
@@ -86,7 +86,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
        
        static final class WindowFunctionTraversal<Key, Type, Result> 
implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-               private final WindowFunction<Type, Result, Key, Window> 
function;
+               private final InternalWindowFunction<Iterable<Type>, Result, 
Key, Window> function;
                
                private final UnionIterator<Type> unionIterator;
                
@@ -99,7 +99,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> 
extends AbstractKeyed
                private Key currentKey;
                
 
-               WindowFunctionTraversal(WindowFunction<Type, Result, Key, 
Window> function, TimeWindow window,
+               WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, 
Result, Key, Window> function, TimeWindow window,
                                                                
Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
                        this.function = function;
                        this.out = out;

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 7adaf13..094b34d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -23,22 +23,22 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.ArrayListSerializer;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 
 import java.util.ArrayList;
 
 @Internal
 @Deprecated
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
-               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
+               extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, ArrayList<IN>, InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow>> 
{
 
        private static final long serialVersionUID = 7305948082830843475L;
 
-
+       
        public AccumulatingProcessingTimeWindowOperator(
-                       WindowFunction<IN, OUT, KEY, TimeWindow> function,
+                       InternalWindowFunction<Iterable<IN>, OUT, KEY, 
TimeWindow> function,
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        TypeSerializer<IN> valueSerializer,
@@ -46,14 +46,14 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, 
IN, OUT>
                        long windowSlide)
        {
                super(function, keySelector, keySerializer,
-                               new ArrayListSerializer<IN>(valueSerializer), 
windowLength, windowSlide);
+                               new ArrayListSerializer<>(valueSerializer), 
windowLength, windowSlide);
        }
 
        @Override
        protected AccumulatingKeyedTimePanes<IN, KEY, OUT> 
createPanes(KeySelector<IN, KEY> keySelector, Function function) {
                @SuppressWarnings("unchecked")
-               WindowFunction<IN, OUT, KEY, Window> windowFunction = 
(WindowFunction<IN, OUT, KEY, Window>) function;
-
+               InternalWindowFunction<Iterable<IN>, OUT, KEY, Window> 
windowFunction = (InternalWindowFunction<Iterable<IN>, OUT, KEY, Window>) 
function;
+               
                return new AccumulatingKeyedTimePanes<>(keySelector, 
windowFunction);
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
new file mode 100644
index 0000000..de516a5
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that 
takes an {@code Iterable}
+ * when the window state also is an {@code Iterable}.
+ */
+public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W 
extends Window>
+               extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
+               implements InternalWindowFunction<Iterable<IN>, OUT, KEY, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, 
OUT, KEY, W> wrappedFunction) {
+               super(wrappedFunction);
+       }
+
+       @Override
+       public void apply(KEY key, final W window, Iterable<IN> input, 
Collector<OUT> out) throws Exception {
+               ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = 
this.wrappedFunction;
+               ProcessWindowFunction<IN, OUT, KEY, W>.Context context = 
wrappedFunction.new Context() {
+                       @Override
+                       public W window() {
+                               return window; // the window passed to apply()
+                       }
+               };
+               // the Iterable is handed to the wrapped function unchanged
+               wrappedFunction.process(key, context, input, out);
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               throw new RuntimeException("This should never be called."); // always throws
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new RuntimeException("This should never be called."); // always throws
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
new file mode 100644
index 0000000..b28c208
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes 
an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W 
extends Window>
+               extends WrappingFunction<ProcessWindowFunction<IN, OUT, KEY, W>>
+               implements InternalWindowFunction<IN, OUT, KEY, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       public 
InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> 
wrappedFunction) {
+               super(wrappedFunction);
+       }
+
+       @Override
+       public void apply(KEY key, final W window, IN input, Collector<OUT> 
out) throws Exception {
+               ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = 
this.wrappedFunction;
+               ProcessWindowFunction<IN, OUT, KEY, W>.Context context = 
wrappedFunction.new Context() {
+                       @Override
+                       public W window() {
+                               return window; // the window passed to apply()
+                       }
+               };
+               // the single input value is wrapped in a one-element list to satisfy the Iterable parameter
+               wrappedFunction.process(key, context, 
Collections.singletonList(input), out);
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               throw new RuntimeException("This should never be called."); // always throws
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new RuntimeException("This should never be called."); // always throws
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
new file mode 100644
index 0000000..af5c77a
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FoldApplyProcessWindowFunctionTest {
+
+       /**
+        * Tests that the FoldApplyProcessWindowFunction gets the output type serializer 
set by the
+        * StreamGraphGenerator and checks that the FoldApplyProcessWindowFunction computes 
the correct result.
+        */
+       @Test
+       public void testFoldWindowFunctionOutputTypeConfigurable() throws 
Exception{
+               StreamExecutionEnvironment env = new 
DummyStreamExecutionEnvironment();
+
+               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
+
+               int initValue = 1;
+
+               FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, 
Integer, Integer> foldWindowFunction = new FoldApplyProcessWindowFunction<>(
+                       initValue,
+                       new FoldFunction<Integer, Integer>() {
+                               @Override
+                               public Integer fold(Integer accumulator, 
Integer value) throws Exception {
+                                       return accumulator + value;
+                               }
+
+                       },
+                       new ProcessWindowFunction<Integer, Integer, Integer, 
TimeWindow>() {
+                               @Override
+                               public void process(Integer integer,
+                                                                       Context 
context,
+                                                                       
Iterable<Integer> input,
+                                                                       
Collector<Integer> out) throws Exception {
+                                       for (Integer in: input) {
+                                               out.collect(in);
+                                       }
+                               }
+                       },
+                       BasicTypeInfo.INT_TYPE_INFO
+               );
+
+               AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
+                       new 
InternalIterableProcessWindowFunction<>(foldWindowFunction),
+                       new KeySelector<Integer, Integer>() {
+                               private static final long serialVersionUID = 
-7951310554369722809L;
+
+                               @Override
+                               public Integer getKey(Integer value) throws 
Exception {
+                                       return value;
+                               }
+                       },
+                       IntSerializer.INSTANCE,
+                       IntSerializer.INSTANCE,
+                       3000,
+                       3000
+               );
+
+               SourceFunction<Integer> sourceFunction = new 
SourceFunction<Integer>(){
+
+                       private static final long serialVersionUID = 
8297735565464653028L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+
+                       }
+
+                       @Override
+                       public void cancel() {
+
+                       }
+               };
+
+               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
+
+               transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
+
+               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
+
+               List<Integer> result = new ArrayList<>();
+               List<Integer> input = new ArrayList<>();
+               List<Integer> expected = new ArrayList<>();
+
+               input.add(1);
+               input.add(2);
+               input.add(3);
+
+               for (int value : input) {
+                       initValue += value;
+               }
+
+               expected.add(initValue);
+
+               foldWindowFunction.process(0, foldWindowFunction.new Context() {
+                       @Override
+                       public TimeWindow window() {
+                               return new TimeWindow(0, 1);
+                       }
+               }, input, new ListCollector<>(result));
+
+               Assert.assertEquals(expected, result);
+       }
+
+       public static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
+
+               @Override
+               public JobExecutionResult execute(String jobName) throws 
Exception {
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index 91ec427..fecd440 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 import org.junit.Assert;
@@ -81,19 +82,20 @@ public class FoldApplyWindowFunctionTest {
                );
 
                AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
-                       foldWindowFunction,
-                       new KeySelector<Integer, Integer>() {
-                               private static final long serialVersionUID = 
-7951310554369722809L;
-
-                               @Override
-                               public Integer getKey(Integer value) throws 
Exception {
-                                       return value;
-                               }
-                       },
-                       IntSerializer.INSTANCE,
-                       IntSerializer.INSTANCE,
-                       3000,
-                       3000
+                       new InternalIterableWindowFunction<>(
+                                       foldWindowFunction),
+                               new KeySelector<Integer, Integer>() {
+                                       private static final long 
serialVersionUID = -7951310554369722809L;
+
+                                       @Override
+                                       public Integer getKey(Integer value) 
throws Exception {
+                                               return value;
+                                       }
+                               },
+                               IntSerializer.INSTANCE,
+                               IntSerializer.INSTANCE,
+                               3000,
+                               3000
                );
 
                SourceFunction<Integer> sourceFunction = new 
SourceFunction<Integer>(){

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index f3c3423..3c73035 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -25,12 +25,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
 import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.util.Collector;
 import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -115,7 +118,48 @@ public class InternalWindowFunctionTest {
                Collector<String> c = (Collector<String>) mock(Collector.class);
 
                windowFunction.apply(42L, w, i, c);
-               verify(mock).apply(42L, w, i, c);
+               verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testInternalIterableProcessWindowFunction() throws 
Exception {
+
+               ProcessWindowFunctionMock mock = 
mock(ProcessWindowFunctionMock.class);
+               InternalIterableProcessWindowFunction<Long, String, Long, 
TimeWindow> windowFunction =
+                               new 
InternalIterableProcessWindowFunction<>(mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               windowFunction.apply(42L, w, i, c);
+               verify(mock).process(eq(42L), 
(ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
                // check close
                windowFunction.close();
@@ -204,6 +248,59 @@ public class InternalWindowFunctionTest {
                verify(mock).close();
        }
 
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testInternalSingleValueProcessWindowFunction() throws 
Exception {
+
+               ProcessWindowFunctionMock mock = 
mock(ProcessWindowFunctionMock.class);
+               InternalSingleValueProcessWindowFunction<Long, String, Long, 
TimeWindow> windowFunction =
+                               new 
InternalSingleValueProcessWindowFunction<>(mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               windowFunction.apply(42L, w, 23L, c);
+               verify(mock).process(eq(42L), 
(ProcessWindowFunctionMock.Context) anyObject(), 
(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
+       public static class ProcessWindowFunctionMock
+               extends RichProcessWindowFunction<Long, String, Long, 
TimeWindow>
+               implements OutputTypeConfigurable<String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
+
+               @Override
+               public void process(Long aLong, Context context, Iterable<Long> 
input, Collector<String> out) throws Exception { }
+       }
+
        public static class WindowFunctionMock
                extends RichWindowFunction<Long, String, Long, TimeWindow>
                implements OutputTypeConfigurable<String> {
@@ -214,7 +311,7 @@ public class InternalWindowFunctionTest {
                public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
 
                @Override
-               public void apply(Long aLong, TimeWindow window, Iterable<Long> 
input, Collector<String> out) throws Exception { }
+               public void apply(Long aLong, TimeWindow w, Iterable<Long> 
input, Collector<String> out) throws Exception { }
        }
 
        public static class AllWindowFunctionMock

http://git-wip-us.apache.org/repos/asf/flink/blob/1dcb2dcd/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 255a20f..508d2e1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -36,8 +36,12 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -47,6 +51,9 @@ import org.apache.flink.util.Collector;
 
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -64,10 +71,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @SuppressWarnings({"serial"})
+@PrepareForTest(InternalIterableWindowFunction.class)
+@RunWith(PowerMockRunner.class)
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        @SuppressWarnings("unchecked")
-       private final WindowFunction<String, String, String, TimeWindow> 
mockFunction = mock(WindowFunction.class);
+       private final InternalIterableWindowFunction<String, String, String, 
TimeWindow> mockFunction = mock(InternalIterableWindowFunction.class);
 
        @SuppressWarnings("unchecked")
        private final KeySelector<String, String> mockKeySelector = 
mock(KeySelector.class);
@@ -79,26 +88,34 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                }
        };
        
-       private final WindowFunction<Integer, Integer, Integer, TimeWindow> 
validatingIdentityFunction =
-                       new WindowFunction<Integer, Integer, Integer, 
TimeWindow>()
-       {
-               @Override
-               public void apply(Integer key,
-                               TimeWindow window,
-                               Iterable<Integer> values,
-                               Collector<Integer> out) {
-                       for (Integer val : values) {
-                               assertEquals(key, val);
-                               out.collect(val);
-                       }
-               }
-       };
+       private final InternalIterableWindowFunction<Integer, Integer, Integer, 
TimeWindow> validatingIdentityFunction =
+                       new InternalIterableWindowFunction<>(new 
WindowFunction<Integer, Integer, Integer, TimeWindow>() {
+                               @Override
+                               public void apply(Integer key, TimeWindow 
window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+                                       for (Integer val : values) {
+                                               assertEquals(key, val);
+                                               out.collect(val);
+                                       }
+                               }
+                       });
+
+       private final InternalIterableProcessWindowFunction<Integer, Integer, 
Integer, TimeWindow> validatingIdentityProcessFunction =
+                       new InternalIterableProcessWindowFunction<>(new 
ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+                               @Override
+                               public void process(Integer key, Context 
context, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+                                       for (Integer val : values) {
+                                               assertEquals(key, val);
+                                               out.collect(val);
+                                       }
+                               }
+                       });
 
        // 
------------------------------------------------------------------------
 
        public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
                ClosureCleaner.clean(identitySelector, false);
                ClosureCleaner.clean(validatingIdentityFunction, false);
+               ClosureCleaner.clean(validatingIdentityProcessFunction, false);
        }
        
        // 
------------------------------------------------------------------------
@@ -281,6 +298,50 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        @Test
+       public void testTumblingWindowWithProcessFunction() throws Exception {
+               try {
+                       final int windowSize = 50;
+
+                       // tumbling window of windowSize (50) milliseconds
+                       AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                               new AccumulatingProcessingTimeWindowOperator<>(
+                                       validatingIdentityProcessFunction, 
identitySelector,
+                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
+                                       windowSize, windowSize);
+
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Integer, Integer> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
+
+                       testHarness.open();
+
+                       final int numElements = 1000;
+
+                       long currentTime = 0;
+
+                       for (int i = 0; i < numElements; i++) {
+                               testHarness.processElement(new 
StreamRecord<>(i));
+                               currentTime = currentTime + 10;
+                               testHarness.setProcessingTime(currentTime);
+                       }
+
+
+                       List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+                       assertEquals(numElements, result.size());
+
+                       Collections.sort(result);
+                       for (int i = 0; i < numElements; i++) {
+                               assertEquals(i, result.get(i).intValue());
+                       }
+
+                       testHarness.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
        public void testSlidingWindow() throws Exception {
 
                // tumbling window that triggers every 20 milliseconds
@@ -333,6 +394,58 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        @Test
+       public void testSlidingWindowWithProcessFunction() throws Exception {
+
+               // sliding window of 150 milliseconds that slides every 50 milliseconds
+               AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> op =
+                       new AccumulatingProcessingTimeWindowOperator<>(
+                               validatingIdentityProcessFunction, 
identitySelector,
+                               IntSerializer.INSTANCE, IntSerializer.INSTANCE, 
150, 50);
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
Integer> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(op, 
identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.open();
+
+               final int numElements = 1000;
+
+               long currentTime = 0;
+
+               for (int i = 0; i < numElements; i++) {
+                       testHarness.processElement(new StreamRecord<>(i));
+                       currentTime = currentTime + 10;
+                       testHarness.setProcessingTime(currentTime);
+               }
+
+               // get and verify the result
+               List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+               // if we kept this running, each element would be in the result 
three times (for each slide).
+               // we are closing the window before the final panes are through 
three times, so we may have fewer
+               // elements.
+               if (result.size() < numElements || result.size() > 3 * 
numElements) {
+                       fail("Wrong number of results: " + result.size());
+               }
+
+               Collections.sort(result);
+               int lastNum = -1;
+               int lastCount = -1;
+
+               for (int num : result) {
+                       if (num == lastNum) {
+                               lastCount++;
+                               assertTrue(lastCount <= 3);
+                       }
+                       else {
+                               lastNum = num;
+                               lastCount = 1;
+                       }
+               }
+
+               testHarness.close();
+       }
+
+       @Test
        public void testTumblingWindowSingleElements() throws Exception {
 
                try {
@@ -379,7 +492,55 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
        }
-       
+
+       @Test
+       public void testTumblingWindowSingleElementsWithProcessFunction() 
throws Exception {
+
+               try {
+
+                       // tumbling window of 50 milliseconds
+                       AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                               new AccumulatingProcessingTimeWindowOperator<>(
+                                       validatingIdentityProcessFunction, 
identitySelector,
+                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 50, 50);
+
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Integer, Integer> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
+
+                       testHarness.open();
+
+                       testHarness.setProcessingTime(0);
+
+                       testHarness.processElement(new StreamRecord<>(1));
+                       testHarness.processElement(new StreamRecord<>(2));
+
+                       testHarness.setProcessingTime(50);
+
+                       testHarness.processElement(new StreamRecord<>(3));
+                       testHarness.processElement(new StreamRecord<>(4));
+                       testHarness.processElement(new StreamRecord<>(5));
+
+                       testHarness.setProcessingTime(100);
+
+                       testHarness.processElement(new StreamRecord<>(6));
+
+                       testHarness.setProcessingTime(200);
+
+
+                       List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+                       assertEquals(6, result.size());
+
+                       Collections.sort(result);
+                       assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+
+                       testHarness.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
        @Test
        public void testSlidingWindowSingleElements() throws Exception {
                try {
@@ -420,6 +581,126 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        @Test
+       public void testSlidingWindowSingleElementsWithProcessFunction() throws 
Exception {
+               try {
+
+                       // sliding window (150 msecs) every 50 msecs
+                       AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                               new AccumulatingProcessingTimeWindowOperator<>(
+                                       validatingIdentityProcessFunction, 
identitySelector,
+                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 150, 50);
+
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Integer, Integer> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
+
+                       testHarness.setProcessingTime(0);
+
+                       testHarness.open();
+
+                       testHarness.processElement(new StreamRecord<>(1));
+                       testHarness.processElement(new StreamRecord<>(2));
+
+                       testHarness.setProcessingTime(50);
+                       testHarness.setProcessingTime(100);
+                       testHarness.setProcessingTime(150);
+
+                       List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+                       assertEquals(6, result.size());
+
+                       Collections.sort(result);
+                       assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
+
+                       testHarness.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void 
checkpointRestoreWithPendingWindowTumblingWithProcessFunction() {
+               try {
+                       final int windowSize = 200;
+
+                       // tumbling window that triggers every 200 milliseconds
+                       AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                               new AccumulatingProcessingTimeWindowOperator<>(
+                                       validatingIdentityProcessFunction, 
identitySelector,
+                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
+                                       windowSize, windowSize);
+
+                       OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
+                               new OneInputStreamOperatorTestHarness<>(op);
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       testHarness.setProcessingTime(0);
+
+                       // inject some elements
+                       final int numElementsFirst = 700;
+                       final int numElements = 1000;
+                       for (int i = 0; i < numElementsFirst; i++) {
+                               testHarness.processElement(new 
StreamRecord<>(i));
+                       }
+
+                       // draw a snapshot and dispose the window
+                       int beforeSnapShot = testHarness.getOutput().size();
+                       StreamStateHandle state = 
testHarness.snapshotLegacy(1L, System.currentTimeMillis());
+                       List<Integer> resultAtSnapshot = 
extractFromStreamRecords(testHarness.getOutput());
+                       int afterSnapShot = testHarness.getOutput().size();
+                       assertEquals("operator performed computation during 
snapshot", beforeSnapShot, afterSnapShot);
+                       assertTrue(afterSnapShot <= numElementsFirst);
+
+                       // inject some random elements, which should not show 
up in the state
+                       for (int i = 0; i < 300; i++) {
+                               testHarness.processElement(new StreamRecord<>(i 
+ numElementsFirst));
+                       }
+
+                       testHarness.close();
+                       op.dispose();
+
+                       // re-create the operator and restore the state
+                       op = new AccumulatingProcessingTimeWindowOperator<>(
+                               validatingIdentityProcessFunction, 
identitySelector,
+                               IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+                               windowSize, windowSize);
+
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op);
+
+                       testHarness.setup();
+                       testHarness.restore(state);
+                       testHarness.open();
+
+                       // inject some more elements
+                       for (int i = numElementsFirst; i < numElements; i++) {
+                               testHarness.processElement(new 
StreamRecord<>(i));
+                       }
+
+                       testHarness.setProcessingTime(400);
+
+                       // get and verify the result
+                       List<Integer> finalResult = new ArrayList<>();
+                       finalResult.addAll(resultAtSnapshot);
+                       List<Integer> finalPartialResult = 
extractFromStreamRecords(testHarness.getOutput());
+                       finalResult.addAll(finalPartialResult);
+                       assertEquals(numElements, finalResult.size());
+
+                       Collections.sort(finalResult);
+                       for (int i = 0; i < numElements; i++) {
+                               assertEquals(i, finalResult.get(i).intValue());
+                       }
+                       testHarness.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
        public void checkpointRestoreWithPendingWindowTumbling() {
                try {
                        final int windowSize = 200;
@@ -501,6 +782,98 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        @Test
+       public void 
checkpointRestoreWithPendingWindowSlidingWithProcessFunction() {
+               try {
+                       final int factor = 4;
+                       final int windowSlide = 50;
+                       final int windowSize = factor * windowSlide;
+
+                       // sliding window (200 msecs) every 50 msecs
+                       AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
+                               new AccumulatingProcessingTimeWindowOperator<>(
+                                       validatingIdentityProcessFunction, 
identitySelector,
+                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
+                                       windowSize, windowSlide);
+
+                       OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
+                               new OneInputStreamOperatorTestHarness<>(op);
+
+                       testHarness.setProcessingTime(0);
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       // inject some elements
+                       final int numElements = 1000;
+                       final int numElementsFirst = 700;
+
+                       for (int i = 0; i < numElementsFirst; i++) {
+                               testHarness.processElement(new 
StreamRecord<>(i));
+                       }
+
+                       // draw a snapshot
+                       List<Integer> resultAtSnapshot = 
extractFromStreamRecords(testHarness.getOutput());
+                       int beforeSnapShot = testHarness.getOutput().size();
+                       StreamStateHandle state = 
testHarness.snapshotLegacy(1L, System.currentTimeMillis());
+                       int afterSnapShot = testHarness.getOutput().size();
+                       assertEquals("operator performed computation during 
snapshot", beforeSnapShot, afterSnapShot);
+
+                       assertTrue(resultAtSnapshot.size() <= factor * 
numElementsFirst);
+
+                       // inject the remaining elements - these should not 
influence the snapshot
+                       for (int i = numElementsFirst; i < numElements; i++) {
+                               testHarness.processElement(new 
StreamRecord<>(i));
+                       }
+
+                       testHarness.close();
+
+                       // re-create the operator and restore the state
+                       op = new AccumulatingProcessingTimeWindowOperator<>(
+                               validatingIdentityProcessFunction, 
identitySelector,
+                               IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+                               windowSize, windowSlide);
+
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op);
+
+                       testHarness.setup();
+                       testHarness.restore(state);
+                       testHarness.open();
+
+
+                       // inject again the remaining elements
+                       for (int i = numElementsFirst; i < numElements; i++) {
+                               testHarness.processElement(new 
StreamRecord<>(i));
+                       }
+
+                       testHarness.setProcessingTime(50);
+                       testHarness.setProcessingTime(100);
+                       testHarness.setProcessingTime(150);
+                       testHarness.setProcessingTime(200);
+                       testHarness.setProcessingTime(250);
+                       testHarness.setProcessingTime(300);
+                       testHarness.setProcessingTime(350);
+
+                       // get and verify the result
+                       List<Integer> finalResult = new 
ArrayList<>(resultAtSnapshot);
+                       List<Integer> finalPartialResult = 
extractFromStreamRecords(testHarness.getOutput());
+                       finalResult.addAll(finalPartialResult);
+                       assertEquals(factor * numElements, finalResult.size());
+
+                       Collections.sort(finalResult);
+                       for (int i = 0; i < factor * numElements; i++) {
+                               assertEquals(i / factor, 
finalResult.get(i).intValue());
+                       }
+
+                       testHarness.close();
+                       op.dispose();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
        public void checkpointRestoreWithPendingWindowSliding() {
                try {
                        final int factor = 4;
@@ -601,8 +974,12 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
                                        new 
AccumulatingProcessingTimeWindowOperator<>(
-                                                       new StatefulFunction(), 
identitySelector,
-                                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 50, 50);
+                                                       new 
InternalIterableProcessWindowFunction<>(new StatefulFunction()),
+                                                       identitySelector,
+                                                       IntSerializer.INSTANCE,
+                                                       IntSerializer.INSTANCE,
+                                                       50,
+                                                       50);
 
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
                                        new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
@@ -661,7 +1038,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
 
-       private static class StatefulFunction extends 
RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
+       private static class StatefulFunction extends 
RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
 
                // we use a concurrent map here even though there is no 
concurrency, to
                // get "volatile" style access to entries
@@ -677,8 +1054,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                }
 
                @Override
-               public void apply(Integer key,
-                                                 TimeWindow window,
+               public void process(Integer key,
+                                                 Context context,
                                                  Iterable<Integer> values,
                                                  Collector<Integer> out) 
throws Exception {
                        for (Integer i : values) {

Reply via email to