[FLINK-5157] [streaming] Introduce ProcessAllWindowFunction

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/788b8392
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/788b8392
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/788b8392

Branch: refs/heads/master
Commit: 788b839213811c6f2407ac6d54fef28dfa3d29a6
Parents: 87b9077
Author: Ventura Del Monte <[email protected]>
Authored: Wed Feb 22 14:55:17 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Feb 28 14:02:56 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       | 398 ++++++++++++++++-
 .../FoldApplyProcessAllWindowFunction.java      | 120 +++++
 .../windowing/ProcessAllWindowFunction.java     |  59 +++
 .../ReduceApplyProcessAllWindowFunction.java    |  80 ++++
 .../windowing/RichProcessAllWindowFunction.java |  84 ++++
 ...ternalAggregateProcessAllWindowFunction.java |  83 ++++
 ...nternalIterableProcessAllWindowFunction.java |  63 +++
 ...rnalSingleValueProcessAllWindowFunction.java |  65 +++
 ...nternalSingleValueProcessWindowFunction.java |   3 +-
 .../FoldApplyProcessWindowFunctionTest.java     |  99 +++++
 .../operators/StateDescriptorPassingTest.java   |  19 +
 .../functions/InternalWindowFunctionTest.java   | 193 +++++++-
 .../windowing/AllWindowTranslationTest.java     | 445 +++++++++++++++++++
 .../streaming/api/scala/AllWindowedStream.scala | 186 +++++++-
 .../function/ProcessAllWindowFunction.scala     |  59 +++
 .../function/RichProcessAllWindowFunction.scala |  86 ++++
 .../ScalaProcessWindowFunctionWrapper.scala     |  85 +++-
 .../api/scala/AllWindowTranslationTest.scala    | 410 ++++++++++++++++-
 .../streaming/api/scala/WindowFoldITCase.scala  |  60 ++-
 .../api/scala/WindowFunctionITCase.scala        |  51 ++-
 .../api/scala/WindowReduceITCase.scala          |  59 ++-
 ...ngIdentityRichProcessAllWindowFunction.scala |  81 ++++
 .../streaming/runtime/WindowFoldITCase.java     |  73 +++
 23 files changed, 2830 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 742a2ed..a45cb0a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -40,9 +40,12 @@ import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import 
org.apache.flink.streaming.api.functions.windowing.AggregateApplyAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -52,8 +55,12 @@ import 
org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -287,6 +294,102 @@ public class AllWindowedStream<T, W extends Window> {
                return input.transform(opName, resultType, 
operator).forceNonParallel();
        }
 
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given reducer.
+        *
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
+        * @param function The process window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <R> SingleOutputStreamOperator<R> reduce(
+                       ReduceFunction<T> reduceFunction,
+                       ProcessAllWindowFunction<T, R, W> function) {
+
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       function, ProcessAllWindowFunction.class, true, true, 
input.getType(), null, false);
+
+               return reduce(reduceFunction, function, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given reducer.
+        *
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
+        * @param function The process window function.
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> 
reduceFunction, ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> 
resultType) {
+               if (reduceFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("ReduceFunction 
of reduce can not be a RichFunction.");
+               }
+
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               reduceFunction = 
input.getExecutionEnvironment().clean(reduceFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "AllWindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, Byte> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator =
+                               new EvictingWindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalIterableProcessAllWindowFunction<>(new 
ReduceApplyProcessAllWindowFunction<>(reduceFunction, function)),
+                                       trigger,
+                                       evictor,
+                                       allowedLateness);
+
+               } else {
+                       ReducingStateDescriptor<T> stateDesc = new 
ReducingStateDescriptor<>("window-contents",
+                               reduceFunction,
+                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator =
+                               new WindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalSingleValueProcessAllWindowFunction<>(function),
+                                       trigger,
+                                       allowedLateness);
+               }
+
+               return input.transform(opName, resultType, 
operator).forceNonParallel();
+       }
+
        // 
------------------------------------------------------------------------
        //  AggregateFunction
        // 
------------------------------------------------------------------------
@@ -483,6 +586,137 @@ public class AllWindowedStream<T, W extends Window> {
                return input.transform(opName, resultType, 
operator).forceNonParallel();
        }
 
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>Arriving data is incrementally aggregated using the given 
aggregate function. This means
+        * that the window function typically has only a single value to 
process when called.
+        *
+        * @param aggFunction The aggregate function that is used for 
incremental aggregation.
+        * @param windowFunction The process window function.
+        *
+        * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @param <ACC> The type of the AggregateFunction's accumulator
+        * @param <V> The type of AggregateFunction's result, and the 
WindowFunction's input
+        * @param <R> The type of the elements in the resulting stream, equal 
to the
+        *            WindowFunction's result type
+        */
+       @PublicEvolving
+       public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+                       AggregateFunction<T, ACC, V> aggFunction,
+                       ProcessAllWindowFunction<V, R, W> windowFunction) {
+
+               checkNotNull(aggFunction, "aggFunction");
+               checkNotNull(windowFunction, "windowFunction");
+
+               TypeInformation<ACC> accumulatorType = 
TypeExtractor.getAggregateFunctionAccumulatorType(
+                               aggFunction, input.getType(), null, false);
+
+               TypeInformation<V> aggResultType = 
TypeExtractor.getAggregateFunctionReturnType(
+                               aggFunction, input.getType(), null, false);
+
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                               windowFunction, ProcessAllWindowFunction.class, 
true, true, aggResultType, null, false);
+
+               return aggregate(aggFunction, windowFunction, accumulatorType, 
aggResultType, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>Arriving data is incrementally aggregated using the given 
aggregate function. This means
+        * that the window function typically has only a single value to 
process when called.
+        *
+        * @param aggregateFunction The aggregation function that is used for 
incremental aggregation.
+        * @param windowFunction The process window function.
+        * @param accumulatorType Type information for the internal accumulator 
type of the aggregation function
+        * @param resultType Type information for the result type of the window 
function
+        *
+        * @return The data stream that is the result of applying the window 
function to the window.
+        *
+        * @param <ACC> The type of the AggregateFunction's accumulator
+        * @param <V> The type of AggregateFunction's result, and the 
WindowFunction's input
+        * @param <R> The type of the elements in the resulting stream, equal 
to the
+        *            WindowFunction's result type
+        */
+       @PublicEvolving
+       public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+                       AggregateFunction<T, ACC, V> aggregateFunction,
+                       ProcessAllWindowFunction<V, R, W> windowFunction,
+                       TypeInformation<ACC> accumulatorType,
+                       TypeInformation<V> aggregateResultType,
+                       TypeInformation<R> resultType) {
+
+               checkNotNull(aggregateFunction, "aggregateFunction");
+               checkNotNull(windowFunction, "windowFunction");
+               checkNotNull(accumulatorType, "accumulatorType");
+               checkNotNull(aggregateResultType, "aggregateResultType");
+               checkNotNull(resultType, "resultType");
+
+               if (aggregateFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("This aggregate 
function cannot be a RichFunction.");
+               }
+
+               //clean the closures
+               windowFunction = 
input.getExecutionEnvironment().clean(windowFunction);
+               aggregateFunction = 
input.getExecutionEnvironment().clean(aggregateFunction);
+
+               final String callLocation = Utils.getCallLocationName();
+               final String udfName = "AllWindowedStream." + callLocation;
+
+               final String opName;
+               final KeySelector<T, Byte> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                                       (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(
+                                                       
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                                       new 
ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator = new EvictingWindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalAggregateProcessAllWindowFunction<>(aggregateFunction, windowFunction),
+                                       trigger,
+                                       evictor,
+                                       allowedLateness);
+
+               } else {
+                       AggregatingStateDescriptor<T, ACC, V> stateDesc = new 
AggregatingStateDescriptor<>(
+                                       "window-contents",
+                                       aggregateFunction,
+                                       
accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator = new WindowOperator<>(
+                                       windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalSingleValueProcessAllWindowFunction<>(windowFunction),
+                                       trigger,
+                                       allowedLateness);
+               }
+
+               return input.transform(opName, resultType, 
operator).forceNonParallel();
+       }
+
        // 
------------------------------------------------------------------------
        //  FoldFunction
        // 
------------------------------------------------------------------------
@@ -630,13 +864,119 @@ public class AllWindowedStream<T, W extends Window> {
                return input.transform(opName, resultType, 
operator).forceNonParallel();
        }
 
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param function The process window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, 
FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> 
function) {
+
+               TypeInformation<ACC> foldAccumulatorType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+                       Utils.getCallLocationName(), true);
+
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                       function, ProcessAllWindowFunction.class, true, true, 
foldAccumulatorType, null, false);
+
+               return fold(initialValue, foldFunction, function, 
foldAccumulatorType, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param function The process window function.
+        * @param foldAccumulatorType Type information for the result type of 
the fold function
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
+                       FoldFunction<T, ACC> foldFunction,
+                       ProcessAllWindowFunction<ACC, R, W> function,
+                       TypeInformation<ACC> foldAccumulatorType,
+                       TypeInformation<R> resultType) {
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
of fold can not be a RichFunction.");
+               }
+               if (windowAssigner instanceof MergingWindowAssigner) {
+                       throw new UnsupportedOperationException("Fold cannot be 
used with a merging WindowAssigner.");
+               }
+
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               foldFunction = 
input.getExecutionEnvironment().clean(foldFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "AllWindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, Byte> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               if (evictor != null) {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+                               (TypeSerializer<StreamRecord<T>>) new 
StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc =
+                               new ListStateDescriptor<>("window-contents", 
streamRecordSerializer);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator =
+                               new EvictingWindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalIterableProcessAllWindowFunction<>(new 
FoldApplyProcessAllWindowFunction<>(initialValue, foldFunction, function, 
foldAccumulatorType)),
+                                       trigger,
+                                       evictor,
+                                       allowedLateness);
+
+               } else {
+                       FoldingStateDescriptor<T, ACC> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
+                               initialValue, foldFunction, 
foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator =
+                               new WindowOperator<>(windowAssigner,
+                                       
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                                       keySel,
+                                       
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                                       stateDesc,
+                                       new 
InternalSingleValueProcessAllWindowFunction<>(function),
+                                       trigger,
+                                       allowedLateness);
+               }
+
+               return input.transform(opName, resultType, 
operator).forceNonParallel();
+       }
+
        // 
------------------------------------------------------------------------
        //  Apply (Window Function)
        // 
------------------------------------------------------------------------
 
        /**
         * Applies the given window function to each window. The window 
function is called for each
-        * evaluation of the window for each key individually. The output of 
the window function is
+        * evaluation of the window. The output of the window function is
         * interpreted as a regular non-windowed stream.
         *
         * <p>
@@ -647,15 +987,16 @@ public class AllWindowedStream<T, W extends Window> {
         * @return The data stream that is the result of applying the window 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, 
W> function) {
+               String callLocation = Utils.getCallLocationName();
+               function = input.getExecutionEnvironment().clean(function);
                TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
                                function, AllWindowFunction.class, true, true, 
getInputType(), null, false);
-
-               return apply(function, resultType);
+               return apply(new InternalIterableAllWindowFunction<>(function), 
resultType, callLocation);
        }
 
        /**
         * Applies the given window function to each window. The window 
function is called for each
-        * evaluation of the window for each key individually. The output of 
the window function is
+        * evaluation of the window. The output of the window function is
         * interpreted as a regular non-windowed stream.
         *
         * <p>
@@ -663,15 +1004,56 @@ public class AllWindowedStream<T, W extends Window> {
         * is evaluated, as the function provides no means of incremental 
aggregation.
         *
         * @param function The window function.
-        * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, 
W> function, TypeInformation<R> resultType) {
+               String callLocation = Utils.getCallLocationName();
+               function = input.getExecutionEnvironment().clean(function);
+               return apply(new InternalIterableAllWindowFunction<>(function), 
resultType, callLocation);
+       }
 
-               //clean the closure
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window. The output of the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Note that this function requires that all data in the windows is 
buffered until the window
+        * is evaluated, as the function provides no means of incremental 
aggregation.
+        *
+        * @param function The process window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <R> SingleOutputStreamOperator<R> 
process(ProcessAllWindowFunction<T, R, W> function) {
+               String callLocation = Utils.getCallLocationName();
                function = input.getExecutionEnvironment().clean(function);
+               TypeInformation<R> resultType = 
TypeExtractor.getUnaryOperatorReturnType(
+                               function, ProcessAllWindowFunction.class, true, 
true, getInputType(), null, false);
+               return apply(new 
InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
+       }
 
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window. The output of the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Note that this function requires that all data in the windows is 
buffered until the window
+        * is evaluated, as the function provides no means of incremental 
aggregation.
+        *
+        * @param function The process window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       @PublicEvolving
+       public <R> SingleOutputStreamOperator<R> 
process(ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> 
resultType) {
                String callLocation = Utils.getCallLocationName();
+               function = input.getExecutionEnvironment().clean(function);
+               return apply(new 
InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
+       }
+
+       private <R> SingleOutputStreamOperator<R> 
apply(InternalWindowFunction<Iterable<T>, R, Byte, W> function, 
TypeInformation<R> resultType, String callLocation) {
+
                String udfName = "AllWindowedStream." + callLocation;
 
                String opName;
@@ -695,7 +1077,7 @@ public class AllWindowedStream<T, W extends Window> {
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                        stateDesc,
-                                       new 
InternalIterableAllWindowFunction<>(function),
+                                       function,
                                        trigger,
                                        evictor,
                                        allowedLateness);
@@ -712,7 +1094,7 @@ public class AllWindowedStream<T, W extends Window> {
                                        keySel,
                                        
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                                        stateDesc,
-                                       new 
InternalIterableAllWindowFunction<>(function),
+                                       function,
                                        trigger,
                                        allowedLateness);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
new file mode 100644
index 0000000..5ac6766
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+@Internal
+public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
+       extends RichProcessAllWindowFunction<T, R, W>
+       implements OutputTypeConfigurable<R> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final FoldFunction<T, ACC> foldFunction;
+       private final ProcessAllWindowFunction<ACC, R, W> windowFunction;
+
+       private byte[] serializedInitialValue;
+       private TypeSerializer<ACC> accSerializer;
+       private final TypeInformation<ACC> accTypeInformation;
+       private transient ACC initialValue;
+
+       public FoldApplyProcessAllWindowFunction(ACC initialValue, 
FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> 
windowFunction, TypeInformation<ACC> accTypeInformation) {
+               this.windowFunction = windowFunction;
+               this.foldFunction = foldFunction;
+               this.initialValue = initialValue;
+               this.accTypeInformation = accTypeInformation;
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               FunctionUtils.openFunction(this.windowFunction, configuration);
+
+               if (serializedInitialValue == null) {
+                       throw new RuntimeException("No initial value was 
serialized for the fold " +
+                               "window function. Probably the setOutputType 
method was not called.");
+               }
+
+               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
+               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
+               initialValue = accSerializer.deserialize(in);
+       }
+
+       @Override
+       public void close() throws Exception {
+               FunctionUtils.closeFunction(this.windowFunction);
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               super.setRuntimeContext(t);
+
+               FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+       }
+
+       @Override
+       public void process(final Context context, Iterable<T> values, 
Collector<R> out) throws Exception {
+               ACC result = accSerializer.copy(initialValue);
+
+               for (T val : values) {
+                       result = foldFunction.fold(result, val);
+               }
+
+               windowFunction.process(windowFunction.new Context() {
+                       @Override
+                       public W window() {
+                               return context.window();
+                       }
+               }, Collections.singletonList(result), out);
+       }
+
+       @Override
+       public void setOutputType(TypeInformation<R> outTypeInfo, 
ExecutionConfig executionConfig) {
+               accSerializer = 
accTypeInformation.createSerializer(executionConfig);
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+
+               try {
+                       accSerializer.serialize(initialValue, out);
+               } catch (IOException ioe) {
+                       throw new RuntimeException("Unable to serialize initial 
value of type " +
+                               initialValue.getClass().getSimpleName() + " of 
fold window function.", ioe);
+               }
+
+               serializedInitialValue = baos.toByteArray();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
new file mode 100644
index 0000000..622e020
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base abstract class for functions that are evaluated over non-keyed windows 
using a context
+ * for retrieving extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of {@code Window} that this window function can be 
applied on.
+ */
+@PublicEvolving
+public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> 
implements Function {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Evaluates the window and outputs none or several elements.
+        *
+        * @param context The context in which the window is being evaluated.
+        * @param elements The elements in the window being evaluated.
+        * @param out A collector for emitting elements.
+        *
+        * @throws Exception The function may throw exceptions to fail the 
program and trigger recovery.
+        */
+       public abstract void process(Context context, Iterable<IN> elements, 
Collector<OUT> out) throws Exception;
+
+       /**
+        * The context holding window metadata.
+        */
+       public abstract class Context {
+               /**
+                * @return The window that is being evaluated.
+                */
+               public abstract W window();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
new file mode 100644
index 0000000..142c71e
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
+       extends RichProcessAllWindowFunction<T, R, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final ReduceFunction<T> reduceFunction;
+       private final ProcessAllWindowFunction<T, R, W> windowFunction;
+
+       public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> 
reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) {
+               this.windowFunction = windowFunction;
+               this.reduceFunction = reduceFunction;
+       }
+
+       @Override
+       public void process(final Context context, Iterable<T> input, 
Collector<R> out) throws Exception {
+
+               T curr = null;
+               for (T val: input) {
+                       if (curr == null) {
+                               curr = val;
+                       } else {
+                               curr = reduceFunction.reduce(curr, val);
+                       }
+               }
+               windowFunction.process(windowFunction.new Context() {
+                       @Override
+                       public W window() {
+                               return context.window();
+                       }
+               }, Collections.singletonList(curr), out);
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               FunctionUtils.openFunction(this.windowFunction, configuration);
+       }
+
+       @Override
+       public void close() throws Exception {
+               FunctionUtils.closeFunction(this.windowFunction);
+       }
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               super.setRuntimeContext(t);
+
+               FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
new file mode 100644
index 0000000..1130fa5
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Base rich abstract class for functions that are evaluated over non-keyed 
windows using a context
+ * for passing extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of {@code Window} that this window function can be 
applied on.
+ */
+@PublicEvolving
+public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window>
+               extends ProcessAllWindowFunction<IN, OUT, W>
+               implements RichFunction {
+
+       private static final long serialVersionUID = 1L;
+
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Runtime context access
+       // 
--------------------------------------------------------------------------------------------
+
+       private transient RuntimeContext runtimeContext;
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               this.runtimeContext = t;
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               if (this.runtimeContext != null) {
+                       return this.runtimeContext;
+               } else {
+                       throw new IllegalStateException("The runtime context 
has not been initialized.");
+               }
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               if (this.runtimeContext == null) {
+                       throw new IllegalStateException("The runtime context 
has not been initialized.");
+               } else if (this.runtimeContext instanceof 
IterationRuntimeContext) {
+                       return (IterationRuntimeContext) this.runtimeContext;
+               } else {
+                       throw new IllegalStateException("This stub is not part 
of an iteration step function.");
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Default life cycle methods
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration parameters) throws Exception {}
+
+       @Override
+       public void close() throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
new file mode 100644
index 0000000..9533c95
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function that aggregates the window contents with an 
{@link AggregateFunction}
+ * and passes the single result to the wrapped {@link ProcessAllWindowFunction} as an {@code Iterable}.
+ *
+ * @param <W> The window type
+ * @param <T> The type of the input to the AggregateFunction
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of the AggregateFunction's result, and the input to the 
WindowFunction
+ * @param <R> The result type of the WindowFunction
+ */
+public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W 
extends Window>
+               extends WrappingFunction<ProcessAllWindowFunction<V, R, W>>
+               implements InternalWindowFunction<Iterable<T>, R, Byte, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final AggregateFunction<T, ACC, V> aggFunction;
+
+       public InternalAggregateProcessAllWindowFunction(
+                       AggregateFunction<T, ACC, V> aggFunction,
+                       ProcessAllWindowFunction<V, R, W> windowFunction) {
+               super(windowFunction);
+               this.aggFunction = aggFunction;
+       }
+
+       @Override
+       public void apply(Byte key, final W window, Iterable<T> input, 
Collector<R> out) throws Exception {
+               ProcessAllWindowFunction<V, R, W> wrappedFunction = 
this.wrappedFunction;
+               ProcessAllWindowFunction<V, R, W>.Context context = 
wrappedFunction.new Context() {
+                       @Override
+                       public W window() {
+                               return window;
+                       }
+               };
+
+               final ACC acc = aggFunction.createAccumulator();
+
+               for (T val : input) {
+                       aggFunction.add(val, acc);
+               }
+
+               wrappedFunction.process(context, 
Collections.singletonList(aggFunction.getResult(acc)), out);
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
new file mode 100644
index 0000000..e33cc2a
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link ProcessAllWindowFunction} 
that takes an {@code Iterable}
+ * when the window state is also an {@code Iterable}.
+ */
+public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends 
Window>
+               extends WrappingFunction<ProcessAllWindowFunction<IN, OUT, W>>
+               implements InternalWindowFunction<Iterable<IN>, OUT, Byte, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       public 
InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> 
wrappedFunction) {
+               super(wrappedFunction);
+       }
+
+       @Override
+       public void apply(Byte key, final W window, Iterable<IN> input, 
Collector<OUT> out) throws Exception {
+               ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = 
this.wrappedFunction;
+               ProcessAllWindowFunction<IN, OUT, W>.Context context = 
wrappedFunction.new Context() {
+                       @Override
+                       public W window() {
+                               return window;
+                       }
+               };
+
+               wrappedFunction.process(context, input, out);
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
new file mode 100644
index 0000000..0284ef7
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessAllWindowFunction} 
that takes an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W 
extends Window>
+               extends WrappingFunction<ProcessAllWindowFunction<IN, OUT, W>>
+               implements InternalWindowFunction<IN, OUT, Byte, W> {
+
+       private static final long serialVersionUID = 1L;
+
+       public 
InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, 
W> wrappedFunction) {
+               super(wrappedFunction);
+       }
+
+       @Override
+       public void apply(Byte key, final W window, IN input, Collector<OUT> 
out) throws Exception {
+               ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = 
this.wrappedFunction;
+               ProcessAllWindowFunction<IN, OUT, W>.Context context = 
wrappedFunction.new Context() {
+                       @Override
+                       public W window() {
+                               return window;
+                       }
+               };
+
+               wrappedFunction.process(context, 
Collections.singletonList(input), out);
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new RuntimeException("This should never be called.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
index b28c208..7a4e8c6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -21,14 +21,13 @@ import 
org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 import java.util.Collections;
 
 /**
- * Internal window function for wrapping a {@link WindowFunction} that takes 
an {@code Iterable}
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that 
takes an {@code Iterable}
  * when the window state is a single value.
  */
 public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W 
extends Window>

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index af5c77a..734879d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -22,14 +22,17 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -39,6 +42,7 @@ import 
org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -145,6 +149,101 @@ public class FoldApplyProcessWindowFunctionTest {
                Assert.assertEquals(expected, result);
        }
 
+       /**
+        * Tests that the FoldApplyProcessAllWindowFunction gets the output type 
serializer set by the
+        * StreamGraphGenerator and checks that the FoldApplyProcessAllWindowFunction 
computes the correct result.
+        */
+       @Test
+       public void testFoldAllWindowFunctionOutputTypeConfigurable() throws 
Exception{
+               StreamExecutionEnvironment env = new 
DummyStreamExecutionEnvironment();
+
+               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
+
+               int initValue = 1;
+
+               FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, 
Integer> foldWindowFunction = new FoldApplyProcessAllWindowFunction<>(
+                       initValue,
+                       new FoldFunction<Integer, Integer>() {
+                               @Override
+                               public Integer fold(Integer accumulator, 
Integer value) throws Exception {
+                                       return accumulator + value;
+                               }
+
+                       },
+                       new ProcessAllWindowFunction<Integer, Integer, 
TimeWindow>() {
+                               @Override
+                               public void process(Context context,
+                                                                       
Iterable<Integer> input,
+                                                                       
Collector<Integer> out) throws Exception {
+                                       for (Integer in: input) {
+                                               out.collect(in);
+                                       }
+                               }
+                       },
+                       BasicTypeInfo.INT_TYPE_INFO
+               );
+
+               AccumulatingProcessingTimeWindowOperator<Byte, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
+                       new 
InternalIterableProcessAllWindowFunction<>(foldWindowFunction),
+                       new KeySelector<Integer, Byte>() {
+                               private static final long serialVersionUID = 
-7951310554369722809L;
+
+                               @Override
+                               public Byte getKey(Integer value) throws 
Exception {
+                                       return 0;
+                               }
+                       },
+                       ByteSerializer.INSTANCE,
+                       IntSerializer.INSTANCE,
+                       3000,
+                       3000
+               );
+
+               SourceFunction<Integer> sourceFunction = new 
SourceFunction<Integer>(){
+
+                       private static final long serialVersionUID = 
8297735565464653028L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+
+                       }
+
+                       @Override
+                       public void cancel() {
+
+                       }
+               };
+
+               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
+
+               transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
+
+               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
+
+               List<Integer> result = new ArrayList<>();
+               List<Integer> input = new ArrayList<>();
+               List<Integer> expected = new ArrayList<>();
+
+               input.add(1);
+               input.add(2);
+               input.add(3);
+
+               for (int value : input) {
+                       initValue += value;
+               }
+
+               expected.add(initValue);
+
+               foldWindowFunction.process(foldWindowFunction.new Context() {
+                       @Override
+                       public TimeWindow window() {
+                               return new TimeWindow(0, 1);
+                       }
+               }, input, new ListCollector<>(result));
+
+               Assert.assertEquals(expected, result);
+       }
+
        public static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 813ca96..f306231 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -162,6 +163,24 @@ public class StateDescriptorPassingTest {
        }
 
        @Test
+       public void testProcessAllWindowState() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, 
JavaSerializer.class);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .timeWindowAll(Time.milliseconds(1000))
+                               .process(new ProcessAllWindowFunction<File, 
String, TimeWindow>() {
+                                       @Override
+                                       public void process(Context ctx, 
Iterable<File> input, Collector<String> out) {}
+                               });
+
+               validateListStateDescriptorConfigured(result);
+       }
+
+       @Test
        public void testFoldWindowAllState() throws Exception {
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index e49a496..8f795e9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -26,15 +26,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
 import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.util.Collector;
@@ -99,6 +103,47 @@ public class InternalWindowFunctionTest {
 
        @SuppressWarnings("unchecked")
        @Test
+       public void testInternalIterableProcessAllWindowFunction() throws 
Exception {
+
+               ProcessAllWindowFunctionMock mock = 
mock(ProcessAllWindowFunctionMock.class);
+               InternalIterableProcessAllWindowFunction<Long, String, 
TimeWindow> windowFunction =
+                       new InternalIterableProcessAllWindowFunction<>(mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               windowFunction.apply(((byte)0), w, i, c);
+               verify(mock).process((ProcessAllWindowFunctionMock.Context) 
anyObject(), eq(i), eq(c));
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Test
        public void testInternalIterableWindowFunction() throws Exception {
 
                WindowFunctionMock mock = mock(WindowFunctionMock.class);
@@ -263,6 +308,47 @@ public class InternalWindowFunctionTest {
 
        @SuppressWarnings("unchecked")
        @Test
+       public void testInternalSingleValueProcessAllWindowFunction() throws 
Exception {
+
+               ProcessAllWindowFunctionMock mock = 
mock(ProcessAllWindowFunctionMock.class);
+               InternalSingleValueProcessAllWindowFunction<Long, String, 
TimeWindow> windowFunction =
+                       new InternalSingleValueProcessAllWindowFunction<>(mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
+
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               windowFunction.apply(((byte)0), w, 23L, c);
+               verify(mock).process((ProcessAllWindowFunctionMock.Context) 
anyObject(), 
(Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Test
        public void testInternalSingleValueProcessWindowFunction() throws 
Exception {
 
                ProcessWindowFunctionMock mock = 
mock(ProcessWindowFunctionMock.class);
@@ -310,7 +396,7 @@ public class InternalWindowFunctionTest {
                InternalAggregateProcessWindowFunction<Long, Set<Long>, 
Map<Long, Long>, String, Long, TimeWindow> windowFunction =
                                new 
InternalAggregateProcessWindowFunction<>(new AggregateFunction<Long, Set<Long>, 
Map<Long, Long>>() {
                                        private static final long 
serialVersionUID = 1L;
-                                       
+
                                        @Override
                                        public Set<Long> createAccumulator() {
                                                return new HashSet<>();
@@ -364,7 +450,7 @@ public class InternalWindowFunctionTest {
                List<Long> args = new LinkedList<>();
                args.add(23L);
                args.add(24L);
-               
+
                windowFunction.apply(42L, w, args, c);
                verify(mock).process(
                                eq(42L),
@@ -379,6 +465,83 @@ public class InternalWindowFunctionTest {
                verify(mock).close();
        }
 
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testInternalAggregateProcessAllWindowFunction() throws 
Exception {
+
+               AggregateProcessAllWindowFunctionMock mock = 
mock(AggregateProcessAllWindowFunctionMock.class);
+
+               InternalAggregateProcessAllWindowFunction<Long, Set<Long>, 
Map<Long, Long>, String, TimeWindow> windowFunction =
+                               new 
InternalAggregateProcessAllWindowFunction<>(new AggregateFunction<Long, 
Set<Long>, Map<Long, Long>>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public Set<Long> createAccumulator() {
+                                               return new HashSet<>();
+                                       }
+
+                                       @Override
+                                       public void add(Long value, Set<Long> 
accumulator) {
+                                               accumulator.add(value);
+                                       }
+
+                                       @Override
+                                       public Map<Long, Long> 
getResult(Set<Long> accumulator) {
+                                               Map<Long, Long> result = new 
HashMap<>();
+                                               for (Long in : accumulator) {
+                                                       result.put(in, in);
+                                               }
+                                               return result;
+                                       }
+
+                                       @Override
+                                       public Set<Long> merge(Set<Long> a, 
Set<Long> b) {
+                                               a.addAll(b);
+                                               return a;
+                                       }
+                               }, mock);
+
+               // check setOutputType
+               TypeInformation<String> stringType = 
BasicTypeInfo.STRING_TYPE_INFO;
+               ExecutionConfig execConf = new ExecutionConfig();
+               execConf.setParallelism(42);
+
+               StreamingFunctionUtils.setOutputType(windowFunction, 
stringType, execConf);
+               verify(mock).setOutputType(stringType, execConf);
+
+               // check open
+               Configuration config = new Configuration();
+
+               windowFunction.open(config);
+               verify(mock).open(config);
+
+               // check setRuntimeContext
+               RuntimeContext rCtx = mock(RuntimeContext.class);
+
+               windowFunction.setRuntimeContext(rCtx);
+               verify(mock).setRuntimeContext(rCtx);
+
+               // check apply
+               TimeWindow w = mock(TimeWindow.class);
+               Collector<String> c = (Collector<String>) mock(Collector.class);
+
+               List<Long> args = new LinkedList<>();
+               args.add(23L);
+               args.add(24L);
+
+               windowFunction.apply(((byte)0), w, args, c);
+               verify(mock).process(
+                               (AggregateProcessAllWindowFunctionMock.Context) 
anyObject(),
+                               (Iterable) argThat(containsInAnyOrder(allOf(
+                                               hasEntry(is(23L), is(23L)),
+                                               hasEntry(is(24L), is(24L))))),
+                               eq(c));
+
+               // check close
+               windowFunction.close();
+               verify(mock).close();
+       }
+
        public static class ProcessWindowFunctionMock
                extends RichProcessWindowFunction<Long, String, Long, 
TimeWindow>
                implements OutputTypeConfigurable<String> {
@@ -405,6 +568,19 @@ public class InternalWindowFunctionTest {
                public void process(Long aLong, Context context, 
Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
        }
 
+       public static class AggregateProcessAllWindowFunctionMock
+                       extends RichProcessAllWindowFunction<Map<Long, Long>, 
String, TimeWindow>
+                       implements OutputTypeConfigurable<String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
+
+               @Override
+               public void process(Context context, Iterable<Map<Long, Long>> 
input, Collector<String> out) throws Exception { }
+       }
+
        public static class WindowFunctionMock
                extends RichWindowFunction<Long, String, Long, TimeWindow>
                implements OutputTypeConfigurable<String> {
@@ -430,4 +606,17 @@ public class InternalWindowFunctionTest {
                @Override
                public void apply(TimeWindow window, Iterable<Long> values, 
Collector<String> out) throws Exception { }
        }
+
+       public static class ProcessAllWindowFunctionMock
+               extends RichProcessAllWindowFunction<Long, String, TimeWindow>
+               implements OutputTypeConfigurable<String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void setOutputType(TypeInformation<String> outTypeInfo, 
ExecutionConfig executionConfig) { }
+
+               @Override
+               public void process(Context context, Iterable<Long> input, 
Collector<String> out) throws Exception { }
+       }
 }

Reply via email to