Repository: flink
Updated Branches:
  refs/heads/release-1.0 ba069f35b -> 75e03cacc


[FLINK-3528] Add FoldingWindowBuffer for Non-Keyed Windows

This makes AllWindowedStream.fold() take constant space, just like the
keyed WindowOperator.

Also this adds a new test case in EventTimeAllWindowCheckpointingITCase
to verify that the FoldingWindowBuffer works.

This also renames the preexisting window buffers to ReducingWindowBuffer
and ListWindowBuffer to make the naming scheme consistent.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75e03cac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75e03cac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75e03cac

Branch: refs/heads/release-1.0
Commit: 75e03caccad159fb04df4c7085a49d7f76e994c5
Parents: ba069f3
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Feb 26 23:27:26 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Sat Feb 27 00:30:33 2016 +0100

----------------------------------------------------------------------
 .../api/common/functions/FoldFunction.java      |   2 +-
 .../api/datastream/AllWindowedStream.java       |  27 +--
 .../EvictingNonKeyedWindowOperator.java         |   9 +-
 .../windowing/NonKeyedWindowOperator.java       |  41 ++---
 .../windowing/buffers/EvictingWindowBuffer.java |   3 +-
 .../windowing/buffers/FoldingWindowBuffer.java  | 163 +++++++++++++++++++
 .../windowing/buffers/HeapWindowBuffer.java     |  94 -----------
 .../windowing/buffers/ListWindowBuffer.java     | 127 +++++++++++++++
 .../buffers/PreAggregatingHeapWindowBuffer.java |  99 -----------
 .../windowing/buffers/ReducingWindowBuffer.java | 121 ++++++++++++++
 .../windowing/buffers/WindowBuffer.java         |  15 +-
 .../windowing/buffers/WindowBufferFactory.java  |  29 ++--
 .../windowing/AllWindowTranslationTest.java     |  67 +++++++-
 .../EvictingNonKeyedWindowOperatorTest.java     |  12 +-
 .../windowing/NonKeyedWindowOperatorTest.java   |  43 ++---
 .../api/scala/AllWindowTranslationTest.scala    |  26 +--
 .../EventTimeAllWindowCheckpointingITCase.java  |  72 ++++++++
 17 files changed, 633 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index 8194663..b52828e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -40,7 +40,7 @@ import java.io.Serializable;
  * @param <O> Type of the elements that the group/list/stream contains
  */
 @Public
-public interface FoldFunction<O,T> extends Function, Serializable {
+public interface FoldFunction<O, T> extends Function, Serializable {
        /**
         * The core method of FoldFunction, combining two values into one value 
of the same type.
         * The fold function is consecutively applied to all values of a group 
until only a single value remains.

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 6b32880..268dd8c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -43,8 +43,9 @@ import 
org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
 import 
org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
 
 /**
  * A {@code AllWindowedStream} represents a data stream where the stream of
@@ -157,7 +158,7 @@ public class AllWindowedStream<T, W extends Window> {
                if (evictor != null) {
                        operator = new 
EvictingNonKeyedWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       new HeapWindowBuffer.Factory<T>(),
+                                       new 
ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                                        new ReduceIterableAllWindowFunction<W, 
T>(function),
                                        trigger,
                                        evictor);
@@ -165,7 +166,7 @@ public class AllWindowedStream<T, W extends Window> {
                } else {
                        operator = new NonKeyedWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       new 
PreAggregatingHeapWindowBuffer.Factory<>(function),
+                                       new 
ReducingWindowBuffer.Factory<>(function, 
getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                                        new ReduceIterableAllWindowFunction<W, 
T>(function),
                                        trigger);
                }
@@ -255,12 +256,12 @@ public class AllWindowedStream<T, W extends Window> {
 
                String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
 
-               NonKeyedWindowOperator<T, R, W> operator;
+               NonKeyedWindowOperator<T, T, R, W> operator;
 
                if (evictor != null) {
                        operator = new 
EvictingNonKeyedWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       new HeapWindowBuffer.Factory<T>(),
+                                       new 
ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                                        function,
                                        trigger,
                                        evictor);
@@ -268,7 +269,7 @@ public class AllWindowedStream<T, W extends Window> {
                } else {
                        operator = new NonKeyedWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       new HeapWindowBuffer.Factory<T>(),
+                                       new 
ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                                        function,
                                        trigger);
                }
@@ -329,7 +330,7 @@ public class AllWindowedStream<T, W extends Window> {
                if (evictor != null) {
                        operator = new 
EvictingNonKeyedWindowOperator<>(windowAssigner,
                                        
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                                       new HeapWindowBuffer.Factory<T>(),
+                                       new 
ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                                        new 
ReduceApplyAllWindowFunction<>(preAggregator, function),
                                        trigger,
                                        evictor);
@@ -337,8 +338,8 @@ public class AllWindowedStream<T, W extends Window> {
                } else {
                        operator = new NonKeyedWindowOperator<>(windowAssigner,
                                
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                               new 
PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
-                               new 
ReduceApplyAllWindowFunction<>(preAggregator, function),
+                               new 
ReducingWindowBuffer.Factory<>(preAggregator, 
getInputType().createSerializer(getExecutionEnvironment().getConfig())),
+                               function,
                                trigger);
                }
 
@@ -400,7 +401,7 @@ public class AllWindowedStream<T, W extends Window> {
 
                        operator = new 
EvictingNonKeyedWindowOperator<>(windowAssigner,
                                
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                               new HeapWindowBuffer.Factory<T>(),
+                               new 
ListWindowBuffer.Factory<>(getInputType().createSerializer(getExecutionEnvironment().getConfig())),
                                new FoldApplyAllWindowFunction<>(initialValue, 
foldFunction, function),
                                trigger,
                                evictor);
@@ -410,8 +411,8 @@ public class AllWindowedStream<T, W extends Window> {
 
                        operator = new NonKeyedWindowOperator<>(windowAssigner,
                                
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-                               new HeapWindowBuffer.Factory<T>(),
-                               new FoldApplyAllWindowFunction<>(initialValue, 
foldFunction, function),
+                               new FoldingWindowBuffer.Factory<>(foldFunction, 
initialValue, 
resultType.createSerializer(getExecutionEnvironment().getConfig())),
+                               function,
                                trigger);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 221367d..22d207d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -36,11 +36,12 @@ import static java.util.Objects.requireNonNull;
  * @see 
org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
  *
  * @param <IN> The type of the incoming elements.
+ * @param <ACC> The type of elements stored in the window buffers.
  * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
 @Internal
-public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends 
NonKeyedWindowOperator<IN, OUT, W> {
+public class EvictingNonKeyedWindowOperator<IN, ACC, OUT, W extends Window> 
extends NonKeyedWindowOperator<IN, ACC, OUT, W> {
 
        private static final long serialVersionUID = 1L;
 
@@ -48,8 +49,8 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W 
extends Window> extends N
 
        public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
                        TypeSerializer<W> windowSerializer,
-                       WindowBufferFactory<? super IN, ? extends 
EvictingWindowBuffer<IN>> windowBufferFactory,
-                       AllWindowFunction<IN, OUT, W> windowFunction,
+                       WindowBufferFactory<? super IN, ACC, ? extends 
EvictingWindowBuffer<IN, ACC>> windowBufferFactory,
+                       AllWindowFunction<ACC, OUT, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger,
                        Evictor<? super IN, ? super W> evictor) {
                super(windowAssigner, windowSerializer, windowBufferFactory, 
windowFunction, trigger);
@@ -60,7 +61,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W 
extends Window> extends N
        @SuppressWarnings("unchecked, rawtypes")
        protected void emitWindow(Context context) throws Exception {
                
timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());
-               EvictingWindowBuffer<IN> windowBuffer = 
(EvictingWindowBuffer<IN>) context.windowBuffer;
+               EvictingWindowBuffer<IN, ACC> windowBuffer = 
(EvictingWindowBuffer<IN, ACC>) context.windowBuffer;
 
                int toEvict = 0;
                if (windowBuffer.size() > 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 95feadc..6bd5c7d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -45,7 +45,6 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.util.InstantiationUtil;
@@ -72,12 +71,13 @@ import static java.util.Objects.requireNonNull;
  * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
  *
  * @param <IN> The type of the incoming elements.
+ * @param <ACC> The type of elements stored in the window buffers.
  * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
 @Internal
-public class NonKeyedWindowOperator<IN, OUT, W extends Window>
-               extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, 
OUT, W>>
+public class NonKeyedWindowOperator<IN, ACC, OUT, W extends Window>
+               extends AbstractUdfStreamOperator<OUT, AllWindowFunction<ACC, 
OUT, W>>
                implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
 
        private static final long serialVersionUID = 1L;
@@ -92,7 +92,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
        private final Trigger<? super IN, ? super W> trigger;
 
-       private final WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory;
+       private final WindowBufferFactory<? super IN, ACC, ? extends 
WindowBuffer<IN, ACC>> windowBufferFactory;
 
        /**
         * This is used to copy the incoming element because it can be put into 
several window
@@ -145,8 +145,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
         */
        public NonKeyedWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
                        TypeSerializer<W> windowSerializer,
-                       WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory,
-                       AllWindowFunction<IN, OUT, W> windowFunction,
+                       WindowBufferFactory<? super IN, ACC, ? extends 
WindowBuffer<IN, ACC>> windowBufferFactory,
+                       AllWindowFunction<ACC, OUT, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger) {
 
                super(windowFunction);
@@ -180,9 +180,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                        throw new IllegalStateException("Input serializer was 
not set.");
                }
 
-               windowBufferFactory.setRuntimeContext(getRuntimeContext());
-               windowBufferFactory.open(getUserFunctionParameters());
-
                // these could already be initialized from restoreState()
                if (watermarkTimers == null) {
                        watermarkTimers = new HashMap<>();
@@ -221,11 +218,6 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        public final void dispose() {
                super.dispose();
                windows.clear();
-               try {
-                       windowBufferFactory.close();
-               } catch (Exception e) {
-                       throw new RuntimeException("Error while closing 
WindowBufferFactory.", e);
-               }
        }
 
        @Override
@@ -236,7 +228,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                for (W window: elementWindows) {
                        Context context = windows.get(window);
                        if (context == null) {
-                               WindowBuffer<IN> windowBuffer = 
windowBufferFactory.create();
+                               WindowBuffer<IN, ACC> windowBuffer = 
windowBufferFactory.create();
                                context = new Context(window, windowBuffer);
                                windows.put(window, context);
                        }
@@ -356,7 +348,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        protected class Context implements TriggerContext {
                protected W window;
 
-               protected WindowBuffer<IN> windowBuffer;
+               protected WindowBuffer<IN, ACC> windowBuffer;
 
                protected HashMap<String, Serializable> state;
 
@@ -369,7 +361,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
                public Context(
                                W window,
-                               WindowBuffer<IN> windowBuffer) {
+                               WindowBuffer<IN, ACC> windowBuffer) {
                        this.window = window;
                        this.windowBuffer = windowBuffer;
                        state = new HashMap<>();
@@ -394,12 +386,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                        in.read(stateData);
                        state = InstantiationUtil.deserializeObject(stateData, 
userClassloader);
 
-                       this.windowBuffer = windowBufferFactory.create();
-                       int numElements = in.readInt();
-                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
-                       for (int i = 0; i < numElements; i++) {
-                               
windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
-                       }
+                       this.windowBuffer = 
windowBufferFactory.restoreFromSnapshot(in);
                }
 
                protected void 
writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws 
IOException {
@@ -411,11 +398,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                        out.writeInt(serializedState.length);
                        out.write(serializedState, 0, serializedState.length);
 
-                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
-                       out.writeInt(windowBuffer.size());
-                       for (StreamRecord<IN> element: 
windowBuffer.getElements()) {
-                               recordSerializer.serialize(element, out);
-                       }
+                       windowBuffer.snapshot(out);
                }
 
                @Override
@@ -635,7 +618,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        }
 
        @VisibleForTesting
-       public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> 
getWindowBufferFactory() {
+       public WindowBufferFactory<? super IN, ACC, ? extends WindowBuffer<IN, 
ACC>> getWindowBufferFactory() {
                return windowBufferFactory;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
index 25a8211..75f646d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
@@ -25,9 +25,10 @@ import org.apache.flink.annotation.Internal;
  * the buffer.
  *
  * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ * @param <O> The type of elements that this window buffer will return when 
asked for its contents.
  */
 @Internal
-public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
+public interface EvictingWindowBuffer<T, O> extends WindowBuffer<T, O> {
 
        /**
         * Removes the given number of elements, starting from the beginning.

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
new file mode 100644
index 0000000..fa44f9d
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+
+/**
+ * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer 
uses a
+ * {@link FoldFunction} to incrementally aggregate elements that are added to 
the buffer.
+ *
+ * @param <T> The type of elements that can be added to this {@code 
WindowBuffer}.
+ * @param <ACC> The type of the accumulator that this {@code WindowBuffer} can 
store.
+ */
+@Internal
+public class FoldingWindowBuffer<T, ACC> implements WindowBuffer<T, ACC> {
+
+       private final FoldFunction<T, ACC> foldFunction;
+       private final TypeSerializer<ACC> accSerializer;
+       private StreamRecord<ACC> data;
+
+       protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, ACC 
initialAccumulator, TypeSerializer<ACC> accSerializer) {
+               this.foldFunction = foldFunction;
+               this.accSerializer = accSerializer;
+               this.data = new StreamRecord<>(initialAccumulator);
+       }
+
+       protected FoldingWindowBuffer(FoldFunction<T, ACC> foldFunction, 
StreamRecord<ACC> initialAccumulator, TypeSerializer<ACC> accSerializer) {
+               this.foldFunction = foldFunction;
+               this.accSerializer = accSerializer;
+               this.data = initialAccumulator;
+       }
+
+       @Override
+       public void storeElement(StreamRecord<T> element) throws Exception {
+               data.replace(foldFunction.fold(data.getValue(), 
element.getValue()), element.getTimestamp());
+       }
+
+       @Override
+       public Iterable<StreamRecord<ACC>> getElements() {
+               return Collections.singleton(data);
+       }
+
+       @Override
+       public Iterable<ACC> getUnpackedElements() {
+               return Collections.singleton(data.getValue());
+       }
+
+       @Override
+       public int size() {
+               return 1;
+       }
+
+       @Override
+       public void snapshot(DataOutputView out) throws IOException {
+               MultiplexingStreamRecordSerializer<ACC> recordSerializer = new 
MultiplexingStreamRecordSerializer<>(accSerializer);
+               recordSerializer.serialize(data, out);
+       }
+
+       public static class Factory<T, ACC> implements WindowBufferFactory<T, 
ACC, FoldingWindowBuffer<T, ACC>> {
+               private static final long serialVersionUID = 1L;
+
+               private final FoldFunction<T, ACC> foldFunction;
+
+               private final TypeSerializer<ACC> accSerializer;
+
+               private transient ACC initialAccumulator;
+
+               public Factory(FoldFunction<T, ACC> foldFunction, ACC 
initialValue, TypeSerializer<ACC> accSerializer) {
+                       this.foldFunction = foldFunction;
+                       this.accSerializer = accSerializer;
+                       this.initialAccumulator = initialValue;
+               }
+
+               @Override
+               public FoldingWindowBuffer<T, ACC> create() {
+                       return new FoldingWindowBuffer<>(foldFunction, 
accSerializer.copy(initialAccumulator), accSerializer);
+               }
+
+               @Override
+               public FoldingWindowBuffer<T, ACC> 
restoreFromSnapshot(DataInputView in) throws IOException {
+                       MultiplexingStreamRecordSerializer<ACC> 
recordSerializer = new MultiplexingStreamRecordSerializer<>(accSerializer);
+                       StreamElement element = 
recordSerializer.deserialize(in);
+                       return new FoldingWindowBuffer<>(foldFunction, 
element.<ACC>asRecord(), accSerializer);
+               }
+
+               private void writeObject(final ObjectOutputStream out) throws 
IOException {
+                       // write all the non-transient fields
+                       out.defaultWriteObject();
+
+
+                       byte[] serializedDefaultValue;
+                       try (ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                                       DataOutputViewStreamWrapper outView = 
new DataOutputViewStreamWrapper(baos))
+                       {
+                               accSerializer.serialize(initialAccumulator, 
outView);
+
+                               outView.flush();
+                               serializedDefaultValue = baos.toByteArray();
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Unable to serialize 
initial accumulator of type " +
+                                               
initialAccumulator.getClass().getSimpleName() + ".", e);
+                       }
+
+                       out.writeInt(serializedDefaultValue.length);
+                       out.write(serializedDefaultValue);
+               }
+
+               private void readObject(final ObjectInputStream in) throws 
IOException, ClassNotFoundException {
+                       // read the non-transient fields
+                       in.defaultReadObject();
+
+                       // read the default value field
+                       int size = in.readInt();
+                       byte[] buffer = new byte[size];
+                       int bytesRead = in.read(buffer);
+
+                       if (bytesRead != size) {
+                               throw new RuntimeException("Read size does not 
match expected size.");
+                       }
+
+                       try (ByteArrayInputStream bais = new 
ByteArrayInputStream(buffer);
+                                       DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(bais))
+                       {
+                               initialAccumulator = 
accSerializer.deserialize(inView);
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Unable to deserialize 
initial accumulator.", e);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
deleted file mode 100644
index 9db449b..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayDeque;
-
-/**
- * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-@Internal
-public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
-       private static final long serialVersionUID = 1L;
-
-       private ArrayDeque<StreamRecord<T>> elements;
-
-       protected HeapWindowBuffer() {
-               this.elements = new ArrayDeque<>();
-       }
-
-       @Override
-       public void storeElement(StreamRecord<T> element) {
-               elements.add(element);
-       }
-
-       @Override
-       public void removeElements(int count) {
-               // TODO determine if this can be done in a better way
-               for (int i = 0; i < count; i++) {
-                       elements.removeFirst();
-               }
-       }
-
-       @Override
-       public Iterable<StreamRecord<T>> getElements() {
-               return elements;
-       }
-
-       @Override
-       public Iterable<T> getUnpackedElements() {
-               return FluentIterable.from(elements).transform(new 
Function<StreamRecord<T>, T>() {
-                       @Override
-                       public T apply(StreamRecord<T> record) {
-                               return record.getValue();
-                       }
-               });
-       }
-
-       @Override
-       public int size() {
-               return elements.size();
-       }
-
-       public static class Factory<T> implements WindowBufferFactory<T, 
HeapWindowBuffer<T>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void setRuntimeContext(RuntimeContext ctx) {}
-
-               @Override
-               public void open(Configuration config) {}
-
-               @Override
-               public void close() {}
-
-               @Override
-               public HeapWindowBuffer<T> create() {
-                       return new HeapWindowBuffer<>();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
new file mode 100644
index 0000000..5b9dd3c
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ListWindowBuffer.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+/**
+ * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
+@Internal
+public class ListWindowBuffer<T> implements EvictingWindowBuffer<T, T> {
+
+       private final TypeSerializer<T>  serializer;
+
+       private ArrayDeque<StreamRecord<T>> elements;
+
+       protected ListWindowBuffer(TypeSerializer<T> serializer) {
+               this.serializer = serializer;
+               this.elements = new ArrayDeque<>();
+       }
+
+       protected ListWindowBuffer(ArrayDeque<StreamRecord<T>> elements, 
TypeSerializer<T> serializer) {
+               this.serializer = serializer;
+               this.elements = elements;
+       }
+
+       @Override
+       public void storeElement(StreamRecord<T> element) {
+               elements.add(element);
+       }
+
+       @Override
+       public void removeElements(int count) {
+               // TODO determine if this can be done in a better way
+               for (int i = 0; i < count; i++) {
+                       elements.removeFirst();
+               }
+       }
+
+       @Override
+       public Iterable<StreamRecord<T>> getElements() {
+               return elements;
+       }
+
+       @Override
+       public Iterable<T> getUnpackedElements() {
+               return FluentIterable.from(elements).transform(new 
Function<StreamRecord<T>, T>() {
+                       @Override
+                       public T apply(StreamRecord<T> record) {
+                               return record.getValue();
+                       }
+               });
+       }
+
+       @Override
+       public int size() {
+               return elements.size();
+       }
+
+       @Override
+       public void snapshot(DataOutputView out) throws IOException {
+               out.writeInt(elements.size());
+
+               MultiplexingStreamRecordSerializer<T> recordSerializer = new 
MultiplexingStreamRecordSerializer<>(serializer);
+
+               for (StreamRecord<T> e: elements) {
+                       recordSerializer.serialize(e, out);
+               }
+       }
+
+       public static class Factory<T> implements WindowBufferFactory<T, T, 
ListWindowBuffer<T>> {
+               private static final long serialVersionUID = 1L;
+
+               private final TypeSerializer<T> serializer;
+
+               public Factory(TypeSerializer<T> serializer) {
+                       this.serializer = serializer;
+               }
+
+               @Override
+               public ListWindowBuffer<T> create() {
+                       return new ListWindowBuffer<>(serializer);
+               }
+
+               @Override
+               public ListWindowBuffer<T> restoreFromSnapshot(DataInputView 
in) throws IOException {
+                       int size = in.readInt();
+
+                       MultiplexingStreamRecordSerializer<T> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(serializer);
+
+                       ArrayDeque<StreamRecord<T>> elements = new 
ArrayDeque<>();
+
+                       for (int i = 0; i < size; i++) {
+                               
elements.add(recordSerializer.deserialize(in).<T>asRecord());
+                       }
+
+                       return new ListWindowBuffer<>(elements, serializer);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
deleted file mode 100644
index 5f8de4b..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing.buffers;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Collections;
-
-/**
- * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer 
uses a
- * {@link ReduceFunction} to pre-aggregate elements that are added to the 
buffer.
- *
- * @param <T> The type of elements that this {@code WindowBuffer} can store.
- */
-@Internal
-public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
-       private static final long serialVersionUID = 1L;
-
-       private final ReduceFunction<T> reduceFunction;
-       private transient StreamRecord<T> data;
-
-       protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> 
reduceFunction) {
-               this.reduceFunction = reduceFunction;
-       }
-
-       @Override
-       public void storeElement(StreamRecord<T> element) throws Exception {
-               if (data == null) {
-                       data = new StreamRecord<>(element.getValue(), 
element.getTimestamp());
-               } else {
-                       data.replace(reduceFunction.reduce(data.getValue(), 
element.getValue()));
-               }
-       }
-
-       @Override
-       public Iterable<StreamRecord<T>> getElements() {
-               return Collections.singleton(data);
-       }
-
-       @Override
-       public Iterable<T> getUnpackedElements() {
-               return Collections.singleton(data.getValue());
-       }
-
-       @Override
-       public int size() {
-               return 1;
-       }
-
-       public static class Factory<T> implements WindowBufferFactory<T, 
PreAggregatingHeapWindowBuffer<T>> {
-               private static final long serialVersionUID = 1L;
-
-               private final ReduceFunction<T> reduceFunction;
-
-               public Factory(ReduceFunction<T> reduceFunction) {
-                       this.reduceFunction = reduceFunction;
-               }
-
-               @Override
-               public void setRuntimeContext(RuntimeContext ctx) {
-                       FunctionUtils.setFunctionRuntimeContext(reduceFunction, 
ctx);
-               }
-
-               @Override
-               public void open(Configuration config) throws Exception {
-                       FunctionUtils.openFunction(reduceFunction, config);
-               }
-
-               @Override
-               public void close() throws Exception {
-                       FunctionUtils.closeFunction(reduceFunction);
-               }
-
-               @Override
-               public PreAggregatingHeapWindowBuffer<T> create() {
-                       return new 
PreAggregatingHeapWindowBuffer<>(reduceFunction);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
new file mode 100644
index 0000000..1f2b639
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer 
uses a
+ * {@link ReduceFunction} to incrementally aggregate elements that are added 
to the buffer.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
+@Internal
+public class ReducingWindowBuffer<T> implements WindowBuffer<T, T> {
+
+       private final ReduceFunction<T> reduceFunction;
+       private final TypeSerializer<T> serializer;
+       private  StreamRecord<T> data;
+
+       protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, 
TypeSerializer<T> serializer) {
+               this.reduceFunction = reduceFunction;
+               this.serializer = serializer;
+               this.data = null;
+       }
+
+       protected ReducingWindowBuffer(ReduceFunction<T> reduceFunction, 
StreamRecord<T> data, TypeSerializer<T> serializer) {
+               this.reduceFunction = reduceFunction;
+               this.serializer = serializer;
+               this.data = data;
+       }
+
+       @Override
+       public void storeElement(StreamRecord<T> element) throws Exception {
+               if (data == null) {
+                       data = new StreamRecord<>(element.getValue(), 
element.getTimestamp());
+               } else {
+                       data.replace(reduceFunction.reduce(data.getValue(), 
element.getValue()));
+               }
+       }
+
+       @Override
+       public Iterable<StreamRecord<T>> getElements() {
+               return Collections.singleton(data);
+       }
+
+       @Override
+       public Iterable<T> getUnpackedElements() {
+               return Collections.singleton(data.getValue());
+       }
+
+       @Override
+       public int size() {
+               return 1;
+       }
+
+       @Override
+       public void snapshot(DataOutputView out) throws IOException {
+               if (data != null) {
+                       out.writeBoolean(true);
+                       MultiplexingStreamRecordSerializer<T> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(serializer);
+                       recordSerializer.serialize(data, out);
+               } else {
+                       out.writeBoolean(false);
+               }
+       }
+
+       public static class Factory<T> implements WindowBufferFactory<T, T, 
ReducingWindowBuffer<T>> {
+               private static final long serialVersionUID = 1L;
+
+               private final ReduceFunction<T> reduceFunction;
+
+               private final TypeSerializer<T> serializer;
+
+               public Factory(ReduceFunction<T> reduceFunction, 
TypeSerializer<T> serializer) {
+                       this.reduceFunction = reduceFunction;
+                       this.serializer = serializer;
+               }
+
+               @Override
+               public ReducingWindowBuffer<T> create() {
+                       return new ReducingWindowBuffer<>(reduceFunction, 
serializer);
+               }
+
+               @Override
+               public ReducingWindowBuffer<T> 
restoreFromSnapshot(DataInputView in) throws IOException {
+                       boolean hasValue = in.readBoolean();
+                       if (hasValue) {
+                               MultiplexingStreamRecordSerializer<T> 
recordSerializer = new MultiplexingStreamRecordSerializer<>(serializer);
+                               StreamElement element = 
recordSerializer.deserialize(in);
+                               return new 
ReducingWindowBuffer<>(reduceFunction, element.<T>asRecord(), serializer);
+                       } else {
+                               return new 
ReducingWindowBuffer<>(reduceFunction, serializer);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
index cbf7dda..16be0f3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
@@ -19,10 +19,11 @@ package 
org.apache.flink.streaming.runtime.operators.windowing.buffers;
 
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.io.Serializable;
+import java.io.IOException;
 
 /**
  * A {@code WindowBuffer} is used by
@@ -37,9 +38,10 @@ import java.io.Serializable;
  * have their own instance of the {@code Evictor}.
  *
  * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ * @param <O> The type of elements that this window buffer will return when 
asked for its contents.
  */
 @Internal
-public interface WindowBuffer<T> extends Serializable {
+public interface WindowBuffer<T, O> {
 
        /**
         * Adds the element to the buffer.
@@ -51,16 +53,21 @@ public interface WindowBuffer<T> extends Serializable {
        /**
         * Returns all elements that are currently in the buffer.
         */
-       Iterable<StreamRecord<T>> getElements();
+       Iterable<StreamRecord<O>> getElements();
 
        /**
         * Returns all elements that are currently in the buffer. This will 
unwrap the contained
         * elements from their {@link StreamRecord}.
         */
-       Iterable<T> getUnpackedElements();
+       Iterable<O> getUnpackedElements();
 
        /**
         * Returns the number of elements that are currently in the buffer.
         */
        int size();
+
+       /**
+        * Writes the contents of the window buffer to a {@link DataOutputView} 
for checkpointing.
+        */
+       void snapshot(DataOutputView out) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
index a4f4b27..1ca6350 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
@@ -18,39 +18,30 @@
 package org.apache.flink.streaming.runtime.operators.windowing.buffers;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 /**
  * A factory for {@link WindowBuffer WindowBuffers}.
  *
  * @param <T> The type of elements that the created {@code WindowBuffer} can 
store.
+ * @param <O> The type of elements that the created buffer will return when 
asked for its contents.
  * @param <B> The type of the created {@code WindowBuffer}
  */
 @Internal
-public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends 
Serializable {
+public interface WindowBufferFactory<T, O, B extends WindowBuffer<T, O>> 
extends Serializable {
 
        /**
-        * Sets the {@link RuntimeContext} that is used to initialize eventual 
user functions
-        * inside the created buffers.
-        */
-       void setRuntimeContext(RuntimeContext ctx);
-
-       /**
-        * Calls {@code open()} on eventual user functions inside the buffer.
-        */
-       void open(Configuration config) throws Exception;
-
-       /**
-        * Calls {@code close()} on eventual user functions inside the buffer.
+        * Creates a new {@code WindowBuffer}.
         */
-
-       void close() throws Exception;
+       B create();
 
        /**
-        * Creates a new {@code WindowBuffer}.
+        * Restores a {@code WindowBuffer} from a previous snapshot written 
using
+        * {@link WindowBuffer#snapshot(DataOutputView)}.
         */
-       B create();
+       B restoreFromSnapshot(DataInputView in) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 42f452c..f6e3dcc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -33,8 +34,9 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.FoldingWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
@@ -72,7 +74,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) 
operator1;
                Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
-               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof ReducingWindowBuffer.Factory);
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
@@ -94,7 +96,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) 
operator2;
                Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
-               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof ListWindowBuffer.Factory);
        }
 
        @Test
@@ -118,7 +120,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) 
operator1;
                Assert.assertTrue(winOperator1.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
-               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof ReducingWindowBuffer.Factory);
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
@@ -141,7 +143,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) 
operator2;
                Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
-               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof ListWindowBuffer.Factory);
        }
 
        @Test
@@ -166,7 +168,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
-               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof ListWindowBuffer.Factory);
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
@@ -191,7 +193,46 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor);
-               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof ListWindowBuffer.Factory);
+       }
+
+       /**
+        * These tests ensure that a Fold buffer is used if possible
+        */
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testFoldBuffer() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DummyFolder folder = new DummyFolder();
+
+               DataStream<Integer> window1 = source
+                               .windowAll(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .fold(0, folder);
+
+               OneInputTransformation<Tuple2<String, Integer>, Integer> 
transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) 
window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Integer> 
operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
+               NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) 
operator1;
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof FoldingWindowBuffer.Factory);
+
+               DataStream<Integer> window2 = source
+                               .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .evictor(CountEvictor.of(13))
+                               .fold(0, folder);
+
+               OneInputTransformation<Tuple2<String, Integer>, Integer> 
transform2 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) 
window2.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Integer> 
operator2 = transform2.getOperator();
+               Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
+               NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) 
operator2;
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
EventTimeTrigger);
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof ListWindowBuffer.Factory);
        }
 
        // 
------------------------------------------------------------------------
@@ -206,4 +247,14 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                        return value1;
                }
        }
+
+       public static class DummyFolder implements FoldFunction<Tuple2<String, 
Integer>, Integer> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Integer fold(Integer accumulator, Tuple2<String, 
Integer> value) throws Exception {
+                       return accumulator;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 571838f..c3a36dd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -19,6 +19,7 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -28,11 +29,10 @@ import 
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Comparator;
@@ -51,15 +51,17 @@ public class EvictingNonKeyedWindowOperatorTest {
                final int WINDOW_SIZE = 4;
                final int WINDOW_SLIDE = 2;
 
-               EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new 
EvictingNonKeyedWindowOperator<>(
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+               EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new 
EvictingNonKeyedWindowOperator<>(
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
-                               new HeapWindowBuffer.Factory<Tuple2<String, 
Integer>>(),
+                               new ListWindowBuffer.Factory<Tuple2<String, 
Integer>>(inputType.createSerializer(new ExecutionConfig())),
                                new 
ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new 
SumReducer()),
                                CountTrigger.of(WINDOW_SLIDE),
                                CountEvictor.of(WINDOW_SIZE));
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+               operator.setInputType(inputType, new ExecutionConfig());
 
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index c0e6ad4..406f3b0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -20,8 +20,12 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -35,8 +39,8 @@ import 
org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.ListWindowBuffer;
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.ReducingWindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -59,7 +63,7 @@ public class NonKeyedWindowOperatorTest {
        @SuppressWarnings("unchecked,rawtypes")
        private WindowBufferFactory windowBufferFactory;
 
-       public NonKeyedWindowOperatorTest(WindowBufferFactory<?, ?> 
windowBufferFactory) {
+       public NonKeyedWindowOperatorTest(WindowBufferFactory<?, ?, ?> 
windowBufferFactory) {
                this.windowBufferFactory = windowBufferFactory;
        }
 
@@ -74,7 +78,7 @@ public class NonKeyedWindowOperatorTest {
                final int WINDOW_SIZE = 3;
                final int WINDOW_SLIDE = 1;
 
-               NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
+               NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new 
NonKeyedWindowOperator<>(
                                SlidingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                windowBufferFactory,
@@ -150,7 +154,7 @@ public class NonKeyedWindowOperatorTest {
 
                final int WINDOW_SIZE = 3;
 
-               NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
+               NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator = new 
NonKeyedWindowOperator<>(
                                TumblingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                windowBufferFactory,
@@ -224,7 +228,7 @@ public class NonKeyedWindowOperatorTest {
 
                final int WINDOW_SIZE = 3;
 
-               NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
+               NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new 
NonKeyedWindowOperator<>(
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
                                windowBufferFactory,
@@ -298,7 +302,7 @@ public class NonKeyedWindowOperatorTest {
 
                final int WINDOW_SIZE = 4;
 
-               NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
+               NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new 
NonKeyedWindowOperator<>(
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
                                windowBufferFactory,
@@ -360,26 +364,9 @@ public class NonKeyedWindowOperatorTest {
        public static class RichSumReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
-               private boolean openCalled = false;
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-                       openCalled = true;
-               }
-
-               @Override
-               public void close() throws Exception {
-                       super.close();
-                       closeCalled.incrementAndGet();
-               }
-
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
                                Tuple2<String, Integer> value2) throws 
Exception {
-                       if (!openCalled) {
-                               Assert.fail("Open was not called");
-                       }
                        return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
                }
        }
@@ -389,9 +376,11 @@ public class NonKeyedWindowOperatorTest {
 
        @Parameterized.Parameters(name = "WindowBuffer = {0}")
        @SuppressWarnings("unchecked,rawtypes")
-       public static Collection<WindowBufferFactory[]> windowBuffers(){
-               return Arrays.asList(new WindowBufferFactory[]{new 
PreAggregatingHeapWindowBuffer.Factory(new RichSumReducer())},
-                               new WindowBufferFactory[]{new 
HeapWindowBuffer.Factory()}
+       public static Collection<WindowBufferFactory[]> windowBuffers() {
+               TupleSerializer<Tuple2> tuple2TupleSerializer = new 
TupleSerializer<>(Tuple2.class,
+                               new 
TypeSerializer<?>[]{StringSerializer.INSTANCE, IntSerializer.INSTANCE});
+               return Arrays.asList(new WindowBufferFactory[]{new 
ReducingWindowBuffer.Factory(new SumReducer(), tuple2TupleSerializer)},
+                               new WindowBufferFactory[]{new 
ListWindowBuffer.Factory(tuple2TupleSerializer)}
                                );
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index f73307c..a676757 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -32,7 +32,7 @@ import 
org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvic
 import org.apache.flink.streaming.api.windowing.time.Time
 import 
org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, 
CountTrigger}
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer,
 PreAggregatingHeapWindowBuffer}
+import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.{ListWindowBuffer,
 ReducingWindowBuffer}
 import org.apache.flink.streaming.runtime.operators.windowing._
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
@@ -111,12 +111,12 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     val operator1 = transform1.getOperator
 
-    assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
-    val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
+    assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, 
_]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
-      
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+      
winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]])
 
 
     val window2 = source
@@ -134,11 +134,11 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     val operator2 = transform2.getOperator
 
-    assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
-    val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
+    assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, 
_]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
   }
 
   @Test
@@ -161,12 +161,12 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     val operator1 = transform1.getOperator
 
-    assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
-    val winOperator1 = 
operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
+    assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, 
_]])
+    val winOperator1 = 
operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
-    
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
 
 
     val window2 = source
@@ -185,12 +185,12 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     val operator2 = transform2.getOperator
 
-    assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
-    val winOperator2 = 
operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
+    assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, 
_]])
+    val winOperator2 = 
operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]
     assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
+    
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/75e03cac/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index d18a45e..4431106 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -280,6 +281,77 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
        }
 
        @Test
+       public void testPreAggregatedFoldingTumblingTimeWindow() {
+               final int NUM_ELEMENTS_PER_KEY = 3000;
+               final int WINDOW_SIZE = 100;
+               final int NUM_KEYS = 1;
+               FailingSource.reset();
+
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", 
cluster.getLeaderRPCPort());
+
+                       env.setParallelism(PARALLELISM);
+                       
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+                       env.enableCheckpointing(100);
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       env.getConfig().disableSysoutLogging();
+
+                       env
+                                       .addSource(new FailingSource(NUM_KEYS,
+                                                       NUM_ELEMENTS_PER_KEY,
+                                                       NUM_ELEMENTS_PER_KEY / 
3))
+                                       .rebalance()
+                                       .timeWindowAll(Time.of(WINDOW_SIZE, 
MILLISECONDS))
+                                       .apply(new Tuple4<>(0L, 0L, 0L, new 
IntType(0)),
+                                                       new 
FoldFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>>() {
+                                                               @Override
+                                                               public 
Tuple4<Long, Long, Long, IntType> fold(Tuple4<Long, Long, Long, IntType> 
accumulator,
+                                                                               
Tuple2<Long, IntType> value) throws Exception {
+                                                                       
accumulator.f0 = value.f0;
+                                                                       
accumulator.f3 = new IntType(accumulator.f3.value + value.f1.value);
+                                                                       return 
accumulator;
+                                                               }
+                                                       },
+                                                       new 
RichAllWindowFunction<Tuple4<Long, Long, Long, IntType>, Tuple4<Long, Long, 
Long, IntType>, TimeWindow>() {
+
+                                                               private boolean 
open = false;
+
+                                                               @Override
+                                                               public void 
open(Configuration parameters) {
+                                                                       
assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+                                                                       open = 
true;
+                                                               }
+
+                                                               @Override
+                                                               public void 
apply(
+                                                                               
TimeWindow window,
+                                                                               
Iterable<Tuple4<Long, Long, Long, IntType>> input,
+                                                                               
Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+                                                                       // 
validate that the function has been opened properly
+                                                                       
assertTrue(open);
+
+                                                                       for 
(Tuple4<Long, Long, Long, IntType> in: input) {
+                                                                               
out.collect(new Tuple4<>(in.f0,
+                                                                               
                window.getStart(),
+                                                                               
                window.getEnd(),
+                                                                               
                in.f3));
+                                                                       }
+                                                               }
+                                                       })
+                                       .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
+
+
+                       tryExecute(env, "Tumbling Window Test");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
        public void testPreAggregatedSlidingTimeWindow() {
                final int NUM_ELEMENTS_PER_KEY = 3000;
                final int WINDOW_SIZE = 1000;

Reply via email to