http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
index 448f95f..00d4722 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -31,12 +30,6 @@ public class CoStreamMap<IN1, IN2, OUT>
 
        private static final long serialVersionUID = 1L;
 
-       // We keep track of watermarks from both inputs, the combined input is the minimum
-       // Once the minimum advances we emit a new watermark for downstream operators
-       private long combinedWatermark = Long.MIN_VALUE;
-       private long input1Watermark = Long.MIN_VALUE;
-       private long input2Watermark = Long.MIN_VALUE;
-
        public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
                super(mapper);
        }
@@ -50,24 +43,4 @@ public class CoStreamMap<IN1, IN2, OUT>
        public void processElement2(StreamRecord<IN2> element) throws Exception {
                output.collect(element.replace(userFunction.map2(element.getValue())));
        }
-
-       @Override
-       public void processWatermark1(Watermark mark) throws Exception {
-               input1Watermark = mark.getTimestamp();
-               long newMin = Math.min(input1Watermark, input2Watermark);
-               if (newMin > combinedWatermark) {
-                       combinedWatermark = newMin;
-                       output.emitWatermark(new Watermark(combinedWatermark));
-               }
-       }
-
-       @Override
-       public void processWatermark2(Watermark mark) throws Exception {
-               input2Watermark = mark.getTimestamp();
-               long newMin = Math.min(input1Watermark, input2Watermark);
-               if (newMin > combinedWatermark) {
-                       combinedWatermark = newMin;
-                       output.emitWatermark(new Watermark(combinedWatermark));
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
new file mode 100644
index 0000000..df2320f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+@Internal
+public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
+               extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>>
+               implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final TypeSerializer<K> keySerializer;
+
+       private transient TimestampedCollector<OUT> collector;
+
+       private transient TimerService timerService;
+
+       public CoStreamTimelyFlatMap(
+                       TypeSerializer<K> keySerializer,
+                       TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+               super(flatMapper);
+
+               this.keySerializer = keySerializer;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               collector = new TimestampedCollector<>(output);
+
+               InternalTimerService<VoidNamespace> internalTimerService =
+                               getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+
+               this.timerService = new SimpleTimerService(internalTimerService);
+       }
+
+       @Override
+       public void processElement1(StreamRecord<IN1> element) throws Exception {
+               collector.setTimestamp(element);
+               userFunction.flatMap1(element.getValue(), timerService, collector);
+
+       }
+
+       @Override
+       public void processElement2(StreamRecord<IN2> element) throws Exception {
+               collector.setTimestamp(element);
+               userFunction.flatMap2(element.getValue(), timerService, collector);
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               collector.setAbsoluteTimestamp(timer.getTimestamp());
+               userFunction.onTimer(timer.getTimestamp(), TimeDomain.EVENT_TIME, timerService, collector);
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               collector.setAbsoluteTimestamp(timer.getTimestamp());
+               userFunction.onTimer(timer.getTimestamp(), TimeDomain.PROCESSING_TIME, timerService, collector);
+       }
+
+       protected TimestampedCollector<OUT> getCollector() {
+               return collector;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index b5500b7..36492d7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
@@ -76,6 +75,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 
        @Override
        public void open() throws Exception {
+               super.open();
                committer.setOperatorId(id);
                committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
                committer.open();
@@ -113,6 +113,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
        public void snapshotState(FSDataOutputStream out,
                        long checkpointId,
                        long timestamp) throws Exception {
+               super.snapshotState(out, checkpointId, timestamp);
 
                saveHandleInState(checkpointId, timestamp);
 
@@ -121,6 +122,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 
        @Override
        public void restoreState(FSDataInputStream in) throws Exception {
+               super.restoreState(in);
 
                this.state = InstantiationUtil.deserializeObject(in, getUserCodeClassloader());
        }
@@ -203,11 +205,6 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
                serializer.serialize(value, new DataOutputViewStreamWrapper(out));
        }
 
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               //don't do anything, since we are a sink
-       }
-
        /**
         * This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were
         * used since the last completed checkpoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index d331d4d..2a77c0a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -32,7 +32,6 @@ import org.apache.flink.util.MathUtils;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -208,11 +207,6 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
        }
 
        @Override
-       public void processWatermark(Watermark mark) {
-               // this operator does not react to watermarks
-       }
-
-       @Override
        public void trigger(long timestamp) throws Exception {
                // first we check if we actually trigger the window function
                if (timestamp == nextEvaluationTime) {

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 79ef4c6..a252ece 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -66,7 +66,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
                        // optimized path for single pane case (tumbling window)
                        for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
                                Key key = entry.getKey();
-                               operator.setKeyContext(key);
+                               operator.setCurrentKey(key);
                                function.apply(entry.getKey(), window, entry.getValue(), out);
                        }
                }
@@ -122,7 +122,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
                @Override
                public void keyDone() throws Exception {
-                       contextOperator.setKeyContext(currentKey);
+                       contextOperator.setCurrentKey(currentKey);
                        function.apply(currentKey, window, unionIterator, out);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index dfa357e..84686a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -93,7 +93,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
                @Override
                public void startNewKey(Key key) {
                        currentValue = null;
-                       operator.setKeyContext(key);
+                       operator.setCurrentKey(key);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 141b5b8..2f4dbde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
@@ -204,110 +204,82 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
        }
 
        @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               boolean fire;
-               do {
-                       Timer<K, W> timer = watermarkTimersQueue.peek();
-                       if (timer != null && timer.timestamp <= mark.getTimestamp()) {
-                               fire = true;
-
-                               watermarkTimers.remove(timer);
-                               watermarkTimersQueue.remove();
-
-                               context.key = timer.key;
-                               context.window = timer.window;
-                               setKeyContext(timer.key);
-
-                               ListState<StreamRecord<IN>> windowState;
-                               MergingWindowSet<W> mergingWindows = null;
-
-                               if (windowAssigner instanceof MergingWindowAssigner) {
-                                       mergingWindows = getMergingWindowSet();
-                                       W stateWindow = mergingWindows.getStateWindow(context.window);
-                                       if (stateWindow == null) {
-                                               // then the window is already purged and this is a cleanup
-                                               // timer set due to allowed lateness that has nothing to clean,
-                                               // so it is safe to just ignore
-                                               continue;
-                                       }
-                                       windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-                               } else {
-                                       windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-                               }
+       public void onEventTime(InternalTimer<K, W> timer) throws Exception {
 
-                               Iterable<StreamRecord<IN>> contents = windowState.get();
-                               if (contents == null) {
-                                       // if we have no state, there is nothing to do
-                                       continue;
-                               }
-
-                               TriggerResult triggerResult = context.onEventTime(timer.timestamp);
-                               if (triggerResult.isFire()) {
-                                       fire(context.window, contents);
-                               }
+               context.key = timer.getKey();
+               context.window = timer.getNamespace();
 
-                               if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
-                                       cleanup(context.window, windowState, mergingWindows);
-                               }
+               ListState<StreamRecord<IN>> windowState;
+               MergingWindowSet<W> mergingWindows = null;
 
-                       } else {
-                               fire = false;
+               if (windowAssigner instanceof MergingWindowAssigner) {
+                       mergingWindows = getMergingWindowSet();
+                       W stateWindow = mergingWindows.getStateWindow(context.window);
+                       if (stateWindow == null) {
+                               // then the window is already purged and this is a cleanup
+                               // timer set due to allowed lateness that has nothing to clean,
+                               // so it is safe to just ignore
+                               return;
                        }
-               } while (fire);
+                       windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+               } else {
+                       windowState = getPartitionedState(
+                                       context.window,
+                                       windowSerializer,
+                                       windowStateDescriptor);
+               }
+
+               Iterable<StreamRecord<IN>> contents = windowState.get();
+               if (contents == null) {
+                       // if we have no state, there is nothing to do
+                       return;
+               }
 
-               output.emitWatermark(mark);
+               TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+               if (triggerResult.isFire()) {
+                       fire(context.window, contents);
+               }
 
-               this.currentWatermark = mark.getTimestamp();
+               if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+                       cleanup(context.window, windowState, mergingWindows);
+               }
        }
 
        @Override
-       public void trigger(long time) throws Exception {
-               Timer<K, W> timer;
-
-               while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
-
-                       processingTimeTimers.remove(timer);
-                       processingTimeTimersQueue.remove();
-
-                       context.key = timer.key;
-                       context.window = timer.window;
-                       setKeyContext(timer.key);
+       public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+               context.key = timer.getKey();
+               context.window = timer.getNamespace();
 
-                       ListState<StreamRecord<IN>> windowState;
-                       MergingWindowSet<W> mergingWindows = null;
+               ListState<StreamRecord<IN>> windowState;
+               MergingWindowSet<W> mergingWindows = null;
 
-                       if (windowAssigner instanceof MergingWindowAssigner) {
-                               mergingWindows = getMergingWindowSet();
-                               W stateWindow = mergingWindows.getStateWindow(context.window);
-                               if (stateWindow == null) {
-                                       // then the window is already purged and this is a cleanup
-                                       // timer set due to allowed lateness that has nothing to clean,
-                                       // so it is safe to just ignore
-                                       continue;
-                               }
-                               windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-                       } else {
-                               windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-                       }
-
-                       Iterable<StreamRecord<IN>> contents = windowState.get();
-                       if (contents == null) {
-                               // if we have no state, there is nothing to do
-                               continue;
+               if (windowAssigner instanceof MergingWindowAssigner) {
+                       mergingWindows = getMergingWindowSet();
+                       W stateWindow = mergingWindows.getStateWindow(context.window);
+                       if (stateWindow == null) {
+                               // then the window is already purged and this is a cleanup
+                               // timer set due to allowed lateness that has nothing to clean,
+                               // so it is safe to just ignore
+                               return;
                        }
+                       windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+               } else {
+                       windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+               }
 
-                       TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
-                       if (triggerResult.isFire()) {
-                               fire(context.window, contents);
-                       }
+               Iterable<StreamRecord<IN>> contents = windowState.get();
+               if (contents == null) {
+                       // if we have no state, there is nothing to do
+                       return;
+               }
 
-                       if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
-                               cleanup(context.window, windowState, mergingWindows);
-                       }
+               TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+               if (triggerResult.isFire()) {
+                       fire(context.window, contents);
                }
 
-               if (timer != null) {
-                       nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
+               if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+                       cleanup(context.window, windowState, mergingWindows);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 459c679..bc37692 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -37,36 +37,28 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -96,7 +88,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
-       implements OneInputStreamOperator<IN, OUT>, Triggerable {
+       implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
 
        private static final long serialVersionUID = 1L;
 
@@ -141,14 +133,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
         */
        protected transient TimestampedCollector<OUT> timestampedCollector;
 
-       protected transient ScheduledFuture<?> nextTimer;
-
-       /**
-        * To keep track of the current watermark so that we can immediately fire if a trigger
-        * registers an event time callback for a timestamp that lies in the past.
-        */
-       protected long currentWatermark = Long.MIN_VALUE;
-
        protected transient Context context = new Context(null, null);
 
        protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
@@ -157,17 +141,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        // State that needs to be checkpointed
        // ------------------------------------------------------------------------
 
-       /**
-        * Processing time timers that are currently in-flight.
-        */
-       protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue;
-       protected transient Set<Timer<K, W>> processingTimeTimers;
-
-       /**
-        * Current waiting watermark callbacks.
-        */
-       protected transient Set<Timer<K, W>> watermarkTimers;
-       protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;
+       private transient InternalTimerService<W> internalTimerService;
 
        protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey;
 
@@ -208,49 +182,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
                timestampedCollector = new TimestampedCollector<>(output);
 
-               // these could already be initialized from restoreState()
-               if (watermarkTimers == null) {
-                       watermarkTimers = new HashSet<>();
-                       watermarkTimersQueue = new PriorityQueue<>(100);
-               }
-               if (processingTimeTimers == null) {
-                       processingTimeTimers = new HashSet<>();
-                       processingTimeTimersQueue = new PriorityQueue<>(100);
-               }
+               internalTimerService =
+                               getInternalTimerService("window-timers", 
keySerializer, windowSerializer, this);
 
                context = new Context(null, null);
 
                windowAssignerContext = new 
WindowAssigner.WindowAssignerContext() {
                        @Override
                        public long getCurrentProcessingTime() {
-                               return 
WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+                               return 
internalTimerService.currentProcessingTime();
                        }
                };
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindowsByKey = new HashMap<>();
                }
-
-               // re-register the restored timers (if any)
-               if (processingTimeTimersQueue.size() > 0) {
-                       nextTimer = 
getProcessingTimeService().registerTimer(processingTimeTimersQueue.peek().timestamp,
 this);
-               }
        }
 
        @Override
        public final void close() throws Exception {
                super.close();
-
-               if (nextTimer != null) {
-                       nextTimer.cancel(false);
-                       nextTimer = null;
-               }
-
                timestampedCollector = null;
-               watermarkTimers = null;
-               watermarkTimersQueue = null;
-               processingTimeTimers = null;
-               processingTimeTimersQueue = null;
                context = null;
                windowAssignerContext = null;
                mergingWindowsByKey = null;
@@ -259,17 +211,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        @Override
        public void dispose() throws Exception {
                super.dispose();
-
-               if (nextTimer != null) {
-                       nextTimer.cancel(false);
-                       nextTimer = null;
-               }
-
                timestampedCollector = null;
-               watermarkTimers = null;
-               watermarkTimersQueue = null;
-               processingTimeTimers = null;
-               processingTimeTimersQueue = null;
                context = null;
                windowAssignerContext = null;
                mergingWindowsByKey = null;
@@ -392,110 +334,81 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        }
 
        @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               boolean fire;
-               do {
-                       Timer<K, W> timer = watermarkTimersQueue.peek();
-                       if (timer != null && timer.timestamp <= 
mark.getTimestamp()) {
-                               fire = true;
-
-                               watermarkTimers.remove(timer);
-                               watermarkTimersQueue.remove();
-
-                               context.key = timer.key;
-                               context.window = timer.window;
-                               setKeyContext(timer.key);
-
-                               AppendingState<IN, ACC> windowState;
-                               MergingWindowSet<W> mergingWindows = null;
-
-                               if (windowAssigner instanceof 
MergingWindowAssigner) {
-                                       mergingWindows = getMergingWindowSet();
-                                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
-                                       if (stateWindow == null) {
-                                               // then the window is already 
purged and this is a cleanup
-                                               // timer set due to allowed 
lateness that has nothing to clean,
-                                               // so it is safe to just ignore
-                                               continue;
-                                       }
-                                       windowState = 
getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-                               } else {
-                                       windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-                               }
-
-                               ACC contents = windowState.get();
-                               if (contents == null) {
-                                       // if we have no state, there is 
nothing to do
-                                       continue;
-                               }
-
-                               TriggerResult triggerResult = 
context.onEventTime(timer.timestamp);
-                               if (triggerResult.isFire()) {
-                                       fire(context.window, contents);
-                               }
+       public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+               context.key = timer.getKey();
+               context.window = timer.getNamespace();
 
-                               if (triggerResult.isPurge() || 
(windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
-                                       cleanup(context.window, windowState, 
mergingWindows);
-                               }
+               AppendingState<IN, ACC> windowState;
+               MergingWindowSet<W> mergingWindows = null;
 
-                       } else {
-                               fire = false;
+               if (windowAssigner instanceof MergingWindowAssigner) {
+                       mergingWindows = getMergingWindowSet();
+                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                       if (stateWindow == null) {
+                               // then the window is already purged and this 
is a cleanup
+                               // timer set due to allowed lateness that has 
nothing to clean,
+                               // so it is safe to just ignore
+                               return;
                        }
-               } while (fire);
+                       windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
+               } else {
+                       windowState = getPartitionedState(
+                                       context.window,
+                                       windowSerializer,
+                                       windowStateDescriptor);
+               }
+
+               ACC contents = windowState.get();
+               if (contents == null) {
+                       // if we have no state, there is nothing to do
+                       return;
+               }
 
-               output.emitWatermark(mark);
+               TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
+               if (triggerResult.isFire()) {
+                       fire(context.window, contents);
+               }
 
-               this.currentWatermark = mark.getTimestamp();
+               if (triggerResult.isPurge() || (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp()))) {
+                       cleanup(context.window, windowState, mergingWindows);
+               }
        }
 
        @Override
-       public void trigger(long time) throws Exception {
-               Timer<K, W> timer;
-
-               while ((timer = processingTimeTimersQueue.peek()) != null && 
timer.timestamp <= time) {
-
-                       processingTimeTimers.remove(timer);
-                       processingTimeTimersQueue.remove();
-
-                       context.key = timer.key;
-                       context.window = timer.window;
-                       setKeyContext(timer.key);
-
-                       AppendingState<IN, ACC> windowState;
-                       MergingWindowSet<W> mergingWindows = null;
+       public void onProcessingTime(InternalTimer<K, W> timer) throws 
Exception {
+               context.key = timer.getKey();
+               context.window = timer.getNamespace();
 
-                       if (windowAssigner instanceof MergingWindowAssigner) {
-                               mergingWindows = getMergingWindowSet();
-                               W stateWindow = 
mergingWindows.getStateWindow(context.window);
-                               if (stateWindow == null) {
-                                       // then the window is already purged 
and this is a cleanup
-                                       // timer set due to allowed lateness 
that has nothing to clean,
-                                       // so it is safe to just ignore
-                                       continue;
-                               }
-                               windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
-                       } else {
-                               windowState = 
getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-                       }
+               AppendingState<IN, ACC> windowState;
+               MergingWindowSet<W> mergingWindows = null;
 
-                       ACC contents = windowState.get();
-                       if (contents == null) {
-                               // if we have no state, there is nothing to do
-                               continue;
+               if (windowAssigner instanceof MergingWindowAssigner) {
+                       mergingWindows = getMergingWindowSet();
+                       W stateWindow = 
mergingWindows.getStateWindow(context.window);
+                       if (stateWindow == null) {
+                               // then the window is already purged and this 
is a cleanup
+                               // timer set due to allowed lateness that has 
nothing to clean,
+                               // so it is safe to just ignore
+                               return;
                        }
+                       windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
+               } else {
+                       windowState = getPartitionedState(context.window, 
windowSerializer, windowStateDescriptor);
+               }
 
-                       TriggerResult triggerResult = 
context.onProcessingTime(timer.timestamp);
-                       if (triggerResult.isFire()) {
-                               fire(context.window, contents);
-                       }
+               ACC contents = windowState.get();
+               if (contents == null) {
+                       // if we have no state, there is nothing to do
+                       return;
+               }
 
-                       if (triggerResult.isPurge() || 
(!windowAssigner.isEventTime() && isCleanupTime(context.window, 
timer.timestamp))) {
-                               cleanup(context.window, windowState, 
mergingWindows);
-                       }
+               TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
+               if (triggerResult.isFire()) {
+                       fire(context.window, contents);
                }
 
-               if (timer != null) {
-                       nextTimer = 
getProcessingTimeService().registerTimer(timer.timestamp, this);
+               if (triggerResult.isPurge() || (!windowAssigner.isEventTime() 
&& isCleanupTime(context.window, timer.getTimestamp()))) {
+                       cleanup(context.window, windowState, mergingWindows);
                }
        }
 
@@ -555,7 +468,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
         *                                      considered when triggering.
         */
        protected boolean isLate(W window) {
-               return (windowAssigner.isEventTime() && (cleanupTime(window) <= 
currentWatermark));
+               return (windowAssigner.isEventTime() && (cleanupTime(window) <= 
internalTimerService.currentWatermark()));
        }
 
        /**
@@ -638,7 +551,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
                }
 
                public long getCurrentWatermark() {
-                       return currentWatermark;
+                       return internalTimerService.currentWatermark();
                }
 
                @Override
@@ -697,54 +610,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
                @Override
                public long getCurrentProcessingTime() {
-                       return 
WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+                       return internalTimerService.currentProcessingTime();
                }
 
                @Override
                public void registerProcessingTimeTimer(long time) {
-                       Timer<K, W> timer = new Timer<>(time, key, window);
-
-                       // make sure we only put one timer per key into the 
queue
-                       if (processingTimeTimers.add(timer)) {
-
-                               Timer<K, W> oldHead = 
processingTimeTimersQueue.peek();
-                               long nextTriggerTime = oldHead != null ? 
oldHead.timestamp : Long.MAX_VALUE; 
-
-                               processingTimeTimersQueue.add(timer);
-
-                               // check if we need to re-schedule our timer to 
earlier
-                               if (time < nextTriggerTime) {
-                                       if (nextTimer != null) {
-                                               nextTimer.cancel(false);
-                                       }
-                                       nextTimer = 
getProcessingTimeService().registerTimer(time, WindowOperator.this);
-                               }
-                       }
+                       
internalTimerService.registerProcessingTimeTimer(window, time);
                }
 
                @Override
                public void registerEventTimeTimer(long time) {
-                       Timer<K, W> timer = new Timer<>(time, key, window);
-                       if (watermarkTimers.add(timer)) {
-                               watermarkTimersQueue.add(timer);
-                       }
+                       internalTimerService.registerEventTimeTimer(window, 
time);
                }
 
                @Override
                public void deleteProcessingTimeTimer(long time) {
-                       Timer<K, W> timer = new Timer<>(time, key, window);
-
-                       if (processingTimeTimers.remove(timer)) {
-                               processingTimeTimersQueue.remove(timer);
-                       }
+                       internalTimerService.deleteProcessingTimeTimer(window, 
time);
                }
 
                @Override
                public void deleteEventTimeTimer(long time) {
-                       Timer<K, W> timer = new Timer<>(time, key, window);
-                       if (watermarkTimers.remove(timer)) {
-                               watermarkTimersQueue.remove(timer);
-                       }
+                       internalTimerService.deleteEventTimeTimer(window, time);
                }
 
                public TriggerResult onElement(StreamRecord<IN> element) throws 
Exception {
@@ -843,67 +729,21 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
                        TupleSerializer<Tuple2<W, W>> tupleSerializer = new 
TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, 
windowSerializer} );
                        ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor 
= new ListStateDescriptor<>("merging-window-set", tupleSerializer);
                        for (Map.Entry<K, MergingWindowSet<W>> key: 
mergingWindowsByKey.entrySet()) {
-                               setKeyContext(key.getKey());
+                               setCurrentKey(key.getKey());
                                ListState<Tuple2<W, W>> mergeState = 
getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, 
mergeStateDescriptor);
                                mergeState.clear();
                                key.getValue().persist(mergeState);
                        }
                }
 
-               snapshotTimers(new DataOutputViewStreamWrapper(out));
-
                super.snapshotState(out, checkpointId, timestamp);
        }
 
        @Override
        public void restoreState(FSDataInputStream in) throws Exception {
-               restoreTimers(new DataInputViewStreamWrapper(in));
-
                super.restoreState(in);
        }
 
-       private void restoreTimers(DataInputView in ) throws IOException {
-               int numWatermarkTimers = in.readInt();
-               watermarkTimers = new HashSet<>(numWatermarkTimers);
-               watermarkTimersQueue = new 
PriorityQueue<>(Math.max(numWatermarkTimers, 1));
-               for (int i = 0; i < numWatermarkTimers; i++) {
-                       K key = keySerializer.deserialize(in);
-                       W window = windowSerializer.deserialize(in);
-                       long timestamp = in.readLong();
-                       Timer<K, W> timer = new Timer<>(timestamp, key, window);
-                       watermarkTimers.add(timer);
-                       watermarkTimersQueue.add(timer);
-               }
-
-               int numProcessingTimeTimers = in.readInt();
-               processingTimeTimersQueue = new 
PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
-               processingTimeTimers = new HashSet<>();
-               for (int i = 0; i < numProcessingTimeTimers; i++) {
-                       K key = keySerializer.deserialize(in);
-                       W window = windowSerializer.deserialize(in);
-                       long timestamp = in.readLong();
-                       Timer<K, W> timer = new Timer<>(timestamp, key, window);
-                       processingTimeTimersQueue.add(timer);
-                       processingTimeTimers.add(timer);
-               }
-       }
-
-       private void snapshotTimers(DataOutputView out) throws IOException {
-               out.writeInt(watermarkTimersQueue.size());
-               for (Timer<K, W> timer : watermarkTimersQueue) {
-                       keySerializer.serialize(timer.key, out);
-                       windowSerializer.serialize(timer.window, out);
-                       out.writeLong(timer.timestamp);
-               }
-
-               out.writeInt(processingTimeTimers.size());
-               for (Timer<K,W> timer : processingTimeTimers) {
-                       keySerializer.serialize(timer.key, out);
-                       windowSerializer.serialize(timer.window, out);
-                       out.writeLong(timer.timestamp);
-               }
-       }
-
        // 
------------------------------------------------------------------------
        // Getters for testing
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index d2bf133..d0a2ea9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -20,9 +20,11 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
@@ -42,7 +44,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
        private volatile boolean isQuiesced;
 
        // sorts the timers by timestamp so that they are processed in the 
correct order.
-       private final Map<Long, List<Triggerable>> registeredTasks = new 
TreeMap<>();
+       private final Map<Long, List<ScheduledTimerFuture>> registeredTasks = 
new TreeMap<>();
 
        
        public void setCurrentTime(long timestamp) throws Exception {
@@ -53,10 +55,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
                        // we do not fire them here to be able to accommodate 
timers
                        // that register other timers.
        
-                       Iterator<Map.Entry<Long, List<Triggerable>>> it = 
registeredTasks.entrySet().iterator();
-                       List<Map.Entry<Long, List<Triggerable>>> toRun = new 
ArrayList<>();
+                       Iterator<Map.Entry<Long, List<ScheduledTimerFuture>>> 
it = registeredTasks.entrySet().iterator();
+                       List<Map.Entry<Long, List<ScheduledTimerFuture>>> toRun 
= new ArrayList<>();
                        while (it.hasNext()) {
-                               Map.Entry<Long, List<Triggerable>> t = 
it.next();
+                               Map.Entry<Long, List<ScheduledTimerFuture>> t = 
it.next();
                                if (t.getKey() <= this.currentTime) {
                                        toRun.add(t);
                                        it.remove();
@@ -64,10 +66,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
                        }
        
                        // now do the actual firing.
-                       for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+                       for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: 
toRun) {
                                long now = tasks.getKey();
-                               for (Triggerable task: tasks.getValue()) {
-                                       task.trigger(now);
+                               for (ScheduledTimerFuture task: 
tasks.getValue()) {
+                                       task.getTriggerable().trigger(now);
                                }
                        }
                }
@@ -84,7 +86,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
                        throw new IllegalStateException("terminated");
                }
                if (isQuiesced) {
-                       return new DummyFuture();
+                       return new ScheduledTimerFuture(null, -1);
                }
 
                if (timestamp <= currentTime) {
@@ -94,14 +96,17 @@ public class TestProcessingTimeService extends ProcessingTimeService {
                                throw new RuntimeException(e);
                        }
                }
-               List<Triggerable> tasks = registeredTasks.get(timestamp);
+
+               ScheduledTimerFuture result = new ScheduledTimerFuture(target, 
timestamp);
+
+               List<ScheduledTimerFuture> tasks = 
registeredTasks.get(timestamp);
                if (tasks == null) {
                        tasks = new ArrayList<>();
                        registeredTasks.put(timestamp, tasks);
                }
-               tasks.add(target);
+               tasks.add(result);
 
-               return new DummyFuture();
+               return result;
        }
 
        @Override
@@ -124,15 +129,34 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 
        public int getNumRegisteredTimers() {
                int count = 0;
-               for (List<Triggerable> tasks: registeredTasks.values()) {
+               for (List<ScheduledTimerFuture> tasks: 
registeredTasks.values()) {
                        count += tasks.size();
                }
                return count;
        }
 
+       public Set<Long> getRegisteredTimerTimestamps() {
+               Set<Long> actualTimestamps = new HashSet<>();
+               for (List<ScheduledTimerFuture> timerFutures : 
registeredTasks.values()) {
+                       for (ScheduledTimerFuture timer : timerFutures) {
+                               actualTimestamps.add(timer.getTimestamp());
+                       }
+               }
+               return actualTimestamps;
+       }
+
        // 
------------------------------------------------------------------------
 
-       private static class DummyFuture implements ScheduledFuture<Object> {
+       private class ScheduledTimerFuture implements ScheduledFuture<Object> {
+
+               private final Triggerable triggerable;
+
+               private final long timestamp;
+
+               public ScheduledTimerFuture(Triggerable triggerable, long 
timestamp) {
+                       this.triggerable = triggerable;
+                       this.timestamp = timestamp;
+               }
 
                @Override
                public long getDelay(TimeUnit unit) {
@@ -146,6 +170,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
+                       List<ScheduledTimerFuture> scheduledTimerFutures = 
registeredTasks.get(timestamp);
+                       if (scheduledTimerFutures != null) {
+                               scheduledTimerFutures.remove(this);
+                       }
                        return true;
                }
 
@@ -168,5 +196,13 @@ public class TestProcessingTimeService extends ProcessingTimeService {
                public Object get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
                        throw new UnsupportedOperationException();
                }
+
+               public Triggerable getTriggerable() {
+                       return triggerable;
+               }
+
+               public long getTimestamp() {
+                       return timestamp;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
new file mode 100644
index 0000000..84af997
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link HeapInternalTimerService}.
+ */
+public class HeapInternalTimerServiceTest {
+
+       /**
+        * Typed wrapper around Mockito's {@code any()} matcher, so that call sites stubbing or
+        * verifying {@code Triggerable} callbacks get an {@code InternalTimer<Integer, String>}
+        * argument without spelling out the generic type themselves.
+        */
+       private static InternalTimer<Integer, String> anyInternalTimer() {
+               return any();
+       }
+
+       /**
+        * Verifies that at most one physical processing-time timer is registered at the
+        * {@link ProcessingTimeService} at any time, regardless of how many logical timers exist.
+        */
+       @Test
+       public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
+               TestProcessingTimeService timeProvider = new TestProcessingTimeService();
+               TestKeyContext context = new TestKeyContext();
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> target = mock(Triggerable.class);
+
+               HeapInternalTimerService<Integer, String> timers =
+                               createTimerService(target, context, timeProvider);
+
+               context.setCurrentKey(0);
+
+               // five logical timers in two namespaces; the earliest timestamp is 10
+               timers.registerProcessingTimeTimer("ciao", 10);
+               timers.registerProcessingTimeTimer("ciao", 20);
+               timers.registerProcessingTimeTimer("ciao", 30);
+               timers.registerProcessingTimeTimer("hello", 10);
+               timers.registerProcessingTimeTimer("hello", 20);
+
+               assertEquals(5, timers.numProcessingTimeTimers());
+               assertEquals(2, timers.numProcessingTimeTimers("hello"));
+               assertEquals(3, timers.numProcessingTimeTimers("ciao"));
+
+               // a single physical timer, set for timestamp 10
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+               assertThat(timeProvider.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+               timeProvider.setCurrentTime(10);
+
+               assertEquals(3, timers.numProcessingTimeTimers());
+               assertEquals(1, timers.numProcessingTimeTimers("hello"));
+               assertEquals(2, timers.numProcessingTimeTimers("ciao"));
+
+               // the physical timer has moved on to timestamp 20
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+               assertThat(timeProvider.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+               timeProvider.setCurrentTime(20);
+
+               assertEquals(1, timers.numProcessingTimeTimers());
+               assertEquals(0, timers.numProcessingTimeTimers("hello"));
+               assertEquals(1, timers.numProcessingTimeTimers("ciao"));
+
+               // ... and then to timestamp 30
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+               assertThat(timeProvider.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+
+               timeProvider.setCurrentTime(30);
+
+               // nothing left, neither logically nor physically
+               assertEquals(0, timers.numProcessingTimeTimers());
+
+               assertEquals(0, timeProvider.getNumRegisteredTimers());
+
+               // a fresh registration creates a physical timer again
+               timers.registerProcessingTimeTimer("ciao", 40);
+
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+       }
+
+       /**
+        * Verifies that registering a processing-time timer that is earlier than the existing timers
+        * replaces the single physical timer at the {@link ProcessingTimeService} with one for the
+        * earlier timestamp.
+        */
+       @Test
+       public void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() throws Exception {
+               TestProcessingTimeService timeProvider = new TestProcessingTimeService();
+               TestKeyContext context = new TestKeyContext();
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> target = mock(Triggerable.class);
+
+               HeapInternalTimerService<Integer, String> timers =
+                               createTimerService(target, context, timeProvider);
+
+               context.setCurrentKey(0);
+
+               timers.registerProcessingTimeTimer("ciao", 20);
+
+               assertEquals(1, timers.numProcessingTimeTimers());
+
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+               assertThat(timeProvider.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+               // an earlier timer arrives: still one physical timer, but now for timestamp 10
+               timers.registerProcessingTimeTimer("ciao", 10);
+
+               assertEquals(2, timers.numProcessingTimeTimers());
+
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+               assertThat(timeProvider.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+       }
+
+       /**
+        * Verifies that registering a new processing-time timer from inside the
+        * {@code onProcessingTime} callback still leaves exactly one physical timer registered at
+        * the {@link ProcessingTimeService}.
+        */
+       @Test
+       public void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysicalTimers() throws Exception {
+               TestProcessingTimeService timeProvider = new TestProcessingTimeService();
+               TestKeyContext context = new TestKeyContext();
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> target = mock(Triggerable.class);
+
+               // final because the anonymous Answer classes below capture it
+               final HeapInternalTimerService<Integer, String> timers =
+                               createTimerService(target, context, timeProvider);
+
+               context.setCurrentKey(0);
+
+               timers.registerProcessingTimeTimer("ciao", 10);
+
+               assertEquals(1, timers.numProcessingTimeTimers());
+
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+               assertThat(timeProvider.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+               // when the timer for 10 fires, the callback registers a follow-up timer for 20
+               doAnswer(new Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocation) throws Exception {
+                               timers.registerProcessingTimeTimer("ciao", 20);
+                               return null;
+                       }
+               }).when(target).onProcessingTime(anyInternalTimer());
+
+               timeProvider.setCurrentTime(10);
+
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+               assertThat(timeProvider.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+               // re-stub: when the timer for 20 fires, the callback registers one for 30
+               doAnswer(new Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocation) throws Exception {
+                               timers.registerProcessingTimeTimer("ciao", 30);
+                               return null;
+                       }
+               }).when(target).onProcessingTime(anyInternalTimer());
+
+               timeProvider.setCurrentTime(20);
+
+               assertEquals(1, timers.numProcessingTimeTimers());
+
+               assertEquals(1, timeProvider.getNumRegisteredTimers());
+               assertThat(timeProvider.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+       }
+
+
+       /**
+        * Checks that {@code currentProcessingTime()} reports the time that was set on the
+        * {@link TestProcessingTimeService}.
+        */
+       @Test
+       public void testCurrentProcessingTime() throws Exception {
+               TestProcessingTimeService timeProvider = new TestProcessingTimeService();
+               TestKeyContext context = new TestKeyContext();
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> target = mock(Triggerable.class);
+
+               HeapInternalTimerService<Integer, String> timers =
+                               createTimerService(target, context, timeProvider);
+
+               timeProvider.setCurrentTime(17L);
+               assertEquals(17, timers.currentProcessingTime());
+
+               timeProvider.setCurrentTime(42);
+               assertEquals(42, timers.currentProcessingTime());
+       }
+
+       /**
+        * Checks that {@code currentWatermark()} reports the value last passed to
+        * {@code advanceWatermark()}.
+        */
+       @Test
+       public void testCurrentEventTime() throws Exception {
+               TestProcessingTimeService timeProvider = new TestProcessingTimeService();
+               TestKeyContext context = new TestKeyContext();
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> target = mock(Triggerable.class);
+
+               HeapInternalTimerService<Integer, String> timers =
+                               createTimerService(target, context, timeProvider);
+
+               timers.advanceWatermark(17);
+               assertEquals(17, timers.currentWatermark());
+
+               timers.advanceWatermark(42);
+               assertEquals(42, timers.currentWatermark());
+       }
+
+       /**
+        * Registers event-time timers for two keys and two namespaces and checks that advancing the
+        * watermark fires each of them exactly once.
+        *
+        * <p>This also verifies that we don't have leakage between keys/namespaces.
+        */
+       @Test
+       public void testSetAndFireEventTimeTimers() throws Exception {
+               TestProcessingTimeService timeProvider = new TestProcessingTimeService();
+               TestKeyContext context = new TestKeyContext();
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> target = mock(Triggerable.class);
+
+               HeapInternalTimerService<Integer, String> timers =
+                               createTimerService(target, context, timeProvider);
+
+               // key 0
+               context.setCurrentKey(0);
+               timers.registerEventTimeTimer("ciao", 10);
+               timers.registerEventTimeTimer("hello", 10);
+
+               // key 1
+               context.setCurrentKey(1);
+               timers.registerEventTimeTimer("ciao", 10);
+               timers.registerEventTimeTimer("hello", 10);
+
+               assertEquals(4, timers.numEventTimeTimers());
+               assertEquals(2, timers.numEventTimeTimers("hello"));
+               assertEquals(2, timers.numEventTimeTimers("ciao"));
+
+               timers.advanceWatermark(10);
+
+               // four callbacks in total, one per (key, namespace) combination
+               verify(target, times(4)).onEventTime(anyInternalTimer());
+               verify(target, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao")));
+               verify(target, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+               verify(target, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+               verify(target, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+               assertEquals(0, timers.numEventTimeTimers());
+       }
+
+       /**
+        * Registers processing-time timers for two keys and two namespaces and checks that advancing
+        * the processing time fires each of them exactly once.
+        *
+        * <p>This also verifies that we don't have leakage between keys/namespaces.
+        */
+       @Test
+       public void testSetAndFireProcessingTimeTimers() throws Exception {
+               TestProcessingTimeService timeProvider = new TestProcessingTimeService();
+               TestKeyContext context = new TestKeyContext();
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> target = mock(Triggerable.class);
+
+               HeapInternalTimerService<Integer, String> timers =
+                               createTimerService(target, context, timeProvider);
+
+               // key 0
+               context.setCurrentKey(0);
+               timers.registerProcessingTimeTimer("ciao", 10);
+               timers.registerProcessingTimeTimer("hello", 10);
+
+               // key 1
+               context.setCurrentKey(1);
+               timers.registerProcessingTimeTimer("ciao", 10);
+               timers.registerProcessingTimeTimer("hello", 10);
+
+               assertEquals(4, timers.numProcessingTimeTimers());
+               assertEquals(2, timers.numProcessingTimeTimers("hello"));
+               assertEquals(2, timers.numProcessingTimeTimers("ciao"));
+
+               timeProvider.setCurrentTime(10);
+
+               // four callbacks in total, one per (key, namespace) combination
+               verify(target, times(4)).onProcessingTime(anyInternalTimer());
+               verify(target, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+               verify(target, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello")));
+               verify(target, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao")));
+               verify(target, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+               assertEquals(0, timers.numProcessingTimeTimers());
+       }
+
+       /**
+        * Registers event-time timers for two keys and two namespaces, deletes one timer per key and
+        * checks that only the remaining timers fire when the watermark advances.
+        *
+        * <p>This also verifies that we don't have leakage between keys/namespaces and that deleted
+        * timers don't fire.
+        */
+       @Test
+       public void testDeleteEventTimeTimers() throws Exception {
+               TestProcessingTimeService timeProvider = new TestProcessingTimeService();
+               TestKeyContext context = new TestKeyContext();
+
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> target = mock(Triggerable.class);
+
+               HeapInternalTimerService<Integer, String> timers =
+                               createTimerService(target, context, timeProvider);
+
+               // key 0
+               context.setCurrentKey(0);
+               timers.registerEventTimeTimer("ciao", 10);
+               timers.registerEventTimeTimer("hello", 10);
+
+               // key 1
+               context.setCurrentKey(1);
+               timers.registerEventTimeTimer("ciao", 10);
+               timers.registerEventTimeTimer("hello", 10);
+
+               assertEquals(4, timers.numEventTimeTimers());
+               assertEquals(2, timers.numEventTimeTimers("hello"));
+               assertEquals(2, timers.numEventTimeTimers("ciao"));
+
+               // delete "hello" for key 0 and "ciao" for key 1
+               context.setCurrentKey(0);
+               timers.deleteEventTimeTimer("hello", 10);
+
+               context.setCurrentKey(1);
+               timers.deleteEventTimeTimer("ciao", 10);
+
+               assertEquals(2, timers.numEventTimeTimers());
+               assertEquals(1, timers.numEventTimeTimers("hello"));
+               assertEquals(1, timers.numEventTimeTimers("ciao"));
+
+               timers.advanceWatermark(10);
+
+               // only the two surviving timers may fire
+               verify(target, times(2)).onEventTime(anyInternalTimer());
+               verify(target, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao")));
+               verify(target, times(0)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+               verify(target, times(0)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+               verify(target, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+               assertEquals(0, timers.numEventTimeTimers());
+       }
+
+       /**
+        * Registers processing-time timers for two keys and two namespaces, deletes one timer per
+        * key and checks that only the remaining timers fire when processing time advances.
+        *
+        * <p>This also verifies that we don't have leakage between keys/namespaces.
+        *
+        * <p>This also verifies that deleted timers don't fire.
+        */
+       @Test
+       public void testDeleteProcessingTimeTimers() throws Exception {
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+               TestKeyContext keyContext = new TestKeyContext();
+               TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+               HeapInternalTimerService<Integer, String> timerService =
+                               createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+               keyContext.setCurrentKey(0);
+
+               timerService.registerProcessingTimeTimer("ciao", 10);
+               timerService.registerProcessingTimeTimer("hello", 10);
+
+               keyContext.setCurrentKey(1);
+
+               timerService.registerProcessingTimeTimer("ciao", 10);
+               timerService.registerProcessingTimeTimer("hello", 10);
+
+               assertEquals(4, timerService.numProcessingTimeTimers());
+               assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+               assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+               // delete "hello" for key 0 and "ciao" for key 1
+               keyContext.setCurrentKey(0);
+               timerService.deleteProcessingTimeTimer("hello", 10);
+
+               keyContext.setCurrentKey(1);
+               timerService.deleteProcessingTimeTimer("ciao", 10);
+
+               assertEquals(2, timerService.numProcessingTimeTimers());
+               assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+               assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+
+               processingTimeService.setCurrentTime(10);
+
+               // only the two surviving timers may fire
+               verify(mockTriggerable, times(2)).onProcessingTime(anyInternalTimer());
+               verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+               verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello")));
+               verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao")));
+               verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+               // This test only ever registers processing-time timers, so the processing-time count
+               // is the one that must be back to zero (the event-time count is trivially zero here).
+               assertEquals(0, timerService.numProcessingTimeTimers());
+       }
+
+       /**
+        * Snapshots a timer service holding both processing-time and event-time timers, restores it
+        * into a fresh service with a new {@code Triggerable}, and checks that the restored timers
+        * fire on the new target and are removed afterwards.
+        */
+       @Test
+       public void testSnapshotAndRestore() throws Exception {
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+               TestKeyContext keyContext = new TestKeyContext();
+               TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+               HeapInternalTimerService<Integer, String> timerService =
+                               createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+               keyContext.setCurrentKey(0);
+
+               timerService.registerProcessingTimeTimer("ciao", 10);
+               timerService.registerEventTimeTimer("hello", 10);
+
+               keyContext.setCurrentKey(1);
+
+               timerService.registerEventTimeTimer("ciao", 10);
+               timerService.registerProcessingTimeTimer("hello", 10);
+
+               assertEquals(2, timerService.numProcessingTimeTimers());
+               assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+               assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+               assertEquals(2, timerService.numEventTimeTimers());
+               assertEquals(1, timerService.numEventTimeTimers("hello"));
+               assertEquals(1, timerService.numEventTimeTimers("ciao"));
+
+               ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+               timerService.snapshotTimers(outStream);
+               outStream.close();
+
+               // restore into completely fresh collaborators so nothing leaks over from above
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class);
+
+               keyContext = new TestKeyContext();
+               processingTimeService = new TestProcessingTimeService();
+
+               timerService = restoreTimerService(
+                               new ByteArrayInputStream(outStream.toByteArray()),
+                               mockTriggerable2,
+                               keyContext,
+                               processingTimeService);
+
+               processingTimeService.setCurrentTime(10);
+               timerService.advanceWatermark(10);
+
+               verify(mockTriggerable2, times(2)).onProcessingTime(anyInternalTimer());
+               verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+               verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+               verify(mockTriggerable2, times(2)).onEventTime(anyInternalTimer());
+               verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+               verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+
+               // both kinds of restored timers must be gone after firing
+               assertEquals(0, timerService.numProcessingTimeTimers());
+               assertEquals(0, timerService.numEventTimeTimers());
+       }
+
+
+       /**
+        * Minimal {@link KeyContext} for tests: remembers the last key that was set and returns it.
+        */
+       private static class TestKeyContext implements KeyContext {
+
+               private Object currentKey;
+
+               @Override
+               public Object getCurrentKey() {
+                       return currentKey;
+               }
+
+               @Override
+               public void setCurrentKey(Object key) {
+                       currentKey = key;
+               }
+       }
+
+       /**
+        * Creates a fresh {@link HeapInternalTimerService} with {@code Integer} keys and
+        * {@code String} namespaces on top of the given collaborators.
+        */
+       private static HeapInternalTimerService<Integer, String> createTimerService(
+                       Triggerable<Integer, String> triggerable,
+                       KeyContext keyContext,
+                       ProcessingTimeService processingTimeService) {
+               HeapInternalTimerService<Integer, String> service = new HeapInternalTimerService<>(
+                               IntSerializer.INSTANCE,
+                               StringSerializer.INSTANCE,
+                               triggerable,
+                               keyContext,
+                               processingTimeService);
+               return service;
+       }
+
+       /**
+        * Creates a {@link HeapInternalTimerService} with {@code Integer} keys and {@code String}
+        * namespaces whose timers are read back from {@code stateStream}, using this test class's
+        * class loader.
+        */
+       private static HeapInternalTimerService<Integer, String> restoreTimerService(
+                       InputStream stateStream,
+                       Triggerable<Integer, String> triggerable,
+                       KeyContext keyContext,
+                       ProcessingTimeService processingTimeService) throws Exception {
+               ClassLoader testClassLoader = HeapInternalTimerServiceTest.class.getClassLoader();
+
+               HeapInternalTimerService.RestoredTimers<Integer, String> timersFromSnapshot =
+                               new HeapInternalTimerService.RestoredTimers<>(stateStream, testClassLoader);
+
+               return new HeapInternalTimerService<>(
+                               IntSerializer.INSTANCE,
+                               StringSerializer.INSTANCE,
+                               triggerable,
+                               keyContext,
+                               processingTimeService,
+                               timersFromSnapshot);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
new file mode 100644
index 0000000..6edf20a
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+       /**
+        * Interleaves watermarks and elements on an operator wrapping a
+        * {@code QueryingFlatMapFunction} in event time and checks that each emitted record carries
+        * the value of the preceding watermark ({@code "<input>TIME:<watermark>"}).
+        */
+       @Test
+       public void testCurrentEventTime() throws Exception {
+               ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+               expected.add(new Watermark(17L));
+               expected.add(new StreamRecord<>("5TIME:17", 12L));
+               expected.add(new Watermark(42L));
+               expected.add(new StreamRecord<>("6TIME:42", 13L));
+
+               StreamTimelyFlatMap<Integer, Integer, String> timelyFlatMap =
+                               new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(timelyFlatMap, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+
+               harness.processWatermark(new Watermark(17));
+               harness.processElement(new StreamRecord<>(5, 12L));
+
+               harness.processWatermark(new Watermark(42));
+               harness.processElement(new StreamRecord<>(6, 13L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+               harness.close();
+       }
+
+       /**
+        * Interleaves processing-time updates and elements on an operator wrapping a
+        * {@code QueryingFlatMapFunction} in processing time and checks that each emitted record
+        * carries the processing time set just before it ({@code "<input>TIME:<time>"}).
+        */
+       @Test
+       public void testCurrentProcessingTime() throws Exception {
+               ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+               expected.add(new StreamRecord<>("5TIME:17"));
+               expected.add(new StreamRecord<>("6TIME:42"));
+
+               StreamTimelyFlatMap<Integer, Integer, String> timelyFlatMap =
+                               new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(timelyFlatMap, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+
+               harness.setProcessingTime(17);
+               harness.processElement(new StreamRecord<>(5));
+
+               harness.setProcessingTime(42);
+               harness.processElement(new StreamRecord<>(6));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+               harness.close();
+       }
+
+       /**
+        * Sends one element between two watermarks through an operator wrapping a
+        * {@code TriggeringFlatMapFunction} in event time. The expected output is the forwarded
+        * element followed by a record {@code 1777} with timestamp 5 that is emitted before
+        * watermark 5.
+        */
+       @Test
+       public void testEventTimeTimers() throws Exception {
+               ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+               expected.add(new Watermark(0L));
+               expected.add(new StreamRecord<>(17, 42L));
+               expected.add(new StreamRecord<>(1777, 5L));
+               expected.add(new Watermark(5L));
+
+               StreamTimelyFlatMap<Integer, Integer, Integer> timelyFlatMap =
+                               new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(timelyFlatMap, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+
+               harness.processWatermark(new Watermark(0));
+
+               harness.processElement(new StreamRecord<>(17, 42L));
+
+               harness.processWatermark(new Watermark(5));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+               harness.close();
+       }
+
+       /**
+        * Sends one element through an operator wrapping a {@code TriggeringFlatMapFunction} in
+        * processing time and then advances processing time to 5. The expected output is the
+        * forwarded element followed by a record {@code 1777} with timestamp 5.
+        */
+       @Test
+       public void testProcessingTimeTimers() throws Exception {
+               ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+               expected.add(new StreamRecord<>(17));
+               expected.add(new StreamRecord<>(1777, 5L));
+
+               StreamTimelyFlatMap<Integer, Integer, Integer> timelyFlatMap =
+                               new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(timelyFlatMap, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+
+               harness.processElement(new StreamRecord<>(17));
+
+               harness.setProcessingTime(5);
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+               harness.close();
+       }
+
+       /**
+        * Feeds two elements with different keys through a function that stores each
+        * element in state and registers an event-time timer, then verifies that every
+        * timer callback reads back the value of its own key, i.e. that there is no
+        * leakage between different keys.
+        */
+       @Test
+       public void testEventTimeTimerWithState() throws Exception {
+
+               TriggeringStatefulFlatMapFunction function =
+                               new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME);
+
+               StreamTimelyFlatMap<Integer, Integer, String> timelyOperator =
+                               new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, function);
+
+               OneInputStreamOperatorTestHarness<Integer, String> harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               timelyOperator,
+                                               new IdentityKeySelector<Integer>(),
+                                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+
+               // Key 17 arrives at watermark 1: should set timer for 6.
+               harness.processWatermark(new Watermark(1));
+               harness.processElement(new StreamRecord<>(17, 0L));
+
+               // Key 42 arrives at watermark 2: should set timer for 7.
+               harness.processWatermark(new Watermark(2));
+               harness.processElement(new StreamRecord<>(42, 1L));
+
+               // Advance event time past both timers, one watermark at a time.
+               harness.processWatermark(new Watermark(6));
+               harness.processWatermark(new Watermark(7));
+
+               // Each timer's output is expected ahead of the watermark that triggered it.
+               ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+               expected.add(new Watermark(1L));
+               expected.add(new StreamRecord<>("INPUT:17", 0L));
+               expected.add(new Watermark(2L));
+               expected.add(new StreamRecord<>("INPUT:42", 1L));
+               expected.add(new StreamRecord<>("STATE:17", 6L));
+               expected.add(new Watermark(6L));
+               expected.add(new StreamRecord<>("STATE:42", 7L));
+               expected.add(new Watermark(7L));
+
+               TestHarnessUtil.assertOutputEquals(
+                               "Output was not correct.",
+                               expected,
+                               harness.getOutput());
+
+               harness.close();
+       }
+
+       /**
+        * Processing-time counterpart of the keyed-state timer check: two elements with
+        * different keys each store their value in state and register a processing-time
+        * timer; every timer callback must read back the value of its own key, i.e.
+        * there must be no leakage between different keys.
+        */
+       @Test
+       public void testProcessingTimeTimerWithState() throws Exception {
+
+               TriggeringStatefulFlatMapFunction function =
+                               new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME);
+
+               StreamTimelyFlatMap<Integer, Integer, String> timelyOperator =
+                               new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, function);
+
+               OneInputStreamOperatorTestHarness<Integer, String> harness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               timelyOperator,
+                                               new IdentityKeySelector<Integer>(),
+                                               BasicTypeInfo.INT_TYPE_INFO);
+
+               harness.setup();
+               harness.open();
+
+               // Key 17 arrives at processing time 1: should set timer for 6.
+               harness.setProcessingTime(1);
+               harness.processElement(new StreamRecord<>(17));
+
+               // Key 42 arrives at processing time 2: should set timer for 7.
+               harness.setProcessingTime(2);
+               harness.processElement(new StreamRecord<>(42));
+
+               // Step the clock onto each timer in turn.
+               harness.setProcessingTime(6);
+               harness.setProcessingTime(7);
+
+               // Inputs are forwarded without a timestamp; timer output carries the firing time.
+               ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+               expected.add(new StreamRecord<>("INPUT:17"));
+               expected.add(new StreamRecord<>("INPUT:42"));
+               expected.add(new StreamRecord<>("STATE:17", 6L));
+               expected.add(new StreamRecord<>("STATE:42", 7L));
+
+               TestHarnessUtil.assertOutputEquals(
+                               "Output was not correct.",
+                               expected,
+                               harness.getOutput());
+
+               harness.close();
+       }
+
+       /**
+        * Registers one processing-time and one event-time timer, snapshots the operator,
+        * restores the snapshot into a freshly created operator and harness, and verifies
+        * that both timers fire on the restored instance.
+        */
+       @Test
+       public void testSnapshotAndRestore() throws Exception {
+
+               StreamTimelyFlatMap<Integer, Integer, String> operator =
+                               new StreamTimelyFlatMap<>(
+                                               IntSerializer.INSTANCE,
+                                               new BothTriggeringFlatMapFunction());
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new KeyedOneInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               new IdentityKeySelector<Integer>(),
+                                               BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               // The function emits nothing here; it only registers timers (processing time 5, event time 6).
+               testHarness.processElement(new StreamRecord<>(5, 12L));
+
+               // snapshot and restore from scratch
+               StreamStateHandle snapshot = testHarness.snapshotLegacy(0, 0);
+
+               testHarness.close();
+
+               operator = new StreamTimelyFlatMap<>(
+                               IntSerializer.INSTANCE,
+                               new BothTriggeringFlatMapFunction());
+
+               testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
+                               operator,
+                               new IdentityKeySelector<Integer>(),
+                               BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.restore(snapshot);
+               testHarness.open();
+
+               // Reach both timer timestamps on the restored operator.
+               testHarness.setProcessingTime(5);
+               testHarness.processWatermark(new Watermark(6));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+               expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+               expectedOutput.add(new Watermark(6));
+
+               // The leftover debug print of the harness output was removed; the assertion below covers it.
+               TestHarnessUtil.assertOutputEquals(
+                               "Output was not correct.",
+                               expectedOutput,
+                               testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       /**
+        * Key selector that uses the element itself as its key.
+        *
+        * @param <T> type of both the element and the key
+        */
+       private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+
+               private static final long serialVersionUID = 1L;
+
+               /** Returns {@code value} unchanged. */
+               @Override
+               public T getKey(T value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static class QueryingFlatMapFunction implements 
TimelyFlatMapFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final TimeDomain timeDomain;
+
+               public QueryingFlatMapFunction(TimeDomain timeDomain) {
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void flatMap(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               out.collect(value + "TIME:" + 
timerService.currentWatermark());
+                       } else {
+                               out.collect(value + "TIME:" + 
timerService.currentProcessingTime());
+                       }
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               TimeDomain timeDomain,
+                               TimerService timerService,
+                               Collector<String> out) throws Exception {
+               }
+       }
+
+       private static class TriggeringFlatMapFunction implements 
TimelyFlatMapFunction<Integer, Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final TimeDomain timeDomain;
+
+               public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void flatMap(Integer value, TimerService timerService, 
Collector<Integer> out) throws Exception {
+                       out.collect(value);
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               
timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+                       } else {
+                               
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 
5);
+                       }
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               TimeDomain timeDomain,
+                               TimerService timerService,
+                               Collector<Integer> out) throws Exception {
+
+                       assertEquals(this.timeDomain, timeDomain);
+                       out.collect(1777);
+               }
+       }
+
+       private static class TriggeringStatefulFlatMapFunction extends 
RichTimelyFlatMapFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final ValueStateDescriptor<Integer> state =
+                               new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE, null);
+
+               private final TimeDomain timeDomain;
+
+               public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) 
{
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void flatMap(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT:" + value);
+                       getRuntimeContext().getState(state).update(value);
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               
timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+                       } else {
+                               
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 
5);
+                       }
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               TimeDomain timeDomain,
+                               TimerService timerService,
+                               Collector<String> out) throws Exception {
+                       assertEquals(this.timeDomain, timeDomain);
+                       out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
+               }
+       }
+
+       /**
+        * Function that emits nothing for its input but registers a processing-time timer
+        * for timestamp 5 and an event-time timer for timestamp 6. A firing timer emits
+        * {@code "EVENT:1777"} in the event-time domain and {@code "PROC:1777"} otherwise.
+        */
+       private static class BothTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               /** Registers both timers; the element itself is not forwarded. */
+               @Override
+               public void flatMap(
+                               Integer value,
+                               TimerService timerService,
+                               Collector<String> out) throws Exception {
+                       timerService.registerProcessingTimeTimer(5);
+                       timerService.registerEventTimeTimer(6);
+               }
+
+               /** Emits a marker string naming the domain in which the timer fired. */
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               TimeDomain timeDomain,
+                               TimerService timerService,
+                               Collector<String> out) throws Exception {
+                       String marker = TimeDomain.EVENT_TIME.equals(timeDomain)
+                                       ? "EVENT:1777"
+                                       : "PROC:1777";
+                       out.collect(marker);
+               }
+       }
+
+}

Reply via email to