http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 88e619a..0d01733 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -36,10 +37,9 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
@@ -71,7 +71,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
  * If an {@link Evictor} is specified it will be used to evict elements from 
the window after
  * evaluation was triggered by the {@code Trigger} but before the actual 
evaluation of the window.
  * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
+ * incremental aggregation of window results cannot be used.
  *
  * <p>
  * Note that the {@code WindowedStream} is purely and API construct, during 
runtime
@@ -120,7 +120,7 @@ public class WindowedStream<T, K, W extends Window> {
         *
         * <p>
         * Note: When using an evictor window performance will degrade 
significantly, since
-        * pre-aggregation of window results cannot be used.
+        * incremental aggregation of window results cannot be used.
         */
        @PublicEvolving
        public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> 
evictor) {
@@ -137,13 +137,14 @@ public class WindowedStream<T, K, W extends Window> {
         * Applies a reduce function to the window. The window function is 
called for each evaluation
         * of the window for each key individually. The output of the reduce 
function is interpreted
         * as a regular non-windowed stream.
+        *
         * <p>
-        * This window will try and pre-aggregate data as much as the window 
policies permit. For example,
-        * tumbling time windows can perfectly pre-aggregate the data, meaning 
that only one element per
-        * key is stored. Sliding time windows will pre-aggregate on the 
granularity of the slide interval,
+        * This window will try and incrementally aggregate data as much as the 
window policies permit.
+        * For example, tumbling time windows can aggregate the data, meaning 
that only one element per
+        * key is stored. Sliding time windows will aggregate on the 
granularity of the slide interval,
         * so a few elements are stored per key (one per slide interval).
-        * Custom windows may not be able to pre-aggregate, or may need to 
store extra values in an
-        * aggregation tree.
+        * Custom windows may not be able to incrementally aggregate, or may 
need to store extra values
+        * in an aggregation tree.
         * 
         * @param function The reduce function.
         * @return The data stream that is the result of applying the reduce 
function to the window. 
@@ -159,48 +160,14 @@ public class WindowedStream<T, K, W extends Window> {
                function = input.getExecutionEnvironment().clean(function);
 
                String callLocation = Utils.getCallLocationName();
-               String udfName = "Reduce at " + callLocation;
+               String udfName = "WindowedStream." + callLocation;
 
                SingleOutputStreamOperator<T, ?> result = 
createFastTimeOperatorIfValid(function, input.getType(), udfName);
                if (result != null) {
                        return result;
                }
 
-               String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
-               KeySelector<T, K> keySel = input.getKeySelector();
-
-               OneInputStreamOperator<T, T> operator;
-
-               boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
-
-               if (evictor != null) {
-                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
-                               new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
-
-                       operator = new EvictingWindowOperator<>(windowAssigner,
-                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                               keySel,
-                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                               stateDesc,
-                               new ReduceIterableWindowFunction<K, W, 
T>(function),
-                               trigger,
-                               
evictor).enableSetProcessingTime(setProcessingTime);
-
-               } else {
-                       ReducingStateDescriptor<T> stateDesc = new 
ReducingStateDescriptor<>("window-contents",
-                               function,
-                               
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
-
-                       operator = new WindowOperator<>(windowAssigner,
-                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                               keySel,
-                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-                               stateDesc,
-                               new ReduceWindowFunction<K, W, T>(),
-                               
trigger).enableSetProcessingTime(setProcessingTime);
-               }
-
-               return input.transform(opName, input.getType(), operator);
+               return apply(function, new PassThroughWindowFunction<K, W, 
T>());
        }
 
        /**
@@ -212,13 +179,15 @@ public class WindowedStream<T, K, W extends Window> {
         * @return The data stream that is the result of applying the fold 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, 
FoldFunction<T, R> function) {
-               //clean the closure
-               function = input.getExecutionEnvironment().clean(function);
+               if (function instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction. " +
+                               "Please use apply(FoldFunction, WindowFunction) 
instead.");
+               }
 
                TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(function, input.getType(),
                                Utils.getCallLocationName(), true);
 
-               return apply(new FoldWindowFunction<K, W, T, R>(initialValue, 
function), resultType);
+               return fold(initialValue, function, resultType);
        }
 
        /**
@@ -230,9 +199,12 @@ public class WindowedStream<T, K, W extends Window> {
         * @return The data stream that is the result of applying the fold 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, 
FoldFunction<T, R> function, TypeInformation<R> resultType) {
-               //clean the closure
-               function = input.getExecutionEnvironment().clean(function);
-               return apply(new FoldWindowFunction<K, W, T, R>(initialValue, 
function), resultType);
+               if (function instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
can not be a RichFunction. " +
+                               "Please use apply(FoldFunction, WindowFunction) 
instead.");
+               }
+
+               return apply(initialValue, function, new 
PassThroughWindowFunction<K, W, R>(), resultType);
        }
 
        /**
@@ -242,7 +214,7 @@ public class WindowedStream<T, K, W extends Window> {
         *
         * <p>
         * Not that this function requires that all data in the windows is 
buffered until the window
-        * is evaluated, as the function provides no means of pre-aggregation.
+        * is evaluated, as the function provides no means of incremental 
aggregation.
         * 
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
@@ -263,7 +235,7 @@ public class WindowedStream<T, K, W extends Window> {
         *
         * <p>
         * Not that this function requires that all data in the windows is 
buffered until the window
-        * is evaluated, as the function provides no means of pre-aggregation.
+        * is evaluated, as the function provides no means of incremental 
aggregation.
         *
         * @param function The window function.
         * @param resultType Type information for the result type of the window 
function
@@ -275,7 +247,7 @@ public class WindowedStream<T, K, W extends Window> {
                function = input.getExecutionEnvironment().clean(function);
 
                String callLocation = Utils.getCallLocationName();
-               String udfName = "WindowApply at " + callLocation;
+               String udfName = "WindowedStream." + callLocation;
 
                SingleOutputStreamOperator<R, ?> result = 
createFastTimeOperatorIfValid(function, resultType, udfName);
                if (result != null) {
@@ -283,18 +255,19 @@ public class WindowedStream<T, K, W extends Window> {
                }
 
 
-               String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
+               String opName;
                KeySelector<T, K> keySel = input.getKeySelector();
 
                WindowOperator<K, T, Iterable<T>, R, W> operator;
 
                boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
 
-
                if (evictor != null) {
                        ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
                                new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
 
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
                        operator = new EvictingWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        keySel,
@@ -308,6 +281,8 @@ public class WindowedStream<T, K, W extends Window> {
                        ListStateDescriptor<T> stateDesc = new 
ListStateDescriptor<>("window-contents",
                                
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
 
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
                        operator = new WindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        keySel,
@@ -326,9 +301,9 @@ public class WindowedStream<T, K, W extends Window> {
         * interpreted as a regular non-windowed stream.
         *
         * <p>
-        * Arriving data is pre-aggregated using the given pre-aggregation 
reducer.
+        * Arriving data is incrementally aggregated using the given reducer.
         *
-        * @param reduceFunction The reduce function that is used for 
pre-aggregation
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
         * @param function The window function.
         * @return The data stream that is the result of applying the window 
function to the window.
         */
@@ -347,16 +322,16 @@ public class WindowedStream<T, K, W extends Window> {
         * interpreted as a regular non-windowed stream.
         *
         * <p>
-        * Arriving data is pre-aggregated using the given pre-aggregation 
reducer.
+        * Arriving data is incrementally aggregated using the given reducer.
         *
-        * @param reduceFunction The reduce function that is used for 
pre-aggregation
+        * @param reduceFunction The reduce function that is used for 
incremental aggregation.
         * @param function The window function.
         * @param resultType Type information for the result type of the window 
function
         * @return The data stream that is the result of applying the window 
function to the window.
         */
        public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> 
reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> 
resultType) {
                if (reduceFunction instanceof RichFunction) {
-                       throw new UnsupportedOperationException("Pre-aggregator 
of apply can not be a RichFunction.");
+                       throw new UnsupportedOperationException("ReduceFunction 
of apply can not be a RichFunction.");
                }
 
                //clean the closures
@@ -364,9 +339,9 @@ public class WindowedStream<T, K, W extends Window> {
                reduceFunction = 
input.getExecutionEnvironment().clean(reduceFunction);
 
                String callLocation = Utils.getCallLocationName();
-               String udfName = "WindowApply at " + callLocation;
+               String udfName = "WindowedStream." + callLocation;
 
-               String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
+               String opName;
                KeySelector<T, K> keySel = input.getKeySelector();
 
                OneInputStreamOperator<T, R> operator;
@@ -374,10 +349,11 @@ public class WindowedStream<T, K, W extends Window> {
                boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
 
                if (evictor != null) {
-
                        ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
                                new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
 
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
                        operator = new EvictingWindowOperator<>(windowAssigner,
                                
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                keySel,
@@ -392,6 +368,8 @@ public class WindowedStream<T, K, W extends Window> {
                                reduceFunction,
                                
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
 
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
                        operator = new WindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                                        keySel,
@@ -404,6 +382,96 @@ public class WindowedStream<T, K, W extends Window> {
                return input.transform(opName, resultType, operator);
        }
 
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param function The window function.
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, 
FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function) {
+
+               TypeInformation<R> resultType = 
TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+                       Utils.getCallLocationName(), true);
+
+               return apply(initialValue, foldFunction, function, resultType);
+       }
+
+       /**
+        * Applies the given window function to each window. The window 
function is called for each
+        * evaluation of the window for each key individually. The output of 
the window function is
+        * interpreted as a regular non-windowed stream.
+        *
+        * <p>
+        * Arriving data is incrementally aggregated using the given fold 
function.
+        *
+        * @param initialValue The initial value of the fold.
+        * @param foldFunction The fold function that is used for incremental 
aggregation.
+        * @param function The window function.
+        * @param resultType Type information for the result type of the window 
function
+        * @return The data stream that is the result of applying the window 
function to the window.
+        */
+       public <R> SingleOutputStreamOperator<R, ?> apply(R initialValue, 
FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, 
TypeInformation<R> resultType) {
+               if (foldFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("FoldFunction 
of apply can not be a RichFunction.");
+               }
+
+               //clean the closures
+               function = input.getExecutionEnvironment().clean(function);
+               foldFunction = 
input.getExecutionEnvironment().clean(foldFunction);
+
+               String callLocation = Utils.getCallLocationName();
+               String udfName = "WindowedStream." + callLocation;
+
+               String opName;
+               KeySelector<T, K> keySel = input.getKeySelector();
+
+               OneInputStreamOperator<T, R> operator;
+
+               boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
+
+               if (evictor != null) {
+
+                       ListStateDescriptor<StreamRecord<T>> stateDesc = new 
ListStateDescriptor<>("window-contents",
+                               new 
StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig())));
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+                       operator = new EvictingWindowOperator<>(windowAssigner,
+                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               keySel,
+                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                               stateDesc,
+                               new FoldApplyWindowFunction<>(initialValue, 
foldFunction, function),
+                               trigger,
+                               
evictor).enableSetProcessingTime(setProcessingTime);
+
+               } else {
+                       FoldingStateDescriptor<T, R> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
+                               initialValue,
+                               foldFunction,
+                               resultType);
+
+                       opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
+
+                       operator = new WindowOperator<>(windowAssigner,
+                               
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+                               keySel,
+                               
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+                               stateDesc,
+                               function,
+                               
trigger).enableSetProcessingTime(setProcessingTime);
+               }
+
+               return input.transform(opName, resultType, operator);
+       }
+
        // 
------------------------------------------------------------------------
        //  Aggregations on the keyed windows
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
deleted file mode 100644
index 46f9b3c..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-public class FoldAllWindowFunction<W extends Window, T, R>
-               extends WrappingFunction<FoldFunction<T, R>>
-               implements AllWindowFunction<Iterable<T>, R, W>, 
OutputTypeConfigurable<R> {
-       private static final long serialVersionUID = 1L;
-
-       private byte[] serializedInitialValue;
-       private TypeSerializer<R> outSerializer;
-       private transient R initialValue;
-
-       public FoldAllWindowFunction(R initialValue, FoldFunction<T, R> 
reduceFunction) {
-               super(reduceFunction);
-               this.initialValue = initialValue;
-       }
-
-       @Override
-       public void open(Configuration configuration) throws Exception {
-               super.open(configuration);
-
-               if (serializedInitialValue == null) {
-                       throw new RuntimeException("No initial value was 
serialized for the fold " +
-                                       "window function. Probably the 
setOutputType method was not called.");
-               }
-
-               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
-               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
-               initialValue = outSerializer.deserialize(in);
-       }
-
-       @Override
-       public void apply(W window, Iterable<T> values, Collector<R> out) 
throws Exception {
-               R result = outSerializer.copy(initialValue);
-
-               for (T val: values) {
-                       result = wrappedFunction.fold(result, val);
-               }
-
-               out.collect(result);
-       }
-
-       @Override
-       public void setOutputType(TypeInformation<R> outTypeInfo, 
ExecutionConfig executionConfig) {
-               outSerializer = outTypeInfo.createSerializer(executionConfig);
-
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
-
-               try {
-                       outSerializer.serialize(initialValue, out);
-               } catch (IOException ioe) {
-                       throw new RuntimeException("Unable to serialize initial 
value of type " +
-                                       initialValue.getClass().getSimpleName() 
+ " of fold window function.", ioe);
-               }
-
-               serializedInitialValue = baos.toByteArray();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
new file mode 100644
index 0000000..7828a23
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class FoldApplyAllWindowFunction<W extends Window, T, ACC>
+       extends WrappingFunction<AllWindowFunction<ACC, ACC, W>>
+       implements AllWindowFunction<Iterable<T>, ACC, W>, 
OutputTypeConfigurable<ACC> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final FoldFunction<T, ACC> foldFunction;
+
+       private byte[] serializedInitialValue;
+       private TypeSerializer<ACC> accSerializer;
+       private transient ACC initialValue;
+
+       public FoldApplyAllWindowFunction(ACC initialValue, FoldFunction<T, 
ACC> foldFunction, AllWindowFunction<ACC, ACC, W> windowFunction) {
+               super(windowFunction);
+               this.foldFunction = foldFunction;
+               this.initialValue = initialValue;
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               super.open(configuration);
+
+               if (serializedInitialValue == null) {
+                       throw new RuntimeException("No initial value was 
serialized for the fold " +
+                               "window function. Probably the setOutputType 
method was not called.");
+               }
+
+               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
+               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
+               initialValue = accSerializer.deserialize(in);
+       }
+
+       @Override
+       public void apply(W window, Iterable<T> values, Collector<ACC> out) 
throws Exception {
+               ACC result = accSerializer.copy(initialValue);
+
+               for (T val: values) {
+                       result = foldFunction.fold(result, val);
+               }
+
+               wrappedFunction.apply(window, result, out);
+       }
+
+       @Override
+       public void setOutputType(TypeInformation<ACC> outTypeInfo, 
ExecutionConfig executionConfig) {
+               accSerializer = outTypeInfo.createSerializer(executionConfig);
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+
+               try {
+                       accSerializer.serialize(initialValue, out);
+               } catch (IOException ioe) {
+                       throw new RuntimeException("Unable to serialize initial 
value of type " +
+                               initialValue.getClass().getSimpleName() + " of 
fold window function.", ioe);
+               }
+
+               serializedInitialValue = baos.toByteArray();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
new file mode 100644
index 0000000..94356dc
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class FoldApplyWindowFunction<K, W extends Window, T, ACC>
+       extends WrappingFunction<WindowFunction<ACC, ACC, K, W>>
+       implements WindowFunction<Iterable<T>, ACC, K, W>, 
OutputTypeConfigurable<ACC> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final FoldFunction<T, ACC> foldFunction;
+
+       private byte[] serializedInitialValue;
+       private TypeSerializer<ACC> accSerializer;
+       private transient ACC initialValue;
+
+       public FoldApplyWindowFunction(ACC initialValue, FoldFunction<T, ACC> 
foldFunction, WindowFunction<ACC, ACC, K, W> windowFunction) {
+               super(windowFunction);
+               this.foldFunction = foldFunction;
+               this.initialValue = initialValue;
+       }
+
+       @Override
+       public void open(Configuration configuration) throws Exception {
+               super.open(configuration);
+
+               if (serializedInitialValue == null) {
+                       throw new RuntimeException("No initial value was 
serialized for the fold " +
+                               "window function. Probably the setOutputType 
method was not called.");
+               }
+
+               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
+               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
+               initialValue = accSerializer.deserialize(in);
+       }
+
+       @Override
+       public void apply(K key, W window, Iterable<T> values, Collector<ACC> 
out) throws Exception {
+               ACC result = accSerializer.copy(initialValue);
+
+               for (T val: values) {
+                       result = foldFunction.fold(result, val);
+               }
+
+               wrappedFunction.apply(key, window, result, out);
+       }
+
+       @Override
+       public void setOutputType(TypeInformation<ACC> outTypeInfo, 
ExecutionConfig executionConfig) {
+               accSerializer = outTypeInfo.createSerializer(executionConfig);
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+
+               try {
+                       accSerializer.serialize(initialValue, out);
+               } catch (IOException ioe) {
+                       throw new RuntimeException("Unable to serialize initial 
value of type " +
+                               initialValue.getClass().getSimpleName() + " of 
fold window function.", ioe);
+               }
+
+               serializedInitialValue = baos.toByteArray();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
deleted file mode 100644
index db6d1bb..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-public class FoldWindowFunction<K, W extends Window, T, R>
-               extends WrappingFunction<FoldFunction<T, R>>
-               implements WindowFunction<Iterable<T>, R, K, W>, 
OutputTypeConfigurable<R> {
-       private static final long serialVersionUID = 1L;
-
-       private byte[] serializedInitialValue;
-       private TypeSerializer<R> outSerializer;
-       private transient R initialValue;
-
-       public FoldWindowFunction(R initialValue, FoldFunction<T, R> 
reduceFunction) {
-               super(reduceFunction);
-               this.initialValue = initialValue;
-       }
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-
-               if (serializedInitialValue == null) {
-                       throw new RuntimeException("No initial value was 
serialized for the fold " +
-                                       "window function. Probably the 
setOutputType method was not called.");
-               }
-
-               ByteArrayInputStream bais = new 
ByteArrayInputStream(serializedInitialValue);
-               DataInputViewStreamWrapper inStream = new 
DataInputViewStreamWrapper(bais);
-               initialValue = outSerializer.deserialize(inStream);
-       }
-
-       @Override
-       public void apply(K k, W window, Iterable<T> values, Collector<R> out) 
throws Exception {
-               R result = outSerializer.copy(initialValue);
-
-               for (T val: values) {
-                       result = wrappedFunction.fold(result, val);
-               }
-
-               out.collect(result);
-       }
-
-       @Override
-       public void setOutputType(TypeInformation<R> outTypeInfo, 
ExecutionConfig executionConfig) {
-               outSerializer = outTypeInfo.createSerializer(executionConfig);
-
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
-
-               try {
-                       outSerializer.serialize(initialValue, out);
-               } catch (IOException ioe) {
-                       throw new RuntimeException("Unable to serialize initial 
value of type " +
-                                       initialValue.getClass().getSimpleName() 
+ " of fold window function.", ioe);
-               }
-
-               serializedInitialValue = baos.toByteArray();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
new file mode 100644
index 0000000..3ac2e2c
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class PassThroughAllWindowFunction<W extends Window, T> implements 
AllWindowFunction<T, T, W> {
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void apply(W window, T input, Collector<T> out) throws Exception 
{
+               out.collect(input);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
new file mode 100644
index 0000000..254c489
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class PassThroughWindowFunction<K, W extends Window, T> implements 
WindowFunction<T, T, K, W> {
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void apply(K k, W window, T input, Collector<T> out) throws 
Exception {
+               out.collect(input);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
deleted file mode 100644
index 76b095b..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceAllWindowFunction<W extends Window, T> extends 
RichAllWindowFunction<T, T, W> {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public void apply(W window, T input, Collector<T> out) throws Exception 
{
-               out.collect(input);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
deleted file mode 100644
index 8be4553b..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceWindowFunction<K, W extends Window, T> implements 
WindowFunction<T, T, K, W> {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public void apply(K k, W window, T input, Collector<T> out) throws 
Exception {
-               out.collect(input);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
deleted file mode 100644
index fe42cd3..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends 
RichWindowFunction<T, Tuple2<W, T>, K, W> {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public void apply(K k, W window, T input, Collector<Tuple2<W, T>> out) 
throws Exception {
-               out.collect(Tuple2.of(window, input));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
new file mode 100644
index 0000000..bb91f2a
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FoldApplyWindowFunctionTest {
+
+       /**
+        * Tests that the FoldApplyWindowFunction gets the output type serializer 
set by the
+        * StreamGraphGenerator and checks that the FoldApplyWindowFunction computes 
the correct result.
+        */
+       @Test
+       public void testFoldWindowFunctionOutputTypeConfigurable() throws 
Exception{
+               StreamExecutionEnvironment env = new 
DummyStreamExecutionEnvironment();
+
+               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
+
+               int initValue = 1;
+
+               FoldApplyWindowFunction<Integer, TimeWindow, Integer, Integer> 
foldWindowFunction = new FoldApplyWindowFunction<>(
+                       initValue,
+                       new FoldFunction<Integer, Integer>() {
+                               private static final long serialVersionUID = 
-4849549768529720587L;
+
+                               @Override
+                               public Integer fold(Integer accumulator, 
Integer value) throws Exception {
+                                       return accumulator + value;
+                               }
+                       },
+                       new WindowFunction<Integer, Integer, Integer, 
TimeWindow>() {
+                               @Override
+                               public void apply(Integer integer,
+                                       TimeWindow window,
+                                       Integer input,
+                                       Collector<Integer> out) throws 
Exception {
+                                       out.collect(input);
+                               }
+                       }
+               );
+
+               AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
+                       foldWindowFunction,
+                       new KeySelector<Integer, Integer>() {
+                               private static final long serialVersionUID = 
-7951310554369722809L;
+
+                               @Override
+                               public Integer getKey(Integer value) throws 
Exception {
+                                       return value;
+                               }
+                       },
+                       IntSerializer.INSTANCE,
+                       IntSerializer.INSTANCE,
+                       3000,
+                       3000
+               );
+
+               SourceFunction<Integer> sourceFunction = new 
SourceFunction<Integer>(){
+
+                       private static final long serialVersionUID = 
8297735565464653028L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+
+                       }
+
+                       @Override
+                       public void cancel() {
+
+                       }
+               };
+
+               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<Integer>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
+
+               transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
+
+               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
+
+               List<Integer> result = new ArrayList<>();
+               List<Integer> input = new ArrayList<>();
+               List<Integer> expected = new ArrayList<>();
+
+               input.add(1);
+               input.add(2);
+               input.add(3);
+
+               for (int value : input) {
+                       initValue += value;
+               }
+
+               expected.add(initValue);
+
+               foldWindowFunction.apply(0, new TimeWindow(0, 1), input, new 
ListCollector<Integer>(result));
+
+               Assert.assertEquals(expected, result);
+       }
+
+       public static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
+
+               @Override
+               public JobExecutionResult execute(String jobName) throws 
Exception {
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldWindowFunctionTest.java
deleted file mode 100644
index 98e4d47..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldWindowFunctionTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.util.ListCollector;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.junit.Test;
-import org.junit.Assert;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class FoldWindowFunctionTest {
-
-       /**
-        * Tests that the FoldWindowFunction gets the output type serializer 
set by the
-        * StreamGraphGenerator and checks that the FoldWindowFunction computes 
the correct result.
-        */
-       @Test
-       public void testFoldWindowFunctionOutputTypeConfigurable() throws 
Exception{
-               StreamExecutionEnvironment env = new 
DummyStreamExecutionEnvironment();
-
-               List<StreamTransformation<?>> transformations = new 
ArrayList<>();
-
-               int initValue = 1;
-
-               FoldWindowFunction<Integer, TimeWindow, Integer, Integer> 
foldWindowFunction = new FoldWindowFunction<>(
-                       initValue,
-                       new FoldFunction<Integer, Integer>() {
-                               private static final long serialVersionUID = 
-4849549768529720587L;
-
-                               @Override
-                               public Integer fold(Integer accumulator, 
Integer value) throws Exception {
-                                       return accumulator + value;
-                               }
-                       }
-               );
-
-               AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
-                       foldWindowFunction,
-                       new KeySelector<Integer, Integer>() {
-                               private static final long serialVersionUID = 
-7951310554369722809L;
-
-                               @Override
-                               public Integer getKey(Integer value) throws 
Exception {
-                                       return value;
-                               }
-                       },
-                       IntSerializer.INSTANCE,
-                       IntSerializer.INSTANCE,
-                       3000,
-                       3000
-               );
-
-               SourceFunction<Integer> sourceFunction = new 
SourceFunction<Integer>(){
-
-                       private static final long serialVersionUID = 
8297735565464653028L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-
-                       }
-
-                       @Override
-                       public void cancel() {
-
-                       }
-               };
-
-               SourceTransformation<Integer> source = new 
SourceTransformation<>("", new StreamSource<Integer>(sourceFunction), 
BasicTypeInfo.INT_TYPE_INFO, 1);
-
-               transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
-
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
-
-               List<Integer> result = new ArrayList<>();
-               List<Integer> input = new ArrayList<>();
-               List<Integer> expected = new ArrayList<>();
-
-               input.add(1);
-               input.add(2);
-               input.add(3);
-
-               for (int value : input) {
-                       initValue += value;
-               }
-
-               expected.add(initValue);
-
-               foldWindowFunction.apply(0, new TimeWindow(0, 1), input, new 
ListCollector<Integer>(result));
-
-               Assert.assertEquals(expected, result);
-       }
-
-       public static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
-
-               @Override
-               public JobExecutionResult execute(String jobName) throws 
Exception {
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 9d4a41a..c1111a0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -34,7 +35,6 @@ import 
org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import 
org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -140,7 +140,7 @@ public class WindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(),
+                               new PassThroughWindowFunction<String, 
TimeWindow, Tuple2<String, Integer>>(),
                                EventTimeTrigger.create());
 
                operator.setInputType(inputType, new ExecutionConfig());
@@ -271,7 +271,7 @@ public class WindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(),
+                               new PassThroughWindowFunction<String, 
TimeWindow, Tuple2<String, Integer>>(),
                                EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -344,7 +344,7 @@ public class WindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(),
+                               new PassThroughWindowFunction<String, 
GlobalWindow, Tuple2<String, Integer>>(),
                                
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -434,7 +434,7 @@ public class WindowOperatorTest {
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                stateDesc,
-                               new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(),
+                               new PassThroughWindowFunction<String, 
GlobalWindow, Tuple2<String, Integer>>(),
                                
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse(

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
index 3a49331..3aa60dc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.runtime.state;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -120,6 +122,12 @@ public class StateBackendITCase extends 
StreamingMultipleProgramsTestBase {
                }
 
                @Override
+               protected <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDesc) throws 
Exception {
+                       // NOTE(review): stub that always returns null; presumably this test
+                       // backend is never asked for folding state -- confirm.
+                       return null;
+               }
+
+               @Override
                public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID,
                        long timestamp) throws Exception {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 8f0d785..e8d3e05 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -265,6 +265,66 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
   }
 
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation folder.
+    *
+    * @param initialValue Initial value of the fold
+    * @param preAggregator The fold function that is used for pre-aggregation
+    * @param function The window function; its input type is the fold result type `R`.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  def apply[R: TypeInformation: ClassTag](
+      initialValue: R,
+      preAggregator: FoldFunction[T, R],
+      function: AllWindowFunction[R, R, W]): DataStream[R] = {
+    javaStream.apply(
+      initialValue,
+      clean(preAggregator),
+      clean(function),
+      implicitly[TypeInformation[R]])
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given pre-aggregation folder.
+    *
+    * @param initialValue Initial value of the fold
+    * @param preAggregator The fold function that is used for pre-aggregation; must not be null
+    * @param function The window function; must not be null
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  def apply[R: TypeInformation: ClassTag](
+      initialValue: R,
+      preAggregator: (R, T) => R,
+      function: (W, R, Collector[R]) => Unit): DataStream[R] = {
+    if (preAggregator == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanFolder = clean(preAggregator)
+    val folder = new FoldFunction[T, R] {
+      def fold(v1: R, v2: T) = { cleanFolder(v1, v2) }
+    }
+
+    val cleanApply = clean(function)
+    val applyFunction = new AllWindowFunction[R, R, W] {
+      def apply(window: W, input: R, out: Collector[R]): Unit = {
+        cleanApply(window, input, out)
+      }
+    }
+    javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]])
+  }
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 6385831..22d24fa 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -272,6 +272,66 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
   }
 
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is incrementally aggregated using the given fold function.
+    *
+    * @param initialValue The initial value of the fold
+    * @param foldFunction The fold function that is used for incremental aggregation
+    * @param function The window function; its input type is the fold result type `R`.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  def apply[R: TypeInformation: ClassTag](
+      initialValue: R,
+      foldFunction: FoldFunction[T, R],
+      function: WindowFunction[R, R, K, W]): DataStream[R] = {
+    javaStream.apply(
+      initialValue,
+      clean(foldFunction),
+      clean(function),
+      implicitly[TypeInformation[R]])
+  }
+
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is incrementally aggregated using the given fold function.
+    *
+    * @param initialValue The initial value of the fold
+    * @param foldFunction The fold function that is used for incremental aggregation; must not
+    *                     be null
+    * @param function The window function; must not be null
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  def apply[R: TypeInformation: ClassTag](
+      initialValue: R,
+      foldFunction: (R, T) => R,
+      function: (K, W, R, Collector[R]) => Unit): DataStream[R] = {
+    if (foldFunction == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanFolder = clean(foldFunction)
+    val folder = new FoldFunction[T, R] {
+      def fold(acc: R, v: T) = { cleanFolder(acc, v) }
+    }
+
+    val cleanApply = clean(function)
+    val applyFunction = new WindowFunction[R, R, K, W] {
+      def apply(key: K, window: W, input: R, out: Collector[R]): Unit = {
+        cleanApply(key, window, input, out)
+      }
+    }
+    javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]])
+  }
+
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

Reply via email to