http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 48ad387..d9fa9f0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -19,10 +19,14 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
@@ -69,7 +73,7 @@ import static java.util.Objects.requireNonNull;
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
 public class NonKeyedWindowOperator<IN, OUT, W extends Window>
-               extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, 
OUT, W>>
+               extends AbstractUdfStreamOperator<OUT, 
AllWindowFunction<Iterable<IN>, OUT, W>>
                implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
 
        private static final long serialVersionUID = 1L;
@@ -145,7 +149,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        public NonKeyedWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
                        TypeSerializer<W> windowSerializer,
                        WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory,
-                       AllWindowFunction<IN, OUT, W> windowFunction,
+                       AllWindowFunction<Iterable<IN>, OUT, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger) {
 
                super(windowFunction);
@@ -413,29 +417,72 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                        }
                }
 
-               @SuppressWarnings("unchecked")
-               public <S extends Serializable> ValueState<S> 
getKeyValueState(final String name, final S defaultState) {
-                       return new ValueState<S>() {
+               @Override
+               public <S extends Serializable> ValueState<S> 
getKeyValueState(String name,
+                       Class<S> stateType,
+                       S defaultState) {
+                       requireNonNull(stateType, "The state type class must 
not be null");
+
+                       TypeInformation<S> typeInfo;
+                       try {
+                               typeInfo = TypeExtractor.getForClass(stateType);
+                       }
+                       catch (Exception e) {
+                               throw new RuntimeException("Cannot analyze type 
'" + stateType.getName() +
+                                       "' from the class alone, due to generic 
type parameters. " +
+                                       "Please specify the TypeInformation 
directly.", e);
+                       }
+
+                       return getKeyValueState(name, typeInfo, defaultState);
+               }
+
+               @Override
+               public <S extends Serializable> ValueState<S> 
getKeyValueState(String name,
+                       TypeInformation<S> stateType,
+                       S defaultState) {
+
+                       requireNonNull(name, "The name of the state must not be 
null");
+                       requireNonNull(stateType, "The state type information 
must not be null");
+
+                       ValueStateDescriptor<S> stateDesc = new 
ValueStateDescriptor<>(name, defaultState, 
stateType.createSerializer(getExecutionConfig()));
+                       return getPartitionedState(stateDesc);
+               }
+
+               /**
+                * Returns a {@code ValueState} view over this context's {@code state} entries,
+                * stored under the name of the given descriptor.
+                *
+                * <p>Reading a name that has no entry yet stores the descriptor's default value
+                * and returns it.
+                *
+                * @param stateDescriptor descriptor of the requested state; must be a
+                *                        {@code ValueStateDescriptor}
+                * @return the state handle, cast to the requested state type
+                * @throws UnsupportedOperationException if the descriptor is not a
+                *                                       {@code ValueStateDescriptor}
+                */
+               @Override
+               @SuppressWarnings({"rawtypes", "unchecked"})
+               public <S extends State> S getPartitionedState(final 
StateDescriptor<S> stateDescriptor) {
+                       if (!(stateDescriptor instanceof ValueStateDescriptor)) 
{
+                               throw new 
UnsupportedOperationException("NonKeyedWindowOperator Triggers only " +
+                                       "support ValueState.");
+                       }
+                       // a cast to the wildcard type is not an unchecked cast, no suppression needed
+                       final ValueStateDescriptor<?> valueStateDescriptor = 
(ValueStateDescriptor<?>) stateDescriptor;
+                       ValueState valueState = new ValueState() {
                                @Override
-                               public S value() throws IOException {
-                                       Serializable value = state.get(name);
+                               public Object value() throws IOException {
+                                       Object value = 
state.get(stateDescriptor.getName());
                                        if (value == null) {
-                                               state.put(name, defaultState);
-                                               value = defaultState;
+                                               value = 
valueStateDescriptor.getDefaultValue();
+                                               
state.put(stateDescriptor.getName(), (Serializable) value);
                                        }
-                                       return (S) value;
+                                       return value;
                                }
 
                                @Override
-                               public void update(S value) throws IOException {
-                                       state.put(name, value);
+                               public void update(Object value) throws 
IOException {
+                                       if (!(value instanceof Serializable)) {
+                                               throw new 
UnsupportedOperationException(
+                                                       "Value state of 
NonKeyedWindowOperator must be serializable.");
+                                       }
+                                       state.put(stateDescriptor.getName(), 
(Serializable) value);
                                }
 
                                @Override
                                public void clear() {
-                                       state.remove(name);
+                                       state.remove(stateDescriptor.getName());
                                }
                        };
+                       // raw anonymous ValueState is handed back as S; this is the unchecked cast
+                       return (S) valueState;
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index fd39481..5109dae 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -19,11 +19,16 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
@@ -37,25 +42,18 @@ import 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
@@ -86,8 +84,8 @@ import static java.util.Objects.requireNonNull;
  * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
-public class WindowOperator<K, IN, OUT, W extends Window>
-               extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, 
K, W>>
+public class WindowOperator<K, IN, ACC, OUT, W extends Window>
+               extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, 
K, W>>
                implements OneInputStreamOperator<IN, OUT>, Triggerable, 
InputTypeConfigurable {
 
        private static final long serialVersionUID = 1L;
@@ -98,52 +96,42 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        // Configuration values and user functions
        // 
------------------------------------------------------------------------
 
-       private final WindowAssigner<? super IN, W> windowAssigner;
+       protected final WindowAssigner<? super IN, W> windowAssigner;
 
-       private final KeySelector<IN, K> keySelector;
+       protected final KeySelector<IN, K> keySelector;
 
-       private final Trigger<? super IN, ? super W> trigger;
+       protected final Trigger<? super IN, ? super W> trigger;
 
-       private final WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory;
+       protected final StateDescriptor<? extends MergingState<IN, ACC>> 
windowStateDescriptor;
 
        /**
         * If this is true. The current processing time is set as the timestamp 
of incoming elements.
         * This for use with a {@link 
org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
         * if eviction should happen based on processing time.
         */
-       private boolean setProcessingTime = false;
+       protected boolean setProcessingTime = false;
 
        /**
         * This is used to copy the incoming element because it can be put into 
several window
         * buffers.
         */
-       private TypeSerializer<IN> inputSerializer;
+       protected TypeSerializer<IN> inputSerializer;
 
        /**
         * For serializing the key in checkpoints.
         */
-       private final TypeSerializer<K> keySerializer;
+       protected final TypeSerializer<K> keySerializer;
 
        /**
         * For serializing the window in checkpoints.
         */
-       private final TypeSerializer<W> windowSerializer;
+       protected final TypeSerializer<W> windowSerializer;
 
        // 
------------------------------------------------------------------------
        // State that is not checkpointed
        // 
------------------------------------------------------------------------
 
        /**
-        * Processing time timers that are currently in-flight.
-        */
-       private transient Map<Long, Set<Context>> processingTimeTimers;
-
-       /**
-        * Current waiting watermark callbacks.
-        */
-       private transient Map<Long, Set<Context>> watermarkTimers;
-
-       /**
         * This is given to the {@code WindowFunction} for emitting elements 
with a given timestamp.
         */
        protected transient TimestampedCollector<OUT> timestampedCollector;
@@ -154,15 +142,23 @@ public class WindowOperator<K, IN, OUT, W extends Window>
         */
        protected transient long currentWatermark = -1L;
 
+       protected transient Context context = new Context(null, null);
+
        // 
------------------------------------------------------------------------
        // State that needs to be checkpointed
        // 
------------------------------------------------------------------------
 
        /**
-        * The windows (panes) that are currently in-flight. Each pane has a 
{@code WindowBuffer}
-        * and a {@code TriggerContext} that stores the {@code Trigger} for 
that pane.
+        * Processing time timers that are currently in-flight.
+        */
+       protected transient Set<Timer<K, W>> processingTimeTimers;
+       protected transient PriorityQueue<Timer<K, W>> 
processingTimeTimersQueue;
+
+       /**
+        * Current waiting watermark callbacks.
         */
-       protected transient Map<K, Map<W, Context>> windows;
+       protected transient Set<Timer<K, W>> watermarkTimers;
+       protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;
 
        /**
         * Creates a new {@code WindowOperator} based on the given policies and 
user functions.
@@ -171,8 +167,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                        TypeSerializer<W> windowSerializer,
                        KeySelector<IN, K> keySelector,
                        TypeSerializer<K> keySerializer,
-                       WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory,
-                       WindowFunction<IN, OUT, K, W> windowFunction,
+                       StateDescriptor<? extends MergingState<IN, ACC>> 
windowStateDescriptor,
+                       WindowFunction<ACC, OUT, K, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger) {
 
                super(windowFunction);
@@ -182,7 +178,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                this.keySelector = requireNonNull(keySelector);
                this.keySerializer = requireNonNull(keySerializer);
 
-               this.windowBufferFactory = requireNonNull(windowBufferFactory);
+               this.windowStateDescriptor = windowStateDescriptor;
                this.trigger = requireNonNull(trigger);
 
                setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -209,159 +205,100 @@ public class WindowOperator<K, IN, OUT, W extends 
Window>
                        throw new IllegalStateException("Input serializer was 
not set.");
                }
 
-               windowBufferFactory.setRuntimeContext(getRuntimeContext());
-               windowBufferFactory.open(getUserFunctionParameters());
-
-
                // these could already be initialized from restoreState()
                if (watermarkTimers == null) {
-                       watermarkTimers = new HashMap<>();
+                       watermarkTimers = new HashSet<>();
+                       watermarkTimersQueue = new PriorityQueue<>(100);
                }
                if (processingTimeTimers == null) {
-                       processingTimeTimers = new HashMap<>();
+                       processingTimeTimers = new HashSet<>();
+                       processingTimeTimersQueue = new PriorityQueue<>(100);
                }
-               if (windows == null) {
-                       windows = new HashMap<>();
-               }
-
-               // re-register timers that this window context had set
-               for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
-                       Map<W, Context> keyWindows = entry.getValue();
-                       for (Context context: keyWindows.values()) {
-                               if (context.processingTimeTimer > 0) {
-                                       Set<Context> triggers = 
processingTimeTimers.get(context.processingTimeTimer);
-                                       if (triggers == null) {
-                                               
getRuntimeContext().registerTimer(context.processingTimeTimer, 
WindowOperator.this);
-                                               triggers = new HashSet<>();
-                                               
processingTimeTimers.put(context.processingTimeTimer, triggers);
-                                       }
-                                       triggers.add(context);
-                               }
-                               if (context.watermarkTimer > 0) {
-                                       Set<Context> triggers = 
watermarkTimers.get(context.watermarkTimer);
-                                       if (triggers == null) {
-                                               triggers = new HashSet<>();
-                                               
watermarkTimers.put(context.watermarkTimer, triggers);
-                                       }
-                                       triggers.add(context);
-                               }
 
-                       }
-               }
+               context = new Context(null, null);
        }
 
        @Override
-       public final void dispose() {
-               super.dispose();
-               windows.clear();
-               try {
-                       windowBufferFactory.close();
-               } catch (Exception e) {
-                       throw new RuntimeException("Error while closing 
WindowBufferFactory.", e);
-               }
+       public final void close() throws Exception {
+               super.close();
        }
 
        @Override
        @SuppressWarnings("unchecked")
-       public final void processElement(StreamRecord<IN> element) throws 
Exception {
+       public void processElement(StreamRecord<IN> element) throws Exception {
                if (setProcessingTime) {
                        element.replace(element.getValue(), 
System.currentTimeMillis());
                }
 
                Collection<W> elementWindows = 
windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
 
-               K key = keySelector.getKey(element.getValue());
-
-               Map<W, Context> keyWindows = windows.get(key);
-               if (keyWindows == null) {
-                       keyWindows = new HashMap<>();
-                       windows.put(key, keyWindows);
-               }
+               K key = (K) getStateBackend().getCurrentKey();
 
                for (W window: elementWindows) {
-                       Context context = keyWindows.get(window);
-                       if (context == null) {
-                               WindowBuffer<IN> windowBuffer = 
windowBufferFactory.create();
-                               context = new Context(key, window, 
windowBuffer);
-                               keyWindows.put(window, context);
-                       }
 
-                       context.windowBuffer.storeElement(element);
-                       Trigger.TriggerResult triggerResult = 
context.onElement(element);
-                       processTriggerResult(triggerResult, key, window);
-               }
-       }
-
-       protected void emitWindow(Context context) throws Exception {
-               
timestampedCollector.setTimestamp(context.window.maxTimestamp());
+                       MergingState<IN, ACC> windowState = 
getPartitionedState(window, windowSerializer,
+                               windowStateDescriptor);
 
+                       windowState.add(element.getValue());
 
-               if (context.windowBuffer.size() > 0) {
-                       
setKeyContextElement1(context.windowBuffer.getElements().iterator().next());
+                       context.key = key;
+                       context.window = window;
+                       Trigger.TriggerResult triggerResult = 
context.onElement(element);
 
-                       userFunction.apply(context.key,
-                                       context.window,
-                                       
context.windowBuffer.getUnpackedElements(),
-                                       timestampedCollector);
+                       processTriggerResult(triggerResult, key, window);
                }
        }
 
-       private void processTriggerResult(Trigger.TriggerResult triggerResult, 
K key, W window) throws Exception {
+       /**
+        * Acts on the result of a {@code Trigger} invocation for one key and window.
+        *
+        * <p>On fire, the collector timestamp is set to {@code window.maxTimestamp()}, the
+        * key context is set to {@code key}, and the contents of the window state are passed
+        * to the user function. On purge, the window state is cleared. If the result is
+        * neither fire nor purge, nothing happens.
+        *
+        * @param triggerResult the decision returned by the trigger
+        * @param key the key the window state is scoped to
+        * @param window the window the trigger was evaluated for
+        * @throws Exception if state access or the user function fails
+        */
+       protected void processTriggerResult(Trigger.TriggerResult 
triggerResult, K key, W window) throws Exception {
                if (!triggerResult.isFire() && !triggerResult.isPurge()) {
                        // do nothing
                        return;
                }
-               Context context;
-               Map<W, Context> keyWindows = windows.get(key);
-               if (keyWindows == null) {
-                       LOG.debug("Window {} for key {} already gone.", window, 
key);
-                       return;
-               }
-
-               if (triggerResult.isPurge()) {
-                       context = keyWindows.remove(window);
-                       if (keyWindows.isEmpty()) {
-                               windows.remove(key);
-                       }
-               } else {
-                       context = keyWindows.get(window);
-               }
-               if (context == null) {
-                       LOG.debug("Window {} for key {} already gone.", window, 
key);
-                       return;
-               }
 
                if (triggerResult.isFire()) {
-                       emitWindow(context);
+                       
timestampedCollector.setTimestamp(window.maxTimestamp());
+
+                       setKeyContext(key);
+
+                       MergingState<IN, ACC> windowState = 
getPartitionedState(window, windowSerializer,
+                               windowStateDescriptor);
+
+                       ACC contents = windowState.get();
+
+                       // use the arguments, not the shared context, so the function always sees
+                       // the same key and window that were used for the state lookup above
+                       userFunction.apply(key, window, contents, 
timestampedCollector);
+
+                       if (triggerResult.isPurge()) {
+                               windowState.clear();
+                       }
+               } else if (triggerResult.isPurge()) {
+                       setKeyContext(key);
+                       MergingState<IN, ACC> windowState = 
getPartitionedState(window, windowSerializer,
+                               windowStateDescriptor);
+                       windowState.clear();
                }
        }
 
        @Override
        public final void processWatermark(Watermark mark) throws Exception {
-               List<Set<Context>> toTrigger = new ArrayList<>();
 
-               Iterator<Map.Entry<Long, Set<Context>>> it = 
watermarkTimers.entrySet().iterator();
+               boolean fire;
 
-               while (it.hasNext()) {
-                       Map.Entry<Long, Set<Context>> triggers = it.next();
-                       if (triggers.getKey() <= mark.getTimestamp()) {
-                               toTrigger.add(triggers.getValue());
-                               it.remove();
-                       }
-               }
+               do {
+                       Timer<K, W> timer = watermarkTimersQueue.peek();
+                       if (timer != null && timer.timestamp <= 
mark.getTimestamp()) {
+                               fire = true;
+
+                               watermarkTimers.remove(timer);
+                               watermarkTimersQueue.remove();
 
-               for (Set<Context> ctxs: toTrigger) {
-                       for (Context ctx: ctxs) {
-                                       // double check the time. it can happen 
that the trigger registers a new timer,
-                                       // in that case the entry is left in 
the watermarkTimers set for performance reasons.
-                                       // We have to check here whether the 
entry in the set still reflects the
-                                       // currently set timer in the Context.
-                                       if (ctx.watermarkTimer <= 
mark.getTimestamp()) {
-                                               Trigger.TriggerResult 
triggerResult = ctx.onEventTime(ctx.watermarkTimer);
-                                               
processTriggerResult(triggerResult, ctx.key, ctx.window);
-                                       }
+                               context.key = timer.key;
+                               context.window = timer.window;
+                               Trigger.TriggerResult triggerResult = 
context.onEventTime(mark.getTimestamp());
+                               processTriggerResult(triggerResult, 
context.key, context.window);
+                       } else {
+                               fire = false;
                        }
-               }
+               } while (fire);
 
                output.emitWatermark(mark);
 
@@ -370,206 +307,173 @@ public class WindowOperator<K, IN, OUT, W extends 
Window>
 
        @Override
        public final void trigger(long time) throws Exception {
-               List<Set<Context>> toTrigger = new ArrayList<>();
+               boolean fire;
 
-               Iterator<Map.Entry<Long, Set<Context>>> it = 
processingTimeTimers.entrySet().iterator();
+               do {
+                       Timer<K, W> timer = processingTimeTimersQueue.peek();
+                       if (timer != null && timer.timestamp <= time) {
+                               fire = true;
 
-               while (it.hasNext()) {
-                       Map.Entry<Long, Set<Context>> triggers = it.next();
-                       if (triggers.getKey() <= time) {
-                               toTrigger.add(triggers.getValue());
-                               it.remove();
-                       }
-               }
+                               processingTimeTimers.remove(timer);
+                               processingTimeTimersQueue.remove();
 
-               for (Set<Context> ctxs: toTrigger) {
-                       for (Context ctx: ctxs) {
-                               // double check the time. it can happen that 
the trigger registers a new timer,
-                               // in that case the entry is left in the 
processingTimeTimers set for
-                               // performance reasons. We have to check here 
whether the entry in the set still
-                               // reflects the currently set timer in the 
Context.
-                               if (ctx.processingTimeTimer <= time) {
-                                       Trigger.TriggerResult triggerResult = 
ctx.onProcessingTime(ctx.processingTimeTimer);
-                                       processTriggerResult(triggerResult, 
ctx.key, ctx.window);
-                               }
+                               context.key = timer.key;
+                               context.window = timer.window;
+                               Trigger.TriggerResult triggerResult = 
context.onProcessingTime(time);
+                               processTriggerResult(triggerResult, 
context.key, context.window);
+                       } else {
+                               fire = false;
                        }
-               }
+               } while (fire);
+
+               // Also check any watermark timers. We might have some in here 
since
+               // Context.registerEventTimeTimer sets a trigger if an 
event-time trigger is registered
+               // that is already behind the watermark.
+               processWatermark(new Watermark(currentWatermark));
        }
 
        /**
-        * The {@code Context} is responsible for keeping track of the state of 
one pane.
-        *
-        * <p>
-        * A pane is the bucket of elements that have the same key (assigned by 
the
-        * {@link org.apache.flink.api.java.functions.KeySelector}) and same 
{@link Window}. An element can
-        * be in multiple panes of it was assigned to multiple windows by the
-        * {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes 
all
-        * have their own instance of the {@code Trigger}.
+        * {@code Context} is a utility for handling {@code Trigger} 
invocations. It can be reused
+        * by setting the {@code key} and {@code window} fields. No internal 
state must be kept in
+        * the {@code Context}.
         */
        protected class Context implements Trigger.TriggerContext {
                protected K key;
                protected W window;
 
-               protected WindowBuffer<IN> windowBuffer;
-
-               protected HashMap<String, Serializable> state;
-
-               // use these to only allow one timer in flight at a time of 
each type
-               // if the trigger registers another timer this value here will 
be overwritten,
-               // the timer is not removed from the set of in-flight timers to 
improve performance.
-               // When a trigger fires it is just checked against the last 
timer that was set.
-               protected long watermarkTimer;
-               protected long processingTimeTimer;
-
-               public Context(K key,
-                               W window,
-                               WindowBuffer<IN> windowBuffer) {
+               public Context(K key, W window) {
                        this.key = key;
                        this.window = window;
-                       this.windowBuffer = windowBuffer;
-                       state = new HashMap<>();
-
-                       this.watermarkTimer = -1;
-                       this.processingTimeTimer = -1;
                }
 
-               /**
-                * Constructs a new {@code Context} by reading from a {@link 
DataInputView} that
-                * contains a serialized context that we wrote in
-                * {@link 
#writeToState(AbstractStateBackend.CheckpointStateOutputView)}
-                */
-               @SuppressWarnings("unchecked")
-               protected Context(DataInputView in, ClassLoader 
userClassloader) throws Exception {
-                       this.key = keySerializer.deserialize(in);
-                       this.window = windowSerializer.deserialize(in);
-                       this.watermarkTimer = in.readLong();
-                       this.processingTimeTimer = in.readLong();
-
-                       int stateSize = in.readInt();
-                       byte[] stateData = new byte[stateSize];
-                       in.read(stateData);
-                       state = InstantiationUtil.deserializeObject(stateData, 
userClassloader);
-
-                       this.windowBuffer = windowBufferFactory.create();
-                       int numElements = in.readInt();
-                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
-                       for (int i = 0; i < numElements; i++) {
-                               
windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+               @Override
+               public <S extends Serializable> ValueState<S> 
getKeyValueState(String name,
+                       Class<S> stateType,
+                       S defaultState) {
+                       requireNonNull(stateType, "The state type class must 
not be null");
+
+                       TypeInformation<S> typeInfo;
+                       try {
+                               typeInfo = TypeExtractor.getForClass(stateType);
                        }
+                       catch (Exception e) {
+                               throw new RuntimeException("Cannot analyze type 
'" + stateType.getName() +
+                                       "' from the class alone, due to generic 
type parameters. " +
+                                       "Please specify the TypeInformation 
directly.", e);
+                       }
+
+                       return getKeyValueState(name, typeInfo, defaultState);
                }
 
-               /**
-                * Writes the {@code Context} to the given state checkpoint 
output.
-                */
-               protected void 
writeToState(AbstractStateBackend.CheckpointStateOutputView out) throws 
IOException {
-                       keySerializer.serialize(key, out);
-                       windowSerializer.serialize(window, out);
-                       out.writeLong(watermarkTimer);
-                       out.writeLong(processingTimeTimer);
-
-                       byte[] serializedState = 
InstantiationUtil.serializeObject(state);
-                       out.writeInt(serializedState.length);
-                       out.write(serializedState, 0, serializedState.length);
-
-                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
-                       out.writeInt(windowBuffer.size());
-                       for (StreamRecord<IN> element: 
windowBuffer.getElements()) {
-                               recordSerializer.serialize(element, out);
-                       }
+               @Override
+               public <S extends Serializable> ValueState<S> 
getKeyValueState(String name,
+                       TypeInformation<S> stateType,
+                       S defaultState) {
+
+                       requireNonNull(name, "The name of the state must not be 
null");
+                       requireNonNull(stateType, "The state type information 
must not be null");
+
+                       ValueStateDescriptor<S> stateDesc = new 
ValueStateDescriptor<>(name, defaultState, 
stateType.createSerializer(getExecutionConfig()));
+                       return getPartitionedState(stateDesc);
                }
 
                @SuppressWarnings("unchecked")
-               public <S extends Serializable> ValueState<S> 
getKeyValueState(final String name, final S defaultState) {
-                       return new ValueState<S>() {
-                               @Override
-                               public S value() throws IOException {
-                                       Serializable value = state.get(name);
-                                       if (value == null) {
-                                               state.put(name, defaultState);
-                                               value = defaultState;
-                                       }
-                                       return (S) value;
-                               }
-
-                               @Override
-                               public void update(S value) throws IOException {
-                                       state.put(name, value);
-                               }
-
-                               @Override
-                               public void clear() {
-                                       state.remove(name);
-                               }
-                       };
+               public <S extends State> S 
getPartitionedState(StateDescriptor<S> stateDescriptor) {
+                       try {
+                               return 
WindowOperator.this.getPartitionedState(window, windowSerializer,
+                                       stateDescriptor);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Could not retrieve 
state", e);
+                       }
                }
 
                @Override
                public void registerProcessingTimeTimer(long time) {
-                       if (this.processingTimeTimer == time) {
-                               // we already have set a trigger for that time
-                               return;
-                       }
-                       Set<Context> triggers = processingTimeTimers.get(time);
-                       if (triggers == null) {
+                       Timer<K, W> timer = new Timer<>(time, key, window);
+                       if (processingTimeTimers.add(timer)) {
+                               processingTimeTimersQueue.add(timer);
                                getRuntimeContext().registerTimer(time, 
WindowOperator.this);
-                               triggers = new HashSet<>();
-                               processingTimeTimers.put(time, triggers);
                        }
-                       this.processingTimeTimer = time;
-                       triggers.add(this);
                }
 
                @Override
                public void registerEventTimeTimer(long time) {
-                       if (watermarkTimer == time) {
-                               // we already have set a trigger for that time
-                               return;
+                       Timer<K, W> timer = new Timer<>(time, key, window);
+                       if (watermarkTimers.add(timer)) {
+                               watermarkTimersQueue.add(timer);
                        }
-                       Set<Context> triggers = watermarkTimers.get(time);
-                       if (triggers == null) {
-                               triggers = new HashSet<>();
-                               watermarkTimers.put(time, triggers);
+
+                       if (time <= currentWatermark) {
+                               // immediately schedule a trigger, so that we 
don't wait for the next
+                               // watermark update to fire the watermark 
trigger
+                               getRuntimeContext().registerTimer(time, 
WindowOperator.this);
                        }
-                       this.watermarkTimer = time;
-                       triggers.add(this);
                }
 
                public Trigger.TriggerResult onElement(StreamRecord<IN> 
element) throws Exception {
-                       Trigger.TriggerResult onElementResult = 
trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
-                       if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
-                               // fire now and don't wait for the next 
watermark update
-                               Trigger.TriggerResult onEventTimeResult = 
onEventTime(watermarkTimer);
-                               return 
Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
-                       } else {
-                               return onElementResult;
-                       }
+                       return trigger.onElement(element.getValue(), 
element.getTimestamp(), window, this);
                }
 
                public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
-                       if (time == processingTimeTimer) {
-                               processingTimeTimer = -1;
-                               return trigger.onProcessingTime(time, window, 
this);
-                       } else {
-                               return Trigger.TriggerResult.CONTINUE;
-                       }
+                       return trigger.onProcessingTime(time, window, this);
                }
 
                public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
-                       if (time == watermarkTimer) {
-                               watermarkTimer = -1;
-                               Trigger.TriggerResult firstTriggerResult = 
trigger.onEventTime(time, window, this);
-
-                               if (watermarkTimer > 0 && watermarkTimer <= 
currentWatermark) {
-                                       // fire now and don't wait for the next 
watermark update
-                                       Trigger.TriggerResult 
secondTriggerResult = onEventTime(watermarkTimer);
-                                       return 
Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
-                               } else {
-                                       return firstTriggerResult;
-                               }
+                       return trigger.onEventTime(time, window, this);
+               }
+       }
 
-                       } else {
-                               return Trigger.TriggerResult.CONTINUE;
+       /**
+        * Internal class for keeping track of in-flight timers.
+        */
+       protected static class Timer<K, W extends Window> implements 
Comparable<Timer<K, W>> {
+               protected long timestamp;
+               protected K key;
+               protected W window;
+
+               public Timer(long timestamp, K key, W window) {
+                       this.timestamp = timestamp;
+                       this.key = key;
+                       this.window = window;
+               }
+
+               @Override
+               public int compareTo(Timer<K, W> o) {
+                       return Long.compare(this.timestamp, o.timestamp);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
                        }
+                       if (o == null || getClass() != o.getClass()){
+                               return false;
+                       }
+
+                       Timer<?, ?> timer = (Timer<?, ?>) o;
+
+                       return timestamp == timer.timestamp
+                                       && key.equals(timer.key)
+                                       && window.equals(timer.window);
+
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = (int) (timestamp ^ (timestamp >>> 32));
+                       result = 31 * result + key.hashCode();
+                       result = 31 * result + window.hashCode();
+                       return result;
+               }
+
+               @Override
+               public String toString() {
+                       return "Timer{" +
+                                       "timestamp=" + timestamp +
+                                       ", key=" + key +
+                                       ", window=" + window +
+                                       '}';
                }
        }
 
@@ -579,7 +483,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
         * {@link 
org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
         * time semantics.
         */
-       public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean 
setProcessingTime) {
+       public WindowOperator<K, IN, ACC, OUT, W> 
enableSetProcessingTime(boolean setProcessingTime) {
                this.setProcessingTime = setProcessingTime;
                return this;
        }
@@ -592,21 +496,25 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
                StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
 
-               // we write the panes with the key/value maps into the stream
-               AbstractStateBackend.CheckpointStateOutputView out = 
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+               AbstractStateBackend.CheckpointStateOutputView out =
+                       
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
 
-               int numKeys = windows.size();
-               out.writeInt(numKeys);
+               out.writeInt(watermarkTimersQueue.size());
+               for (Timer<K, W> timer : watermarkTimersQueue) {
+                       keySerializer.serialize(timer.key, out);
+                       windowSerializer.serialize(timer.window, out);
+                       out.writeLong(timer.timestamp);
+               }
 
-               for (Map.Entry<K, Map<W, Context>> keyWindows: 
windows.entrySet()) {
-                       int numWindows = keyWindows.getValue().size();
-                       out.writeInt(numWindows);
-                       for (Context context: keyWindows.getValue().values()) {
-                               context.writeToState(out);
-                       }
+               out.writeInt(processingTimeTimers.size());
+               for (Timer<K, W> timer : processingTimeTimersQueue) {
+                       keySerializer.serialize(timer.key, out);
+                       windowSerializer.serialize(timer.window, out);
+                       out.writeLong(timer.timestamp);
                }
 
                taskState.setOperatorState(out.closeAndGetHandle());
+
                return taskState;
        }
 
@@ -620,22 +528,28 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                StateHandle<DataInputView> inputState = 
(StateHandle<DataInputView>) taskState.getOperatorState();
                DataInputView in = inputState.getState(userClassloader);
 
-               int numKeys = in.readInt();
-               this.windows = new HashMap<>(numKeys);
-               this.processingTimeTimers = new HashMap<>();
-               this.watermarkTimers = new HashMap<>();
-
-               for (int i = 0; i < numKeys; i++) {
-                       int numWindows = in.readInt();
-                       for (int j = 0; j < numWindows; j++) {
-                               Context context = new Context(in, 
userClassloader);
-                               Map<W, Context> keyWindows = 
windows.get(context.key);
-                               if (keyWindows == null) {
-                                       keyWindows = new HashMap<>(numWindows);
-                                       windows.put(context.key, keyWindows);
-                               }
-                               keyWindows.put(context.window, context);
-                       }
+               int numWatermarkTimers = in.readInt();
+               watermarkTimers = new HashSet<>(numWatermarkTimers);
+               watermarkTimersQueue = new 
PriorityQueue<>(Math.max(numWatermarkTimers, 1));
+               for (int i = 0; i < numWatermarkTimers; i++) {
+                       K key = keySerializer.deserialize(in);
+                       W window = windowSerializer.deserialize(in);
+                       long timestamp = in.readLong();
+                       Timer<K, W> timer = new Timer<>(timestamp, key, window);
+                       watermarkTimers.add(timer);
+                       watermarkTimersQueue.add(timer);
+               }
+
+               int numProcessingTimeTimers = in.readInt();
+               processingTimeTimers = new HashSet<>(numProcessingTimeTimers);
+               processingTimeTimersQueue = new 
PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
+               for (int i = 0; i < numProcessingTimeTimers; i++) {
+                       K key = keySerializer.deserialize(in);
+                       W window = windowSerializer.deserialize(in);
+                       long timestamp = in.readLong();
+                       Timer<K, W> timer = new Timer<>(timestamp, key, window);
+                       processingTimeTimers.add(timer);
+                       processingTimeTimersQueue.add(timer);
                }
        }
 
@@ -664,7 +578,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        @VisibleForTesting
-       public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> 
getWindowBufferFactory() {
-               return windowBufferFactory;
+       public StateDescriptor<? extends MergingState<IN, ACC>> 
getStateDescriptor() {
+               return windowStateDescriptor;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 475a95d..037afe4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -518,7 +518,7 @@ public class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
                DataStream<String> window = map
                                .windowAll(GlobalWindows.create())
                                .trigger(PurgingTrigger.of(CountTrigger.of(5)))
-                               .apply(new AllWindowFunction<Tuple2<Integer, 
String>, String, GlobalWindow>() {
+                               .apply(new 
AllWindowFunction<Iterable<Tuple2<Integer, String>>, String, GlobalWindow>() {
                                        @Override
                                        public void apply(GlobalWindow window,
                                                        
Iterable<Tuple2<Integer, String>> values,

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 9b0a6d0..9297ae6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -21,9 +21,13 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
@@ -212,9 +216,11 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                env.execute();
        }
 
+       // Ignore because the count(10_000) window actually only emits one 
element during processing
+       // and all the rest in close()
        @SuppressWarnings("unchecked")
-       @Test
        @Ignore
+       @Test
        public void complexIntegrationTest3() throws Exception {
                //Heavy prime factorisation with maps and flatmaps
 
@@ -248,6 +254,7 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                DataStream<Long> sourceStream31 = env.generateSequence(1, 
10000);
                DataStream<Long> sourceStream32 = env.generateSequence(10001, 
20000);
 
+
                sourceStream31.filter(new PrimeFilterFunction())
                                .windowAll(GlobalWindows.create())
                                
.trigger(PurgingTrigger.of(CountTrigger.of(100)))
@@ -258,9 +265,10 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                                                .max(0))
                                .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
 
-               sourceStream31.flatMap(new DivisorsFlatMapFunction())
-                               .union(sourceStream32.flatMap(new 
DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
-                               Integer>>() {
+               sourceStream31
+                       .flatMap(new DivisorsFlatMapFunction())
+                       .union(sourceStream32.flatMap(new 
DivisorsFlatMapFunction()))
+                       .map(new MapFunction<Long, Tuple2<Long,Integer>>() {
 
                        @Override
                        public Tuple2<Long, Integer> map(Long value) throws 
Exception {
@@ -271,42 +279,49 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                                .window(GlobalWindows.create())
                                
.trigger(PurgingTrigger.of(CountTrigger.of(10_000)))
                                .sum(1)
-                               .filter(new FilterFunction<Tuple2<Long, 
Integer>>() {
 
-                                       @Override
-                                       public boolean filter(Tuple2<Long, 
Integer> value) throws Exception {
-                                               return value.f0 < 100 || 
value.f0 > 19900;
-                                       }
-                               })
-                               .writeAsText(resultPath2, 
FileSystem.WriteMode.OVERWRITE);
+//                             .filter(new FilterFunction<Tuple2<Long, 
Integer>>() {
+//
+//                                     @Override
+//                                     public boolean filter(Tuple2<Long, 
Integer> value) throws Exception {
+//                                             return value.f0 < 100 || 
value.f0 > 19900;
+//                                     }
+//                             })
+                       .print();
+//                             .writeAsText(resultPath2, 
FileSystem.WriteMode.OVERWRITE);
 
                env.execute();
        }
 
        @Test
        @Ignore
+       @SuppressWarnings("unchecked, rawtypes")
        public void complexIntegrationTest4() throws Exception {
                //Testing mapping and delta-policy windowing with custom class
 
                expected1 = "((100,100),0)\n" + "((120,122),5)\n" + 
"((121,125),6)\n" + "((138,144),9)\n" +
-                               "((139,147),10)\n" + "((156,166),13)\n" + 
"((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" +
-                               "((192,210),21)\n" + "((193,213),22)\n" + 
"((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" +
-                               "((229,257),30)\n" + "((246,276),33)\n" + 
"((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" +
-                               "((282,320),41)\n" + "((283,323),42)\n" + 
"((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" +
-                               "((319,367),50)\n" + "((336,386),53)\n" + 
"((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" +
-                               "((372,430),61)\n" + "((373,433),62)\n" + 
"((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" +
-                               "((409,477),70)\n" + "((426,496),73)\n" + 
"((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" +
-                               "((462,540),81)\n" + "((463,543),82)\n" + 
"((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" +
-                               "((499,587),90)\n" + "((516,606),93)\n" + 
"((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
+                       "((139,147),10)\n" + "((156,166),13)\n" + 
"((157,169),14)\n" + "((174,188),17)\n" + "((175,191),18)\n" +
+                       "((192,210),21)\n" + "((193,213),22)\n" + 
"((210,232),25)\n" + "((211,235),26)\n" + "((228,254),29)\n" +
+                       "((229,257),30)\n" + "((246,276),33)\n" + 
"((247,279),34)\n" + "((264,298),37)\n" + "((265,301),38)\n" +
+                       "((282,320),41)\n" + "((283,323),42)\n" + 
"((300,342),45)\n" + "((301,345),46)\n" + "((318,364),49)\n" +
+                       "((319,367),50)\n" + "((336,386),53)\n" + 
"((337,389),54)\n" + "((354,408),57)\n" + "((355,411),58)\n" +
+                       "((372,430),61)\n" + "((373,433),62)\n" + 
"((390,452),65)\n" + "((391,455),66)\n" + "((408,474),69)\n" +
+                       "((409,477),70)\n" + "((426,496),73)\n" + 
"((427,499),74)\n" + "((444,518),77)\n" + "((445,521),78)\n" +
+                       "((462,540),81)\n" + "((463,543),82)\n" + 
"((480,562),85)\n" + "((481,565),86)\n" + "((498,584),89)\n" +
+                       "((499,587),90)\n" + "((516,606),93)\n" + 
"((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
 
+               TupleSerializer<Tuple2<Rectangle, Integer>> deltaSerializer = 
new TupleSerializer<>((Class) Tuple2.class,
+                       new TypeSerializer[] {new 
KryoSerializer<>(Rectangle.class, env.getConfig()),
+                       IntSerializer.INSTANCE});
+
                env.addSource(new RectangleSource())
                                .global()
                                .map(new RectangleMapFunction())
                                .windowAll(GlobalWindows.create())
-                               .trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, 
new MyDelta())))
+                               .trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, 
new MyDelta(), deltaSerializer)))
                                .apply(new MyWindowMapFunction())
                                .writeAsText(resultPath1, 
FileSystem.WriteMode.OVERWRITE);
 
@@ -674,7 +689,7 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                }
        }
 
-       private static class MyWindowMapFunction implements 
AllWindowFunction<Tuple2<Rectangle, Integer>, Tuple2<Rectangle, Integer>, 
GlobalWindow> {
+       private static class MyWindowMapFunction implements 
AllWindowFunction<Iterable<Tuple2<Rectangle, Integer>>, Tuple2<Rectangle, 
Integer>, GlobalWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 02bb8b7..70adadf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -66,7 +66,7 @@ import static org.junit.Assert.*;
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        @SuppressWarnings("unchecked")
-       private final WindowFunction<String, String, String, TimeWindow> 
mockFunction = mock(WindowFunction.class);
+       private final WindowFunction<Iterable<String>, String, String, 
TimeWindow> mockFunction = mock(WindowFunction.class);
 
        @SuppressWarnings("unchecked")
        private final KeySelector<String, String> mockKeySelector = 
mock(KeySelector.class);
@@ -78,8 +78,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                }
        };
        
-       private final WindowFunction<Integer, Integer, Integer, TimeWindow> 
validatingIdentityFunction =
-                       new WindowFunction<Integer, Integer, Integer, 
TimeWindow>()
+       private final WindowFunction<Iterable<Integer>, Integer, Integer, 
TimeWindow> validatingIdentityFunction =
+                       new WindowFunction<Iterable<Integer>, Integer, Integer, 
TimeWindow>()
        {
                @Override
                public void apply(Integer key,
@@ -727,7 +727,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
        
-       private static class FailingFunction implements WindowFunction<Integer, 
Integer, Integer, TimeWindow> {
+       private static class FailingFunction implements 
WindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> {
 
                private final int failAfterElements;
                
@@ -755,7 +755,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
 
-       private static class StatefulFunction extends 
RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
+       private static class StatefulFunction extends 
RichWindowFunction<Iterable<Integer>, Integer, Integer, TimeWindow> {
                
                // we use a concurrent map here even though there is no 
concurrency, to
                // get "volatile" style access to entries

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 35bd209..5c37f36 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -767,7 +767,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        public void testKeyValueStateInWindowFunctionTumbling() {
                final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
                try {
-                       final long thirtySeconds = 30_000;
+                       final long twoSeconds = 2000;
                        
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>();
                        final Object lock = new Object();
@@ -778,7 +778,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
                                                        new StatefulFunction(), 
fieldOneSelector,
-                                                       IntSerializer.INSTANCE, 
tupleSerializer, thirtySeconds, thirtySeconds);
+                                                       IntSerializer.INSTANCE, 
tupleSerializer, twoSeconds, twoSeconds);
 
                        op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE), out);
                        op.open();
@@ -798,18 +798,12 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                }
                        }
 
-                       out.waitForNElements(2, 60_000);
-
-                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
-                       assertEquals(2, result.size());
-
-                       Collections.sort(result, tupleComparator);
-                       assertEquals(45, result.get(0).f1.intValue());
-                       assertEquals(45, result.get(1).f1.intValue());
-
-                       assertEquals(10, 
StatefulFunction.globalCounts.get(1).intValue());
-                       assertEquals(10, 
StatefulFunction.globalCounts.get(2).intValue());
-
+                       while (StatefulFunction.globalCounts.get(1) < 10 ||
+                                       StatefulFunction.globalCounts.get(2) < 
10)
+                       {
+                               Thread.sleep(50);
+                       }
+                       
                        op.close();
                        op.dispose();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 282c71f..d9ba872 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -77,7 +77,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
-                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
+                               .apply(new 
AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -126,7 +126,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window2 = source
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
-                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
+                               .apply(new 
AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -177,7 +177,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                                .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
                                .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
-                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
+                               .apply(new 
AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -204,7 +204,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class DummyReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
+       public static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 39033cc..571838f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -22,8 +22,7 @@ import 
org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
@@ -56,7 +55,7 @@ public class EvictingNonKeyedWindowOperatorTest {
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
                                new HeapWindowBuffer.Factory<Tuple2<String, 
Integer>>(),
-                               new ReduceAllWindowFunction<GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+                               new 
ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new 
SumReducer()),
                                CountTrigger.of(WINDOW_SLIDE),
                                CountEvictor.of(WINDOW_SIZE));
 
@@ -96,10 +95,6 @@ public class EvictingNonKeyedWindowOperatorTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.close();
-
-               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-
-
        }
 
        // 
------------------------------------------------------------------------
@@ -109,32 +104,9 @@ public class EvictingNonKeyedWindowOperatorTest {
        public static class SumReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
-               private boolean openCalled = false;
-
-               private  AtomicInteger closeCalled;
-
-               public SumReducer(AtomicInteger closeCalled) {
-                       this.closeCalled = closeCalled;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-                       openCalled = true;
-               }
-
-               @Override
-               public void close() throws Exception {
-                       super.close();
-                       closeCalled.incrementAndGet();
-               }
-
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
                                Tuple2<String, Integer> value2) throws 
Exception {
-                       if (!openCalled) {
-                               Assert.fail("Open was not called");
-                       }
                        return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 1821308..2f1dce5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -18,22 +18,27 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,27 +53,35 @@ public class EvictingWindowOperatorTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testCountTrigger() throws Exception {
-               AtomicInteger closeCalled = new AtomicInteger(0);
 
                final int WINDOW_SIZE = 4;
                final int WINDOW_SLIDE = 2;
 
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc = new ListStateDescriptor<>("window-contents",
+                       new 
StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig())));
+
+
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-                               new HeapWindowBuffer.Factory<Tuple2<String, 
Integer>>(),
-                               new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+                               stateDesc,
+                               new ReduceIterableWindowFunction<String, 
GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
                                CountTrigger.of(WINDOW_SLIDE),
                                CountEvictor.of(WINDOW_SIZE));
 
-               operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+               operator.setInputType(inputType, new ExecutionConfig());
 
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
OneInputStreamOperatorTestHarness<>(operator);
 
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
                long initialTime = 0L;
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -105,24 +118,104 @@ public class EvictingWindowOperatorTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.close();
+       }
 
-               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testCountTriggerWithApply() throws Exception {
+               AtomicInteger closeCalled = new AtomicInteger(0);
+
+               final int WINDOW_SIZE = 4;
+               final int WINDOW_SLIDE = 2;
+
+               TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+
+               ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> 
stateDesc = new ListStateDescriptor<>("window-contents",
+                       new 
StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig())));
+
+
+               EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
+                       GlobalWindows.create(),
+                       new GlobalWindow.Serializer(),
+                       new TupleKeySelector(),
+                       BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                       stateDesc,
+                       new RichSumReducer<GlobalWindow>(closeCalled),
+                       CountTrigger.of(WINDOW_SLIDE),
+                       CountEvictor.of(WINDOW_SIZE));
+
+               operator.setInputType(inputType, new ExecutionConfig());
+
+
+               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
+                       new OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               long initialTime = 0L;
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               testHarness.open();
+
+               // The global window actually ignores these timestamps...
+
+               // add elements out-of-order
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3000));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 3999));
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 20));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 999));
 
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1998));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 1), initialTime + 10999));
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), initialTime + 1000));
+
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 
Long.MAX_VALUE));
+               expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 
Long.MAX_VALUE));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
 
+               Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
        }
 
        // 
------------------------------------------------------------------------
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class SumReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
+       public static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
+               private static final long serialVersionUID = 1L;
+
+
+               @Override
+               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
+                               Tuple2<String, Integer> value2) throws 
Exception {
+                       return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+               }
+       }
+
+       public static class RichSumReducer<W extends Window> extends 
RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
String, W> {
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;
 
-               private  AtomicInteger closeCalled;
+               private AtomicInteger closeCalled = new AtomicInteger(0);
 
-               public SumReducer(AtomicInteger closeCalled) {
+               public RichSumReducer(AtomicInteger closeCalled) {
                        this.closeCalled = closeCalled;
                }
 
@@ -139,13 +232,23 @@ public class EvictingWindowOperatorTest {
                }
 
                @Override
-               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
-                               Tuple2<String, Integer> value2) throws 
Exception {
+               public void apply(String key,
+                       W window,
+                       Iterable<Tuple2<String, Integer>> input,
+                       Collector<Tuple2<String, Integer>> out) throws 
Exception {
+
                        if (!openCalled) {
                                Assert.fail("Open was not called");
                        }
-                       return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+                       int sum = 0;
+
+                       for (Tuple2<String, Integer> t: input) {
+                               sum += t.f1;
+                       }
+                       out.collect(new Tuple2<>(key, sum));
+
                }
+
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index 02e032a..c0e6ad4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -18,11 +18,12 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
@@ -77,7 +78,7 @@ public class NonKeyedWindowOperatorTest {
                                SlidingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                windowBufferFactory,
-                               new ReduceAllWindowFunction<TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
+                               new ReduceIterableAllWindowFunction<TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -140,11 +141,6 @@ public class NonKeyedWindowOperatorTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.close();
-               if (windowBufferFactory instanceof 
PreAggregatingHeapWindowBuffer.Factory) {
-                       Assert.assertEquals("Close was not called.", 2, 
closeCalled.get());
-               } else {
-                       Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-               }
        }
 
        @Test
@@ -158,7 +154,7 @@ public class NonKeyedWindowOperatorTest {
                                TumblingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                windowBufferFactory,
-                               new ReduceAllWindowFunction<TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
+                               new ReduceIterableAllWindowFunction<TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                EventTimeTrigger.create());
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -219,11 +215,6 @@ public class NonKeyedWindowOperatorTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.close();
-               if (windowBufferFactory instanceof 
PreAggregatingHeapWindowBuffer.Factory) {
-                       Assert.assertEquals("Close was not called.", 2, 
closeCalled.get());
-               } else {
-                       Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-               }
        }
 
        @Test
@@ -237,7 +228,7 @@ public class NonKeyedWindowOperatorTest {
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
                                windowBufferFactory,
-                               new ReduceAllWindowFunction<GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
+                               new 
ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new 
SumReducer()),
                                
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -298,11 +289,6 @@ public class NonKeyedWindowOperatorTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.close();
-               if (windowBufferFactory instanceof 
PreAggregatingHeapWindowBuffer.Factory) {
-                       Assert.assertEquals("Close was not called.", 2, 
closeCalled.get());
-               } else {
-                       Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-               }
        }
 
        @Test
@@ -316,7 +302,7 @@ public class NonKeyedWindowOperatorTest {
                                GlobalWindows.create(),
                                new GlobalWindow.Serializer(),
                                windowBufferFactory,
-                               new ReduceAllWindowFunction<GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
+                               new 
ReduceIterableAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new 
SumReducer()),
                                
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
 
                operator.setInputType(TypeInfoParser.<Tuple2<String, 
Integer>>parse(
@@ -355,19 +341,23 @@ public class NonKeyedWindowOperatorTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
                testHarness.close();
-               if (windowBufferFactory instanceof 
PreAggregatingHeapWindowBuffer.Factory) {
-                       Assert.assertEquals("Close was not called.", 2, 
closeCalled.get());
-               } else {
-                       Assert.assertEquals("Close was not called.", 1, 
closeCalled.get());
-               }
-
        }
 
        // 
------------------------------------------------------------------------
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class SumReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
+       public static class SumReducer implements ReduceFunction<Tuple2<String, 
Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<String, Integer> reduce(Tuple2<String, Integer> 
value1,
+                       Tuple2<String, Integer> value2) throws Exception {
+                       return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+               }
+       }
+
+       public static class RichSumReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled = false;
@@ -400,7 +390,7 @@ public class NonKeyedWindowOperatorTest {
        @Parameterized.Parameters(name = "WindowBuffer = {0}")
        @SuppressWarnings("unchecked,rawtypes")
        public static Collection<WindowBufferFactory[]> windowBuffers(){
-               return Arrays.asList(new WindowBufferFactory[]{new 
PreAggregatingHeapWindowBuffer.Factory(new SumReducer())},
+               return Arrays.asList(new WindowBufferFactory[]{new 
PreAggregatingHeapWindowBuffer.Factory(new RichSumReducer())},
                                new WindowBufferFactory[]{new 
HeapWindowBuffer.Factory()}
                                );
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 76c6f20..b99232a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
-import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -68,7 +68,7 @@ public class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
                                .timeWindow(Time.of(1000, 
TimeUnit.MILLISECONDS))
-                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+                               .apply(new 
WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -111,7 +111,7 @@ public class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .timeWindowAll(Time.of(1000, 
TimeUnit.MILLISECONDS))
-                               .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
+                               .apply(new 
AllWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, 
TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
@@ -132,7 +132,7 @@ public class TimeWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
        //  UDFs
        // 
------------------------------------------------------------------------
 
-       public static class DummyReducer extends 
RichReduceFunction<Tuple2<String, Integer>> {
+       public static class DummyReducer implements 
ReduceFunction<Tuple2<String, Integer>> {
                private static final long serialVersionUID = 1L;
 
                @Override

Reply via email to