http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index aecfd5d..7ab33cf 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -18,12 +18,15 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -37,11 +40,16 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -65,26 +73,70 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
        private static final long serialVersionUID = 1L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(NonKeyedWindowOperator.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(WindowOperator.class);
 
+       // 
------------------------------------------------------------------------
+       // Configuration values and stuff from the user
+       // 
------------------------------------------------------------------------
 
        private final WindowAssigner<? super IN, W> windowAssigner;
 
-       private final Trigger<? super IN, ? super W> triggerTemplate;
+       private final Trigger<? super IN, ? super W> trigger;
+
        private final WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory;
 
-       protected transient Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> 
windows;
+       /**
+        * If this is true, the current processing time is set as the timestamp 
of incoming elements.
+        * This is for use with a {@link 
org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+        * if eviction should happen based on processing time.
+        */
+       private boolean setProcessingTime = false;
 
-       private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
-       private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+       /**
+        * This is used to copy the incoming element because it can be put into 
several window
+        * buffers.
+        */
+       private TypeSerializer<IN> inputSerializer;
 
+       /**
+        * For serializing the window in checkpoints.
+        */
+       private final TypeSerializer<W> windowSerializer;
+
+       // 
------------------------------------------------------------------------
+       // State that is not checkpointed
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Processing time timers that are currently in-flight.
+        */
+       private transient Map<Long, Set<Context>> processingTimeTimers;
+
+       /**
+        * Current waiting watermark callbacks.
+        */
+       private transient Map<Long, Set<Context>> watermarkTimers;
+
+       /**
+        * This is given to the {@code AllWindowFunction} for emitting elements 
with a given timestamp.
+        */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
-       private boolean setProcessingTime = false;
+       // 
------------------------------------------------------------------------
+       // State that needs to be checkpointed
+       // 
------------------------------------------------------------------------
 
-       private TypeSerializer<IN> inputSerializer;
+       /**
+        * The windows (panes) that are currently in-flight. Each pane has a 
{@code Context}
+        * that holds its {@code WindowBuffer}, its trigger state and its 
registered timers.
+        */
+       protected transient Map<W, Context> windows;
 
+       /**
+        * Creates a new {@code NonKeyedWindowOperator} based on the given policies and 
user functions.
+        */
        public NonKeyedWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
+                       TypeSerializer<W> windowSerializer,
                        WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory,
                        AllWindowFunction<IN, OUT, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger) {
@@ -92,25 +144,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                super(windowFunction);
 
                this.windowAssigner = requireNonNull(windowAssigner);
+               this.windowSerializer = windowSerializer;
 
                this.windowBufferFactory = requireNonNull(windowBufferFactory);
-               this.triggerTemplate = requireNonNull(trigger);
+               this.trigger = requireNonNull(trigger);
 
                setChainingStrategy(ChainingStrategy.ALWAYS);
        }
 
        @Override
        @SuppressWarnings("unchecked")
-       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+       public final void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
                inputSerializer = (TypeSerializer<IN>) 
type.createSerializer(executionConfig);
        }
 
        @Override
-       public void open() throws Exception {
+       public final void open() throws Exception {
                super.open();
-               windows = new HashMap<>();
-               watermarkTimers = new HashMap<>();
-               processingTimeTimers = new HashMap<>();
                timestampedCollector = new TimestampedCollector<>(output);
 
                if (inputSerializer == null) {
@@ -119,14 +169,47 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
                windowBufferFactory.setRuntimeContext(getRuntimeContext());
                windowBufferFactory.open(getUserFunctionParameters());
+
+               // these could already be initialized from restoreState()
+               if (watermarkTimers == null) {
+                       watermarkTimers = new HashMap<>();
+               }
+               if (processingTimeTimers == null) {
+                       processingTimeTimers = new HashMap<>();
+               }
+               if (windows == null) {
+                       windows = new HashMap<>();
+               }
+
+               // re-register timers that this window context had set
+               for (Context context: windows.values()) {
+                       if (context.processingTimeTimer > 0) {
+                               Set<Context> triggers = 
processingTimeTimers.get(context.processingTimeTimer);
+                               if (triggers == null) {
+                                       
getRuntimeContext().registerTimer(context.processingTimeTimer, 
NonKeyedWindowOperator.this);
+                                       triggers = new HashSet<>();
+                                       
processingTimeTimers.put(context.processingTimeTimer, triggers);
+                               }
+                               triggers.add(context);
+                       }
+                       if (context.watermarkTimer > 0) {
+                               Set<Context> triggers = 
watermarkTimers.get(context.watermarkTimer);
+                               if (triggers == null) {
+                                       triggers = new HashSet<>();
+                                       
watermarkTimers.put(context.watermarkTimer, triggers);
+                               }
+                               triggers.add(context);
+                       }
+
+               }
        }
 
        @Override
-       public void close() throws Exception {
+       public final void close() throws Exception {
                super.close();
                // emit the elements that we still keep
-               for (W window: windows.keySet()) {
-                       emitWindow(window, false);
+               for (Context window: windows.values()) {
+                       emitWindow(window);
                }
                windows.clear();
                windowBufferFactory.close();
@@ -134,58 +217,60 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
 
        @Override
        @SuppressWarnings("unchecked")
-       public void processElement(StreamRecord<IN> element) throws Exception {
+       public final void processElement(StreamRecord<IN> element) throws 
Exception {
                if (setProcessingTime) {
                        element.replace(element.getValue(), 
System.currentTimeMillis());
                }
+
                Collection<W> elementWindows = 
windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
 
                for (W window: elementWindows) {
-                       Tuple2<WindowBuffer<IN>, TriggerContext> 
bufferAndTrigger = windows.get(window);
-                       if (bufferAndTrigger == null) {
-                               bufferAndTrigger = new Tuple2<>();
-                               bufferAndTrigger.f0 = 
windowBufferFactory.create();
-                               bufferAndTrigger.f1 = new 
TriggerContext(window, triggerTemplate.duplicate());
-                               windows.put(window, bufferAndTrigger);
+                       Context context = windows.get(window);
+                       if (context == null) {
+                               WindowBuffer<IN> windowBuffer = 
windowBufferFactory.create();
+                               context = new Context(window, windowBuffer);
+                               windows.put(window, context);
                        }
                        StreamRecord<IN> elementCopy = new 
StreamRecord<>(inputSerializer.copy(element.getValue()), 
element.getTimestamp());
-                       bufferAndTrigger.f0.storeElement(elementCopy);
-                       Trigger.TriggerResult triggerResult = 
bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), 
elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+                       context.windowBuffer.storeElement(elementCopy);
+                       Trigger.TriggerResult triggerResult = 
trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, 
context);
                        processTriggerResult(triggerResult, window);
                }
        }
 
-       protected void emitWindow(W window, boolean purge) throws Exception {
-               timestampedCollector.setTimestamp(window.getEnd());
-
-               Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-               if (purge) {
-                       bufferAndTrigger = windows.remove(window);
-               } else {
-                       bufferAndTrigger = windows.get(window);
-               }
-
-               if (bufferAndTrigger == null) {
-                       LOG.debug("Window {} already gone.", window);
-                       return;
-               }
-
+       protected void emitWindow(Context context) throws Exception {
+               
timestampedCollector.setTimestamp(context.window.maxTimestamp());
 
                userFunction.apply(
-                               window,
-                               bufferAndTrigger.f0.getUnpackedElements(),
+                               context.window,
+                               context.windowBuffer.getUnpackedElements(),
                                timestampedCollector);
        }
 
        private void processTriggerResult(Trigger.TriggerResult triggerResult, 
W window) throws Exception {
                switch (triggerResult) {
-                       case FIRE:
-                               emitWindow(window, false);
+                       case FIRE: {
+                               Context context = windows.get(window);
+                               if (context == null) {
+                                       LOG.debug("Window {} already gone.", 
window);
+                                       return;
+                               }
+
+
+                               emitWindow(context);
                                break;
+                       }
 
-                       case FIRE_AND_PURGE:
-                               emitWindow(window, true);
+                       case FIRE_AND_PURGE: {
+                               Context context = windows.remove(window);
+                               if (context == null) {
+                                       LOG.debug("Window {} already gone.", 
window);
+                                       return;
+                               }
+
+                               emitWindow(context);
                                break;
+                       }
 
                        case CONTINUE:
                                // ingore
@@ -193,14 +278,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        }
 
        @Override
-       public void processWatermark(Watermark mark) throws Exception {
+       public final void processWatermark(Watermark mark) throws Exception {
                Set<Long> toRemove = new HashSet<>();
 
-               for (Map.Entry<Long, Set<TriggerContext>> triggers: 
watermarkTimers.entrySet()) {
+               for (Map.Entry<Long, Set<Context>> triggers: 
watermarkTimers.entrySet()) {
                        if (triggers.getKey() <= mark.getTimestamp()) {
-                               for (TriggerContext trigger: 
triggers.getValue()) {
-                                       Trigger.TriggerResult triggerResult = 
trigger.trigger.onTime(mark.getTimestamp(), trigger);
-                                       processTriggerResult(triggerResult, 
trigger.window);
+                               for (Context context: triggers.getValue()) {
+                                       Trigger.TriggerResult triggerResult = 
context.onEventTime(triggers.getKey());
+                                       processTriggerResult(triggerResult, 
context.window);
                                }
                                toRemove.add(triggers.getKey());
                        }
@@ -213,14 +298,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        }
 
        @Override
-       public void trigger(long time) throws Exception {
+       public final void trigger(long time) throws Exception {
                Set<Long> toRemove = new HashSet<>();
 
-               for (Map.Entry<Long, Set<TriggerContext>> triggers: 
processingTimeTimers.entrySet()) {
+               for (Map.Entry<Long, Set<Context>> triggers: 
processingTimeTimers.entrySet()) {
                        if (triggers.getKey() < time) {
-                               for (TriggerContext trigger: 
triggers.getValue()) {
-                                       Trigger.TriggerResult triggerResult = 
trigger.trigger.onTime(time, trigger);
-                                       processTriggerResult(triggerResult, 
trigger.window);
+                               for (Context context: triggers.getValue()) {
+                                       Trigger.TriggerResult triggerResult = 
context.onProcessingTime(time);
+                                       processTriggerResult(triggerResult, 
context.window);
                                }
                                toRemove.add(triggers.getKey());
                        }
@@ -231,35 +316,139 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
                }
        }
 
-       protected class TriggerContext implements Trigger.TriggerContext {
-               Trigger<? super IN, ? super W> trigger;
-               W window;
+       /**
+        * A context object that is given to {@code Trigger} functions to allow 
them to register
+        * timer/watermark callbacks.
+        */
+       protected class Context implements Trigger.TriggerContext {
+               protected W window;
+
+               protected WindowBuffer<IN> windowBuffer;
+
+               protected HashMap<String, Serializable> state;
+
+               // use these to only allow one timer in flight at a time of 
each type
+               // if the trigger registers another timer this value here will 
be overwritten,
+               // the timer is not removed from the set of in-flight timers to 
improve performance.
+               // When a trigger fires it is just checked against the last 
timer that was set.
+               protected long watermarkTimer;
+               protected long processingTimeTimer;
 
-               public TriggerContext(W window, Trigger<? super IN, ? super W> 
trigger) {
+               public Context(
+                               W window,
+                               WindowBuffer<IN> windowBuffer) {
                        this.window = window;
-                       this.trigger = trigger;
+                       this.windowBuffer = windowBuffer;
+                       state = new HashMap<>();
+
+                       this.watermarkTimer = -1;
+                       this.processingTimeTimer = -1;
+               }
+
+
+               @SuppressWarnings("unchecked")
+               protected Context(DataInputView in) throws Exception {
+                       this.window = windowSerializer.deserialize(in);
+                       this.watermarkTimer = in.readLong();
+                       this.processingTimeTimer = in.readLong();
+
+                       int stateSize = in.readInt();
+                       byte[] stateData = new byte[stateSize];
+                       in.read(stateData);
+                       ByteArrayInputStream bais = new 
ByteArrayInputStream(stateData);
+                       state = (HashMap<String, Serializable>) 
SerializationUtils.deserialize(bais);
+
+                       this.windowBuffer = windowBufferFactory.create();
+                       int numElements = in.readInt();
+                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
+                       for (int i = 0; i < numElements; i++) {
+                               
windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+                       }
+               }
+
+               protected void 
writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
+                       windowSerializer.serialize(window, out);
+                       out.writeLong(watermarkTimer);
+                       out.writeLong(processingTimeTimer);
+
+                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                       SerializationUtils.serialize(state, baos);
+                       out.writeInt(baos.size());
+                       out.write(baos.toByteArray(), 0, baos.size());
+
+                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
+                       out.writeInt(windowBuffer.size());
+                       for (StreamRecord<IN> element: 
windowBuffer.getElements()) {
+                               recordSerializer.serialize(element, out);
+                       }
+               }
+
+               @SuppressWarnings("unchecked")
+               public <S extends Serializable> OperatorState<S> 
getKeyValueState(final String name, final S defaultState) {
+                       return new OperatorState<S>() {
+                               @Override
+                               public S value() throws IOException {
+                                       Serializable value = state.get(name);
+                                       if (value == null) {
+                                               state.put(name, defaultState);
+                                               value = defaultState;
+                                       }
+                                       return (S) value;
+                               }
+
+                               @Override
+                               public void update(S value) throws IOException {
+                                       state.put(name, value);
+                               }
+                       };
                }
 
                @Override
                public void registerProcessingTimeTimer(long time) {
-                       Set<TriggerContext> triggers = 
processingTimeTimers.get(time);
+                       if (this.processingTimeTimer == time) {
+                               // we already have set a trigger for that time
+                               return;
+                       }
+                       Set<Context> triggers = processingTimeTimers.get(time);
                        if (triggers == null) {
                                getRuntimeContext().registerTimer(time, 
NonKeyedWindowOperator.this);
                                triggers = new HashSet<>();
                                processingTimeTimers.put(time, triggers);
                        }
+                       this.processingTimeTimer = time;
                        triggers.add(this);
                }
 
                @Override
                public void registerWatermarkTimer(long time) {
-                       Set<TriggerContext> triggers = 
watermarkTimers.get(time);
+                       if (watermarkTimer == time) {
+                               // we already have set a trigger for that time
+                               return;
+                       }
+                       Set<Context> triggers = watermarkTimers.get(time);
                        if (triggers == null) {
                                triggers = new HashSet<>();
                                watermarkTimers.put(time, triggers);
                        }
+                       this.watermarkTimer = time;
                        triggers.add(this);
                }
+
+               public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
+                       if (time == processingTimeTimer) {
+                               return trigger.onTime(time, this);
+                       } else {
+                               return Trigger.TriggerResult.CONTINUE;
+                       }
+               }
+
+               public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
+                       if (time == watermarkTimer) {
+                               return trigger.onTime(time, this);
+                       } else {
+                               return Trigger.TriggerResult.CONTINUE;
+                       }
+               }
        }
 
        /**
@@ -274,7 +463,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        }
 
        @Override
-       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+       public final void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
                if (userFunction instanceof OutputTypeConfigurable) {
                        @SuppressWarnings("unchecked")
                        OutputTypeConfigurable<OUT> typeConfigurable = 
(OutputTypeConfigurable<OUT>) userFunction;
@@ -283,12 +472,59 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        }
 
        // 
------------------------------------------------------------------------
+       //  Checkpointing
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
+               StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+
+               // we write the panes with the key/value maps into the stream
+               StateBackend.CheckpointStateOutputView out = 
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+               int numWindows = windows.size();
+               out.writeInt(numWindows);
+               for (Context context: windows.values()) {
+                       context.writeToState(out);
+               }
+
+               taskState.setOperatorState(out.closeAndGetHandle());
+               return taskState;
+       }
+
+       @Override
+       public void restoreState(StreamTaskState taskState) throws Exception {
+               super.restoreState(taskState);
+
+
+               @SuppressWarnings("unchecked")
+               StateHandle<DataInputView> inputState = 
(StateHandle<DataInputView>) taskState.getOperatorState();
+               DataInputView in = 
inputState.getState(getUserCodeClassloader());
+
+               int numWindows = in.readInt();
+               this.windows = new HashMap<>(numWindows);
+               this.processingTimeTimers = new HashMap<>();
+               this.watermarkTimers = new HashMap<>();
+
+               for (int j = 0; j < numWindows; j++) {
+                       Context context = new Context(in);
+                       windows.put(context.window, context);
+               }
+       }
+
+
+       // 
------------------------------------------------------------------------
        // Getters for testing
        // 
------------------------------------------------------------------------
 
        @VisibleForTesting
-       public Trigger<? super IN, ? super W> getTriggerTemplate() {
-               return triggerTemplate;
+       public boolean isSetProcessingTime() {
+               return setProcessingTime;
+       }
+
+       @VisibleForTesting
+       public Trigger<? super IN, ? super W> getTrigger() {
+               return trigger;
        }
 
        @VisibleForTesting
@@ -300,9 +536,4 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> 
getWindowBufferFactory() {
                return windowBufferFactory;
        }
-
-       @VisibleForTesting
-       public boolean isSetProcessingTime() {
-               return setProcessingTime;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 82a3f9a..0b3274f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,13 +18,16 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -38,10 +41,16 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -84,49 +93,77 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
        private static final Logger LOG = 
LoggerFactory.getLogger(WindowOperator.class);
 
+       // 
------------------------------------------------------------------------
+       // Configuration values and user functions
+       // 
------------------------------------------------------------------------
+
        private final WindowAssigner<? super IN, W> windowAssigner;
 
        private final KeySelector<IN, K> keySelector;
 
-       private final Trigger<? super IN, ? super W> triggerTemplate;
+       private final Trigger<? super IN, ? super W> trigger;
 
        private final WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory;
 
        /**
-        * The windows (panes) that are currently in-flight. Each pane has a 
{@code WindowBuffer}
-        * and a {@code TriggerContext} that stores the {@code Trigger} for 
that pane.
+        * If this is true, the current processing time is set as the timestamp 
of incoming elements.
+        * This is for use with a {@link 
org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+        * if eviction should happen based on processing time.
+        */
+       private boolean setProcessingTime = false;
+
+       /**
+        * This is used to copy the incoming element because it can be put into 
several window
+        * buffers.
+        */
+       private TypeSerializer<IN> inputSerializer;
+
+       /**
+        * For serializing the key in checkpoints.
         */
-       protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, 
TriggerContext>>> windows;
+       private final TypeSerializer<K> keySerializer;
+
+       /**
+        * For serializing the window in checkpoints.
+        */
+       private final TypeSerializer<W> windowSerializer;
+
+       // 
------------------------------------------------------------------------
+       // State that is not checkpointed
+       // 
------------------------------------------------------------------------
 
        /**
         * Processing time timers that are currently in-flight.
         */
-       private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+       private transient Map<Long, Set<Context>> processingTimeTimers;
 
        /**
         * Current waiting watermark callbacks.
         */
-       private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+       private transient Map<Long, Set<Context>> watermarkTimers;
 
        /**
         * This is given to the {@code WindowFunction} for emitting elements 
with a given timestamp.
         */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
+       // 
------------------------------------------------------------------------
+       // State that needs to be checkpointed
+       // 
------------------------------------------------------------------------
+
        /**
-        * If this is true. The current processing time is set as the timestamp 
of incoming elements.
-        * This for use with a {@link 
org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
-        * if eviction should happen based on processing time.
+        * The windows (panes) that are currently in-flight. Each pane is a 
{@code Context}
+        * that holds the pane's {@code WindowBuffer}, its key/value state and 
its registered timers.
         */
-       private boolean setProcessingTime = false;
-
-       private TypeSerializer<IN> inputSerializer;
+       protected transient Map<K, Map<W, Context>> windows;
 
        /**
         * Creates a new {@code WindowOperator} based on the given policies and 
user functions.
         */
        public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+                       TypeSerializer<W> windowSerializer,
                        KeySelector<IN, K> keySelector,
+                       TypeSerializer<K> keySerializer,
                        WindowBufferFactory<? super IN, ? extends 
WindowBuffer<IN>> windowBufferFactory,
                        WindowFunction<IN, OUT, K, W> windowFunction,
                        Trigger<? super IN, ? super W> trigger) {
@@ -134,27 +171,26 @@ public class WindowOperator<K, IN, OUT, W extends Window>
                super(windowFunction);
 
                this.windowAssigner = requireNonNull(windowAssigner);
+               this.windowSerializer = windowSerializer;
                this.keySelector = requireNonNull(keySelector);
+               this.keySerializer = requireNonNull(keySerializer);
 
                this.windowBufferFactory = requireNonNull(windowBufferFactory);
-               this.triggerTemplate = requireNonNull(trigger);
+               this.trigger = requireNonNull(trigger);
 
                setChainingStrategy(ChainingStrategy.ALWAYS);
-//             forceInputCopy();
        }
 
        @Override
        @SuppressWarnings("unchecked")
-       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+       public final void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
                inputSerializer = (TypeSerializer<IN>) 
type.createSerializer(executionConfig);
        }
 
        @Override
-       public void open() throws Exception {
+       public final void open() throws Exception {
                super.open();
-               windows = new HashMap<>();
-               watermarkTimers = new HashMap<>();
-               processingTimeTimers = new HashMap<>();
+
                timestampedCollector = new TimestampedCollector<>(output);
 
                if (inputSerializer == null) {
@@ -163,17 +199,53 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
                windowBufferFactory.setRuntimeContext(getRuntimeContext());
                windowBufferFactory.open(getUserFunctionParameters());
+
+
+               // these could already be initialized from restoreState()
+               if (watermarkTimers == null) {
+                       watermarkTimers = new HashMap<>();
+               }
+               if (processingTimeTimers == null) {
+                       processingTimeTimers = new HashMap<>();
+               }
+               if (windows == null) {
+                       windows = new HashMap<>();
+               }
+
+               // re-register timers that this window context had set
+               for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
+                       Map<W, Context> keyWindows = entry.getValue();
+                       for (Context context: keyWindows.values()) {
+                               if (context.processingTimeTimer > 0) {
+                                       Set<Context> triggers = 
processingTimeTimers.get(context.processingTimeTimer);
+                                       if (triggers == null) {
+                                               
getRuntimeContext().registerTimer(context.processingTimeTimer, 
WindowOperator.this);
+                                               triggers = new HashSet<>();
+                                               
processingTimeTimers.put(context.processingTimeTimer, triggers);
+                                       }
+                                       triggers.add(context);
+                               }
+                               if (context.watermarkTimer > 0) {
+                                       Set<Context> triggers = 
watermarkTimers.get(context.watermarkTimer);
+                                       if (triggers == null) {
+                                               triggers = new HashSet<>();
+                                               
watermarkTimers.put(context.watermarkTimer, triggers);
+                                       }
+                                       triggers.add(context);
+                               }
+
+                       }
+               }
        }
 
        @Override
-       public void close() throws Exception {
+       public final void close() throws Exception {
                super.close();
                // emit the elements that we still keep
-               for (Map.Entry<K, Map<W, Tuple2<WindowBuffer<IN>, 
TriggerContext>>> entry: windows.entrySet()) {
-                       K key = entry.getKey();
-                       Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> 
keyWindows = entry.getValue();
-                       for (W window: keyWindows.keySet()) {
-                               emitWindow(key, window, false);
+               for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
+                       Map<W, Context> keyWindows = entry.getValue();
+                       for (Context window: keyWindows.values()) {
+                               emitWindow(window);
                        }
                }
                windows.clear();
@@ -182,77 +254,81 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
        @Override
        @SuppressWarnings("unchecked")
-       public void processElement(StreamRecord<IN> element) throws Exception {
+       public final void processElement(StreamRecord<IN> element) throws 
Exception {
                if (setProcessingTime) {
                        element.replace(element.getValue(), 
System.currentTimeMillis());
                }
+
                Collection<W> elementWindows = 
windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
 
                K key = keySelector.getKey(element.getValue());
 
-               Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = 
windows.get(key);
+               Map<W, Context> keyWindows = windows.get(key);
                if (keyWindows == null) {
                        keyWindows = new HashMap<>();
                        windows.put(key, keyWindows);
                }
 
                for (W window: elementWindows) {
-                       Tuple2<WindowBuffer<IN>, TriggerContext> 
bufferAndTrigger = keyWindows.get(window);
-                       if (bufferAndTrigger == null) {
-                               bufferAndTrigger = new Tuple2<>();
-                               bufferAndTrigger.f0 = 
windowBufferFactory.create();
-                               bufferAndTrigger.f1 = new TriggerContext(key, 
window, triggerTemplate.duplicate());
-                               keyWindows.put(window, bufferAndTrigger);
+                       Context context = keyWindows.get(window);
+                       if (context == null) {
+                               WindowBuffer<IN> windowBuffer = 
windowBufferFactory.create();
+                               context = new Context(key, window, 
windowBuffer);
+                               keyWindows.put(window, context);
                        }
                        StreamRecord<IN> elementCopy = new 
StreamRecord<>(inputSerializer.copy(element.getValue()), 
element.getTimestamp());
-                       bufferAndTrigger.f0.storeElement(elementCopy);
-                       Trigger.TriggerResult triggerResult = 
bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), 
elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+                       context.windowBuffer.storeElement(elementCopy);
+                       Trigger.TriggerResult triggerResult = 
trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, 
context);
                        processTriggerResult(triggerResult, key, window);
                }
        }
 
-       protected void emitWindow(K key, W window, boolean purge) throws 
Exception {
-               timestampedCollector.setTimestamp(window.getEnd());
-
-               Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = 
windows.get(key);
-
-               if (keyWindows == null) {
-                       LOG.debug("Window {} for key {} already gone.", window, 
key);
-                       return;
-               }
-
-               Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
-               if (purge) {
-                       bufferAndTrigger = keyWindows.remove(window);
-               } else {
-                       bufferAndTrigger = keyWindows.get(window);
-               }
-
-               if (bufferAndTrigger == null) {
-                       LOG.debug("Window {} for key {} already gone.", window, 
key);
-                       return;
-               }
+       protected void emitWindow(Context context) throws Exception {
+               
timestampedCollector.setTimestamp(context.window.maxTimestamp());
 
-
-               userFunction.apply(key,
-                               window,
-                               bufferAndTrigger.f0.getUnpackedElements(),
+               userFunction.apply(context.key,
+                               context.window,
+                               context.windowBuffer.getUnpackedElements(),
                                timestampedCollector);
-
-               if (keyWindows.isEmpty()) {
-                       windows.remove(key);
-               }
        }
 
        private void processTriggerResult(Trigger.TriggerResult triggerResult, 
K key, W window) throws Exception {
                switch (triggerResult) {
-                       case FIRE:
-                               emitWindow(key, window, false);
+                       case FIRE: {
+                               Map<W, Context> keyWindows = windows.get(key);
+                               if (keyWindows == null) {
+                                       LOG.debug("Window {} for key {} already 
gone.", window, key);
+                                       return;
+                               }
+                               Context context = keyWindows.get(window);
+                               if (context == null) {
+                                       LOG.debug("Window {} for key {} already 
gone.", window, key);
+                                       return;
+                               }
+
+
+                               emitWindow(context);
                                break;
+                       }
 
-                       case FIRE_AND_PURGE:
-                               emitWindow(key, window, true);
+                       case FIRE_AND_PURGE: {
+                               Map<W, Context> keyWindows = windows.get(key);
+                               if (keyWindows == null) {
+                                       LOG.debug("Window {} for key {} already 
gone.", window, key);
+                                       return;
+                               }
+                               Context context = keyWindows.remove(window);
+                               if (context == null) {
+                                       LOG.debug("Window {} for key {} already 
gone.", window, key);
+                                       return;
+                               }
+                               if (keyWindows.isEmpty()) {
+                                       windows.remove(key);
+                               }
+
+                               emitWindow(context);
                                break;
+                       }
 
                        case CONTINUE:
                                // ingore
@@ -260,14 +336,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        @Override
-       public void processWatermark(Watermark mark) throws Exception {
+       public final void processWatermark(Watermark mark) throws Exception {
                Set<Long> toRemove = new HashSet<>();
 
-               for (Map.Entry<Long, Set<TriggerContext>> triggers: 
watermarkTimers.entrySet()) {
+               for (Map.Entry<Long, Set<Context>> triggers: 
watermarkTimers.entrySet()) {
                        if (triggers.getKey() <= mark.getTimestamp()) {
-                               for (TriggerContext trigger: 
triggers.getValue()) {
-                                       Trigger.TriggerResult triggerResult = 
trigger.trigger.onTime(mark.getTimestamp(), trigger);
-                                       processTriggerResult(triggerResult, 
trigger.key, trigger.window);
+                               for (Context context: triggers.getValue()) {
+                                       Trigger.TriggerResult triggerResult = 
context.onEventTime(triggers.getKey());
+                                       processTriggerResult(triggerResult, 
context.key, context.window);
                                }
                                toRemove.add(triggers.getKey());
                        }
@@ -280,14 +356,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        @Override
-       public void trigger(long time) throws Exception {
+       public final void trigger(long time) throws Exception {
                Set<Long> toRemove = new HashSet<>();
 
-               for (Map.Entry<Long, Set<TriggerContext>> triggers: 
processingTimeTimers.entrySet()) {
+               for (Map.Entry<Long, Set<Context>> triggers: 
processingTimeTimers.entrySet()) {
                        if (triggers.getKey() < time) {
-                               for (TriggerContext trigger: 
triggers.getValue()) {
-                                       Trigger.TriggerResult triggerResult = 
trigger.trigger.onTime(time, trigger);
-                                       processTriggerResult(triggerResult, 
trigger.key, trigger.window);
+                               for (Context context: triggers.getValue()) {
+                                       Trigger.TriggerResult triggerResult = 
context.onProcessingTime(time);
+                                       processTriggerResult(triggerResult, 
context.key, context.window);
                                }
                                toRemove.add(triggers.getKey());
                        }
@@ -302,37 +378,146 @@ public class WindowOperator<K, IN, OUT, W extends Window>
         * A context object that is given to {@code Trigger} functions to allow 
them to register
         * timer/watermark callbacks.
         */
-       protected class TriggerContext implements Trigger.TriggerContext {
-               Trigger<? super IN, ? super W> trigger;
-               K key;
-               W window;
+       protected class Context implements Trigger.TriggerContext {
+               protected K key;
+               protected W window;
+
+               protected WindowBuffer<IN> windowBuffer;
 
-               public TriggerContext(K key, W window, Trigger<? super IN, ? 
super W> trigger) {
+               protected HashMap<String, Serializable> state;
+
+               // Use these to allow only one timer of each type to be in 
flight at a time.
+               // If the trigger registers another timer, the value here is 
overwritten;
+               // the old timer is not removed from the set of in-flight timers to 
improve performance.
+               // When a timer fires, its time is just checked against the last 
timer that was set.
+               protected long watermarkTimer;
+               protected long processingTimeTimer;
+
+               public Context(K key,
+                               W window,
+                               WindowBuffer<IN> windowBuffer) {
                        this.key = key;
                        this.window = window;
-                       this.trigger = trigger;
+                       this.windowBuffer = windowBuffer;
+                       state = new HashMap<>();
+
+                       this.watermarkTimer = -1;
+                       this.processingTimeTimer = -1;
+               }
+
+               /**
+                * Constructs a new {@code Context} by reading from a {@link 
DataInputView} that
+                * contains a serialized context that we wrote in
+                * {@link #writeToState(StateBackend.CheckpointStateOutputView)}
+                */
+               @SuppressWarnings("unchecked")
+               protected Context(DataInputView in) throws Exception {
+                       this.key = keySerializer.deserialize(in);
+                       this.window = windowSerializer.deserialize(in);
+                       this.watermarkTimer = in.readLong();
+                       this.processingTimeTimer = in.readLong();
+
+                       int stateSize = in.readInt();
+                       byte[] stateData = new byte[stateSize];
+                       in.read(stateData);
+                       ByteArrayInputStream bais = new 
ByteArrayInputStream(stateData);
+                       state = (HashMap<String, Serializable>) 
SerializationUtils.deserialize(bais);
+
+                       this.windowBuffer = windowBufferFactory.create();
+                       int numElements = in.readInt();
+                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
+                       for (int i = 0; i < numElements; i++) {
+                               
windowBuffer.storeElement(recordSerializer.deserialize(in).<IN>asRecord());
+                       }
+               }
+
+               /**
+                * Writes the {@code Context} to the given state checkpoint 
output.
+                */
+               protected void 
writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
+                       keySerializer.serialize(key, out);
+                       windowSerializer.serialize(window, out);
+                       out.writeLong(watermarkTimer);
+                       out.writeLong(processingTimeTimer);
+
+                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+                       SerializationUtils.serialize(state, baos);
+                       out.writeInt(baos.size());
+                       out.write(baos.toByteArray(), 0, baos.size());
+
+                       MultiplexingStreamRecordSerializer<IN> recordSerializer 
= new MultiplexingStreamRecordSerializer<>(inputSerializer);
+                       out.writeInt(windowBuffer.size());
+                       for (StreamRecord<IN> element: 
windowBuffer.getElements()) {
+                               recordSerializer.serialize(element, out);
+                       }
+               }
+
+               @SuppressWarnings("unchecked")
+               public <S extends Serializable> OperatorState<S> 
getKeyValueState(final String name, final S defaultState) {
+                       return new OperatorState<S>() {
+                               @Override
+                               public S value() throws IOException {
+                                       Serializable value = state.get(name);
+                                       if (value == null) {
+                                               state.put(name, defaultState);
+                                               value = defaultState;
+                                       }
+                                       return (S) value;
+                               }
+
+                               @Override
+                               public void update(S value) throws IOException {
+                                       state.put(name, value);
+                               }
+                       };
                }
 
                @Override
                public void registerProcessingTimeTimer(long time) {
-                       Set<TriggerContext> triggers = 
processingTimeTimers.get(time);
+                       if (this.processingTimeTimer == time) {
+                               // we already have set a trigger for that time
+                               return;
+                       }
+                       Set<Context> triggers = processingTimeTimers.get(time);
                        if (triggers == null) {
                                getRuntimeContext().registerTimer(time, 
WindowOperator.this);
                                triggers = new HashSet<>();
                                processingTimeTimers.put(time, triggers);
                        }
+                       this.processingTimeTimer = time;
                        triggers.add(this);
                }
 
                @Override
                public void registerWatermarkTimer(long time) {
-                       Set<TriggerContext> triggers = 
watermarkTimers.get(time);
+                       if (watermarkTimer == time) {
+                               // we already have set a trigger for that time
+                               return;
+                       }
+                       Set<Context> triggers = watermarkTimers.get(time);
                        if (triggers == null) {
                                triggers = new HashSet<>();
                                watermarkTimers.put(time, triggers);
                        }
+                       this.watermarkTimer = time;
                        triggers.add(this);
                }
+
+               public Trigger.TriggerResult onProcessingTime(long time) throws 
Exception {
+                       if (time == processingTimeTimer) {
+                               return trigger.onTime(time, this);
+                       } else {
+                               return Trigger.TriggerResult.CONTINUE;
+                       }
+               }
+
+               public Trigger.TriggerResult onEventTime(long time) throws 
Exception {
+                       if (time == watermarkTimer) {
+                               return trigger.onTime(time, this);
+                       } else {
+                               return Trigger.TriggerResult.CONTINUE;
+                       }
+               }
        }
 
        /**
@@ -347,7 +532,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        @Override
-       public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
+       public final void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
                if (userFunction instanceof OutputTypeConfigurable) {
                        @SuppressWarnings("unchecked")
                        OutputTypeConfigurable<OUT> typeConfigurable = 
(OutputTypeConfigurable<OUT>) userFunction;
@@ -356,6 +541,60 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        // 
------------------------------------------------------------------------
+       //  Checkpointing
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
+               StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+
+               // we write the panes with the key/value maps into the stream
+               StateBackend.CheckpointStateOutputView out = 
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+               int numKeys = windows.size();
+               out.writeInt(numKeys);
+
+               for (Map.Entry<K, Map<W, Context>> keyWindows: 
windows.entrySet()) {
+                       int numWindows = keyWindows.getValue().size();
+                       out.writeInt(numWindows);
+                       for (Context context: keyWindows.getValue().values()) {
+                               context.writeToState(out);
+                       }
+               }
+
+               taskState.setOperatorState(out.closeAndGetHandle());
+               return taskState;
+       }
+
+       @Override
+       public void restoreState(StreamTaskState taskState) throws Exception {
+               super.restoreState(taskState);
+
+
+               @SuppressWarnings("unchecked")
+               StateHandle<DataInputView> inputState = 
(StateHandle<DataInputView>) taskState.getOperatorState();
+               DataInputView in = 
inputState.getState(getUserCodeClassloader());
+
+               int numKeys = in.readInt();
+               this.windows = new HashMap<>(numKeys);
+               this.processingTimeTimers = new HashMap<>();
+               this.watermarkTimers = new HashMap<>();
+
+               for (int i = 0; i < numKeys; i++) {
+                       int numWindows = in.readInt();
+                       for (int j = 0; j < numWindows; j++) {
+                               Context context = new Context(in);
+                               Map<W, Context> keyWindows = 
windows.get(context.key);
+                               if (keyWindows == null) {
+                                       keyWindows = new HashMap<>(numWindows);
+                                       windows.put(context.key, keyWindows);
+                               }
+                               keyWindows.put(context.window, context);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
        // Getters for testing
        // 
------------------------------------------------------------------------
 
@@ -365,8 +604,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        @VisibleForTesting
-       public Trigger<? super IN, ? super W> getTriggerTemplate() {
-               return triggerTemplate;
+       public Trigger<? super IN, ? super W> getTrigger() {
+               return trigger;
        }
 
        @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 4fa16ac..45ef29f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -71,7 +71,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
                NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) 
operator1;
                Assert.assertFalse(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
WatermarkTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -94,7 +94,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
                NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) 
operator2;
                Assert.assertFalse(winOperator2.isSetProcessingTime());
-               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
WatermarkTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }
@@ -119,7 +119,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
                NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) 
operator1;
                Assert.assertTrue(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -143,7 +143,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
                NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) 
operator2;
                Assert.assertTrue(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }
@@ -168,7 +168,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator1 instanceof 
EvictingNonKeyedWindowOperator);
                EvictingNonKeyedWindowOperator winOperator1 = 
(EvictingNonKeyedWindowOperator) operator1;
                Assert.assertFalse(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
WatermarkTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
@@ -194,7 +194,7 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                Assert.assertTrue(operator2 instanceof 
EvictingNonKeyedWindowOperator);
                EvictingNonKeyedWindowOperator winOperator2 = 
(EvictingNonKeyedWindowOperator) operator2;
                Assert.assertFalse(winOperator2.isSetProcessingTime());
-               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 3139941..39033cc 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -54,6 +54,7 @@ public class EvictingNonKeyedWindowOperatorTest {
 
                EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new 
EvictingNonKeyedWindowOperator<>(
                                GlobalWindows.create(),
+                               new GlobalWindow.Serializer(),
                                new HeapWindowBuffer.Factory<Tuple2<String, 
Integer>>(),
                                new ReduceAllWindowFunction<GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer(closeCalled)),
                                CountTrigger.of(WINDOW_SLIDE),

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 3d9605e..afc65d5 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import jdk.nashorn.internal.objects.Global;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -54,7 +56,9 @@ public class EvictingWindowOperatorTest {
 
                EvictingWindowOperator<String, Tuple2<String, Integer>, 
Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>(
                                GlobalWindows.create(),
+                               new GlobalWindow.Serializer(),
                                new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                new HeapWindowBuffer.Factory<Tuple2<String, 
Integer>>(),
                                new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer(closeCalled)),
                                CountTrigger.of(WINDOW_SLIDE),

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index 6cc8931..a91d957 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -76,6 +76,7 @@ public class NonKeyedWindowOperatorTest {
 
                NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
                                SlidingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
                                windowBufferFactory,
                                new ReduceAllWindowFunction<TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                WatermarkTrigger.create());
@@ -156,6 +157,7 @@ public class NonKeyedWindowOperatorTest {
 
                NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
                                TumblingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
                                windowBufferFactory,
                                new ReduceAllWindowFunction<TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                WatermarkTrigger.create());
@@ -234,6 +236,7 @@ public class NonKeyedWindowOperatorTest {
 
                NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
                                GlobalWindows.create(),
+                               new GlobalWindow.Serializer(),
                                windowBufferFactory,
                                new ReduceAllWindowFunction<GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                
ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
@@ -312,6 +315,7 @@ public class NonKeyedWindowOperatorTest {
 
                NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
                                GlobalWindows.create(),
+                               new GlobalWindow.Serializer(),
                                windowBufferFactory,
                                new ReduceAllWindowFunction<GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index d387df0..e825b88 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -19,6 +19,7 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -76,7 +77,9 @@ public class WindowOperatorTest {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow> operator = new WindowOperator<>(
                                SlidingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
                                new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowBufferFactory,
                                new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                WatermarkTrigger.create());
@@ -163,7 +166,9 @@ public class WindowOperatorTest {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, TimeWindow> operator = new WindowOperator<>(
                                TumblingTimeWindows.of(Time.of(WINDOW_SIZE, 
TimeUnit.SECONDS)),
+                               new TimeWindow.Serializer(),
                                new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowBufferFactory,
                                new ReduceWindowFunction<String, TimeWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                WatermarkTrigger.create());
@@ -246,7 +251,9 @@ public class WindowOperatorTest {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, GlobalWindow> operator = new WindowOperator<>(
                                GlobalWindows.create(),
+                               new GlobalWindow.Serializer(),
                                new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowBufferFactory,
                                new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                
ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
@@ -331,7 +338,9 @@ public class WindowOperatorTest {
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, GlobalWindow> operator = new WindowOperator<>(
                                GlobalWindows.create(),
+                               new GlobalWindow.Serializer(),
                                new TupleKeySelector(),
+                               
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowBufferFactory,
                                new ReduceWindowFunction<String, GlobalWindow, 
Tuple2<String, Integer>>(new SumReducer()),
                                
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 10fe734..02ec820 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -116,7 +116,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator1 instanceof WindowOperator);
                WindowOperator winOperator1 = (WindowOperator) operator1;
                Assert.assertFalse(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
WatermarkTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -140,7 +140,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator2 instanceof WindowOperator);
                WindowOperator winOperator2 = (WindowOperator) operator2;
                Assert.assertFalse(winOperator2.isSetProcessingTime());
-               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
WatermarkTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }
@@ -166,7 +166,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator1 instanceof WindowOperator);
                WindowOperator winOperator1 = (WindowOperator) operator1;
                Assert.assertTrue(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
@@ -191,7 +191,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator2 instanceof WindowOperator);
                WindowOperator winOperator2 = (WindowOperator) operator2;
                Assert.assertTrue(winOperator2.isSetProcessingTime());
-               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }
@@ -217,7 +217,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
                EvictingWindowOperator winOperator1 = (EvictingWindowOperator) 
operator1;
                Assert.assertFalse(winOperator1.isSetProcessingTime());
-               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getTrigger() instanceof 
WatermarkTrigger);
                Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
@@ -244,7 +244,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
                EvictingWindowOperator winOperator2 = (EvictingWindowOperator) 
operator2;
                Assert.assertFalse(winOperator2.isSetProcessingTime());
-               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
+               Assert.assertTrue(winOperator2.getTrigger() instanceof 
CountTrigger);
                Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 950b0f5..60b7894 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -97,8 +98,7 @@ public class SessionWindowing {
 
                private static final long serialVersionUID = 1L;
 
-               private volatile Long lastSeenEvent = 1L;
-               private Long sessionTimeout;
+               private final Long sessionTimeout;
 
                public SessionTrigger(Long sessionTimeout) {
                        this.sessionTimeout = sessionTimeout;
@@ -106,13 +106,17 @@ public class SessionWindowing {
                }
 
                @Override
-               public TriggerResult onElement(Tuple3<String, Long, Integer> 
element, long timestamp, GlobalWindow window, TriggerContext ctx) {
-                       Long timeSinceLastEvent = timestamp - lastSeenEvent;
+               public TriggerResult onElement(Tuple3<String, Long, Integer> 
element, long timestamp, GlobalWindow window, TriggerContext ctx) throws 
Exception {
+
+                       OperatorState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
+                       Long lastSeen = lastSeenState.value();
+
+                       Long timeSinceLastEvent = timestamp - lastSeen;
 
                        // Update the last seen event time
-                       lastSeenEvent = timestamp;
+                       lastSeenState.update(timestamp);
 
-                       ctx.registerWatermarkTimer(lastSeenEvent + 
sessionTimeout);
+                       // Arm the timeout relative to the current element: the old code used
+                       // lastSeenEvent after it had been set to timestamp.
+                       ctx.registerWatermarkTimer(timestamp + sessionTimeout);
 
                        if (timeSinceLastEvent > sessionTimeout) {
                                return TriggerResult.FIRE_AND_PURGE;
@@ -122,17 +126,15 @@ public class SessionWindowing {
                }
 
                @Override
-               public TriggerResult onTime(long time, TriggerContext ctx) {
-                       if (time - lastSeenEvent >= sessionTimeout) {
+               public TriggerResult onTime(long time, TriggerContext ctx) 
throws Exception {
+                       OperatorState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
+                       Long lastSeen = lastSeenState.value();
+
+                       if (time - lastSeen >= sessionTimeout) {
                                return TriggerResult.FIRE_AND_PURGE;
                        }
                        return TriggerResult.CONTINUE;
                }
-
-               @Override
-               public SessionTrigger duplicate() {
-                       return new SessionTrigger(sessionTimeout);
-               }
        }
 
        // 
*************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 33104ab..0357144 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -202,6 +202,58 @@ class AllWindowedStream[T, W <: Window](javaStream: 
JavaAllWStream[T, W]) {
     javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
   }
 
+  /**
+   * Applies the given window function to each window. The window function is 
called for each
+   * evaluation of the window. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window 
function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: ReduceFunction[T],
+      function: AllWindowFunction[T, R, W]): DataStream[R] = {
+    javaStream.apply(clean(preAggregator), clean(function), 
implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is 
called for each
+   * evaluation of the window. The output of the window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window 
function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: (T, T) => T,
+      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
+    }
+
+    val cleanApply = clean(function)
+    val applyFunction = new AllWindowFunction[T, R, W] {
+      def apply(window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
+        cleanApply(window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+  }
+
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index d4f4618..93b91ff 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -196,6 +196,10 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
    */
   def apply[R: TypeInformation: ClassTag](
       function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
     val cleanedFunction = clean(function)
     val applyFunction = new WindowFunction[T, R, K, W] {
       def apply(key: K, window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
@@ -205,6 +209,58 @@ class WindowedStream[T, K, W <: Window](javaStream: 
JavaWStream[T, K, W]) {
     javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
   }
 
+  /**
+   * Applies the given window function to each window. The window function is 
called for each
+   * evaluation of the window for each key individually. The output of the 
window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window 
function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: ReduceFunction[T],
+      function: WindowFunction[T, R, K, W]): DataStream[R] = {
+    javaStream.apply(clean(preAggregator), clean(function), 
implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies the given window function to each window. The window function is 
called for each
+   * evaluation of the window for each key individually. The output of the 
window function is
+   * interpreted as a regular non-windowed stream.
+   *
+   * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+   *
+   * @param preAggregator The reduce function that is used for pre-aggregation
+   * @param function The window function.
+   * @return The data stream that is the result of applying the window 
function to the window.
+   */
+  def apply[R: TypeInformation: ClassTag](
+      preAggregator: (T, T) => T,
+      function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+    if (preAggregator == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    if (function == null) {
+      throw new NullPointerException("WindowApply function must not be null.")
+    }
+
+    val cleanReducer = clean(preAggregator)
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
+    }
+
+    val cleanApply = clean(function)
+    val applyFunction = new WindowFunction[T, R, K, W] {
+      def apply(key: K, window: W, elements: java.lang.Iterable[T], out: 
Collector[R]): Unit = {
+        cleanApply(key, window, elements.asScala, out)
+      }
+    }
+    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+  }
+
+
   // ------------------------------------------------------------------------
   //  Aggregations on the keyed windows
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 99fcd07..7da7bc3 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -22,8 +22,9 @@ package org.apache.flink.streaming.api.scala
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, 
AllWindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import 
org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, 
SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, 
TimeEvictor}
@@ -111,7 +112,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
@@ -134,7 +135,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
@@ -161,7 +162,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
     val winOperator1 = 
operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
-    
assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
@@ -185,11 +186,72 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]])
     val winOperator2 = 
operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
+
+  @Test
+  def testPreReduce(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(
+      
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+
+    val window2 = source
+      .keyBy(0)
+      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
+    
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(
+      
winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+  }
+
 }
 
 // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/44422697/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 65f978c..46981ab 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -108,7 +108,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
@@ -133,7 +133,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
@@ -161,7 +161,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, 
_]]
-    
assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
+    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
     assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
@@ -187,9 +187,69 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, 
_]]
-    assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
     
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
+
+  @Test
+  def testPreReduce(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .keyBy(0)
+      .window(SlidingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform1 = window1.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+    assertTrue(
+      
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+
+
+    val window2 = source
+      .keyBy(0)
+      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .trigger(CountTrigger.of(100))
+      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, 
TimeWindow]() {
+        def apply(
+                   tuple: Tuple,
+                   window: TimeWindow,
+                   values: java.lang.Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform2 = window2.getJavaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
+    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
+    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
+    
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+    assertTrue(
+      
winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
+  }
 }

Reply via email to