[FLINK-3674] Add an interface for Time aware User Functions

This moves the event-time/processing-time trigger code from
WindowOperator behind a well-defined interface that can be used by
operators (and user functions).

InternalTimerService is the new interface that has the same
functionality that WindowOperator used to have. TimerService is the
user-facing interface that does not allow dealing with
namespaces/payloads and also does not allow deleting timers. There is a
default implementation in HeapInternalTimerService that can checkpoint
timers to a stream and also restore them from a stream. Right now, this
is managed in AbstractStreamOperator and operators can ask for an
InternalTimerService.

This also adds tests for HeapInternalTimerService.

This adds two new user functions:
 - TimelyFlatMapFunction: an extension of FlatMapFunction that also
   allows querying time and setting timers
 - TimelyCoFlatMapFunction: the same, but for CoFlatMapFunction

There are two new StreamOperator implementations for these that use the
InternalTimerService interface.

This also adds tests for the two new operators.

This also adds the new interface KeyContext that is used for
setting/querying the current key context for state and timers. Timers
are always scoped to a key, for now.

Also, this moves the handling of watermarks for both one-input and
two-input operators to AbstractStreamOperator so that we have a central
ground truth.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81b19e53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81b19e53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81b19e53

Branch: refs/heads/master
Commit: 81b19e5323edd384e00f77eaa4a5c543db7e2499
Parents: f305baa
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Sep 26 16:21:51 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 21 19:03:05 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  17 +-
 .../flink/storm/wrappers/BoltWrapper.java       |   7 -
 .../operator/AbstractCEPPatternOperator.java    |   9 +-
 .../AbstractKeyedCEPPatternOperator.java        |   7 +-
 .../flink/streaming/api/SimpleTimerService.java |  55 ++
 .../apache/flink/streaming/api/TimeDomain.java  |  34 ++
 .../flink/streaming/api/TimerService.java       |  53 ++
 .../streaming/api/datastream/KeyedStream.java   |  41 +-
 .../functions/RichTimelyFlatMapFunction.java    |  40 ++
 .../api/functions/TimelyFlatMapFunction.java    |  78 +++
 .../co/RichTimelyCoFlatMapFunction.java         |  41 ++
 .../functions/co/TimelyCoFlatMapFunction.java   |  92 ++++
 .../query/AbstractQueryableStateOperator.java   |   6 -
 .../source/ContinuousFileReaderOperator.java    |   2 +
 .../api/operators/AbstractStreamOperator.java   | 177 +++++-
 .../operators/AbstractUdfStreamOperator.java    |   2 +
 .../api/operators/HeapInternalTimerService.java | 318 +++++++++++
 .../streaming/api/operators/InternalTimer.java  |  90 +++
 .../api/operators/InternalTimerService.java     |  60 ++
 .../streaming/api/operators/KeyContext.java     |  31 ++
 .../streaming/api/operators/StreamFilter.java   |   6 -
 .../streaming/api/operators/StreamFlatMap.java  |   8 +-
 .../api/operators/StreamGroupedFold.java        |   6 -
 .../api/operators/StreamGroupedReduce.java      |   7 -
 .../streaming/api/operators/StreamMap.java      |   6 -
 .../streaming/api/operators/StreamProject.java  |   6 -
 .../streaming/api/operators/StreamSink.java     |   6 -
 .../api/operators/StreamTimelyFlatMap.java      |  79 +++
 .../streaming/api/operators/Triggerable.java    |  40 ++
 .../api/operators/co/CoStreamFlatMap.java       |  27 -
 .../streaming/api/operators/co/CoStreamMap.java |  27 -
 .../api/operators/co/CoStreamTimelyFlatMap.java |  96 ++++
 .../operators/GenericWriteAheadSink.java        |   9 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   6 -
 .../windowing/AccumulatingKeyedTimePanes.java   |   4 +-
 .../windowing/AggregatingKeyedTimePanes.java    |   2 +-
 .../windowing/EvictingWindowOperator.java       | 150 +++--
 .../operators/windowing/WindowOperator.java     | 312 +++--------
 .../tasks/TestProcessingTimeService.java        |  62 ++-
 .../operators/HeapInternalTimerServiceTest.java | 509 +++++++++++++++++
 .../api/operators/TimelyFlatMapTest.java        | 410 ++++++++++++++
 .../api/operators/co/TimelyCoFlatMapTest.java   | 544 +++++++++++++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java   |   9 +-
 .../test/checkpointing/RescalingITCase.java     |   5 +
 .../test/checkpointing/SavepointITCase.java     |   1 +
 .../test/streaming/runtime/TimestampITCase.java |   9 +-
 46 files changed, 3017 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 4d1ab50..fdd1bf4 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -39,7 +41,7 @@ import 
org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -364,7 +366,7 @@ public class RocksDBAsyncSnapshotTest {
 
        public static class AsyncCheckpointOperator
                extends AbstractStreamOperator<String>
-               implements OneInputStreamOperator<String, String> {
+               implements OneInputStreamOperator<String, String>, 
StreamCheckpointedOperator {
 
                @Override
                public void open() throws Exception {
@@ -394,9 +396,16 @@ public class RocksDBAsyncSnapshotTest {
                }
 
                @Override
-               public void processWatermark(Watermark mark) throws Exception {
-                       // not interested
+               public void snapshotState(
+                               FSDataOutputStream out, long checkpointId, long 
timestamp) throws Exception {
+                       // do nothing so that we don't block
                }
+
+               @Override
+               public void restoreState(FSDataInputStream in) throws Exception 
{
+                       // do nothing so that we don't block
+               }
+
        }
 
        public static class DummyMapFunction<T> implements MapFunction<T, T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index d59ff04..55a8e28 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -36,7 +36,6 @@ import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
@@ -318,10 +317,4 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
                                        MessageId.makeUnanchored()));
                }
        }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               this.output.emitWatermark(mark);
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 455e864..1deb192 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -65,7 +65,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> 
extends AbstractCEPBas
        }
 
        @Override
-       public void open() {
+       public void open() throws Exception {
+               super.open();
                if (priorityQueue == null) {
                        priorityQueue = new 
PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new 
StreamRecordComparator<IN>());
                }
@@ -93,6 +94,9 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> 
extends AbstractCEPBas
 
        @Override
        public void processWatermark(Watermark mark) throws Exception {
+               // we do our own watermark handling, no super call. we will 
never be able to use
+               // the timer service like this, however.
+
                while(!priorityQueue.isEmpty() && 
priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
                        StreamRecord<IN> streamRecord = priorityQueue.poll();
 
@@ -104,6 +108,7 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> 
extends AbstractCEPBas
 
        @Override
        public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
+               super.snapshotState(out, checkpointId, timestamp);
                final ObjectOutputStream oos = new ObjectOutputStream(out);
 
                oos.writeObject(nfa);
@@ -118,6 +123,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> 
extends AbstractCEPBas
        @Override
        @SuppressWarnings("unchecked")
        public void restoreState(FSDataInputStream state) throws Exception {
+               super.restoreState(state);
+
                final ObjectInputStream ois = new ObjectInputStream(state);
 
                nfa = (NFA<IN>)ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index c3898c3..54baf6d 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -166,9 +166,12 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
 
        @Override
        public void processWatermark(Watermark mark) throws Exception {
+               // we do our own watermark handling, no super call. we will 
never be able to use
+               // the timer service like this, however.
+
                // iterate over all keys to trigger the execution of the 
buffered elements
                for (KEY key: keys) {
-                       setKeyContext(key);
+                       setCurrentKey(key);
 
                        PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
                        NFA<IN> nfa = getNFA();
@@ -187,6 +190,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
 
        @Override
        public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
+               super.snapshotState(out, checkpointId, timestamp);
 
                DataOutputView ov = new DataOutputViewStreamWrapper(out);
                ov.writeInt(keys.size());
@@ -198,6 +202,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
 
        @Override
        public void restoreState(FSDataInputStream state) throws Exception {
+               super.restoreState(state);
 
                DataInputView inputView = new DataInputViewStreamWrapper(state);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/SimpleTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/SimpleTimerService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/SimpleTimerService.java
new file mode 100644
index 0000000..43d2659
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/SimpleTimerService.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+
+/**
+ * Implementation of {@link TimerService} that uses a {@link 
InternalTimerService}.
+ */
+@Internal
+public class SimpleTimerService implements TimerService {
+
+       private final InternalTimerService<VoidNamespace> internalTimerService;
+
+       public SimpleTimerService(InternalTimerService<VoidNamespace> 
internalTimerService) {
+               this.internalTimerService = internalTimerService;
+       }
+
+       @Override
+       public long currentProcessingTime() {
+               return internalTimerService.currentProcessingTime();
+       }
+
+       @Override
+       public long currentWatermark() {
+               return internalTimerService.currentWatermark();
+       }
+
+       @Override
+       public void registerProcessingTimeTimer(long time) {
+               
internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
+       }
+
+       @Override
+       public void registerEventTimeTimer(long time) {
+               
internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeDomain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeDomain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeDomain.java
new file mode 100644
index 0000000..7cdfdc2
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimeDomain.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api;
+
+/**
+ * {@code TimeDomain} specifies whether a firing timer is based on event time 
or processing time.
+ */
+public enum TimeDomain {
+
+       /**
+        * Time is based on the timestamp of events.
+        */
+       EVENT_TIME,
+
+       /**
+        * Time is based on the current processing-time of a machine where 
processing happens.
+        */
+       PROCESSING_TIME
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimerService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimerService.java
new file mode 100644
index 0000000..ef8b631
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/TimerService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for working with time and timers.
+ */
+@PublicEvolving
+public interface TimerService {
+
+       /** Returns the current processing time. */
+       long currentProcessingTime();
+
+       /** Returns the current event-time watermark. */
+       long currentWatermark();
+
+       /**
+        * Registers a timer to be fired when processing time passes the given 
time.
+        *
+        * <p>Timers can internally be scoped to keys and/or windows. When you 
set a timer
+        * in a keyed context, such as in an operation on
+        * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then 
that context
+        * will also be active when you receive the timer notification.
+        */
+       void registerProcessingTimeTimer(long time);
+
+       /**
+        * Registers a timer to be fired when the event time watermark passes 
the given time.
+        *
+        * <p>Timers can internally be scoped to keys and/or windows. When you 
set a timer
+        * in a keyed context, such as in an operation on
+        * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then 
that context
+        * will also be active when you receive the timer notification.
+        */
+       void registerEventTimeTimer(long time);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index af907e3..1bce6a2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -20,8 +20,10 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -32,6 +34,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -41,6 +44,7 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -169,7 +173,42 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                result.getTransformation().setStateKeyType(keyType);
                return result;
        }
-       
+
+       /**
+        * Applies a FlatMap transformation on a {@link DataStream}. The
+        * transformation calls a {@link FlatMapFunction} for each element of 
the
+        * DataStream. Each FlatMapFunction call can return any number of 
elements
+        * including none. The user can also extend {@link RichFlatMapFunction} 
to
+        * gain access to other features provided by the
+        * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+        *
+        * @param flatMapper
+        *            The FlatMapFunction that is called for each element of the
+        *            DataStream
+        *
+        * @param <R>
+        *            output type
+        * @return The transformed {@link DataStream}.
+        */
+       public <R> SingleOutputStreamOperator<R> 
flatMap(TimelyFlatMapFunction<T, R> flatMapper) {
+
+               TypeInformation<R> outType = 
TypeExtractor.getUnaryOperatorReturnType(
+                               flatMapper,
+                               TimelyFlatMapFunction.class,
+                               false,
+                               true,
+                               getType(),
+                               Utils.getCallLocationName(),
+                               true);
+
+               StreamTimelyFlatMap<KEY, T, R> operator =
+                               new 
StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), 
clean(flatMapper));
+
+               return transform("Flat Map", outType, operator);
+
+       }
+
+
        // 
------------------------------------------------------------------------
        //  Windowing
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
new file mode 100644
index 0000000..0d86da9
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Rich variant of the {@link TimelyFlatMapFunction}. As a
+ * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access 
to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link 
org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichTimelyFlatMapFunction<I, O>
+               extends AbstractRichFunction
+               implements TimelyFlatMapFunction<I, O> {
+
+       private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
new file mode 100644
index 0000000..77fe35e
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for timely flatMap functions. FlatMap functions take 
elements and transform them,
+ * into zero, one, or more elements. Typical applications can be splitting 
elements, or unnesting lists
+ * and arrays.
+ *
+ * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of 
a normal
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set 
timers and react
+ * to them firing.
+ *
+ * <pre>{@code
+ * DataStream<X> input = ...;
+ *
+ * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
+ * }</pre>
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
+
+       /**
+        * The core method of the {@code TimelyFlatMapFunction}. Takes an 
element from the input data set and transforms
+        * it into zero, one, or more elements.
+        *
+        * @param value The input value.
+        * @param timerService A {@link TimerService} that allows setting 
timers and querying the
+        *                        current time.
+        * @param out The collector for returning result values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       void flatMap(I value, TimerService timerService, Collector<O> out) 
throws Exception;
+
+       /**
+        * Called when a timer set using {@link TimerService} fires.
+        *
+        * @param timestamp The timestamp of the firing timer.
+        * @param timeDomain The {@link TimeDomain} of the firing timer.
+        * @param timerService A {@link TimerService} that allows setting 
timers and querying the
+        *                        current time.
+        * @param out The collector for returning result values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       void onTimer(long timestamp, TimeDomain timeDomain, TimerService 
timerService, Collector<O> out) throws Exception ;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
new file mode 100644
index 0000000..12fe181
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link TimelyCoFlatMapFunction}. As a {@link RichFunction}, it gives
+ * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
+ * setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link RichFunction#close()}.
+ *
+ * <p>This class declares no methods of its own; it only combines {@link AbstractRichFunction}
+ * with the {@link TimelyCoFlatMapFunction} interface.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichTimelyCoFlatMapFunction<IN1, IN2, OUT>
+               extends AbstractRichFunction
+               implements TimelyCoFlatMapFunction<IN1, IN2, OUT> {
+
+       private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
new file mode 100644
index 0000000..87355c6
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * A {@code TimelyCoFlatMapFunction} implements a flat-map transformation over two
+ * connected streams.
+ *
+ * <p>The same instance of the transformation function is used to transform
+ * both of the connected streams. That way, the stream transformations can
+ * share state.
+ *
+ * <p>A {@code TimelyCoFlatMapFunction} can, in addition to the functionality of a normal
+ * {@link CoFlatMapFunction}, also set timers and react to them firing.
+ *
+ * <p>An example for the use of connected streams would be to apply rules that change over time
+ * onto elements of a stream. One of the connected streams has the rules, the other stream the
+ * elements to apply the rules to. The operation on the connected stream maintains the
+ * current set of rules in the state. It may receive either a rule update (from the first stream)
+ * and update the state, or a data element (from the second stream) and apply the rules in the
+ * state to the element. The result of applying the rules would be emitted.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@PublicEvolving
+public interface TimelyCoFlatMapFunction<IN1, IN2, OUT> extends Function, 
Serializable {
+
+       /**
+        * This method is called for each element in the first of the connected streams.
+        *
+        * @param value The stream element
+        * @param timerService A {@link TimerService} that allows setting timers and querying the
+        *                        current time.
+        * @param out The collector to emit resulting elements to
+        * @throws Exception The function may throw exceptions which cause the streaming program
+        *                   to fail and go into recovery.
+        */
+       void flatMap1(IN1 value, TimerService timerService, Collector<OUT> out) 
throws Exception;
+
+       /**
+        * This method is called for each element in the second of the connected streams.
+        *
+        * @param value The stream element
+        * @param timerService A {@link TimerService} that allows setting timers and querying the
+        *                        current time.
+        * @param out The collector to emit resulting elements to
+        * @throws Exception The function may throw exceptions which cause the streaming program
+        *                   to fail and go into recovery.
+        */
+       void flatMap2(IN2 value, TimerService timerService, Collector<OUT> out) 
throws Exception;
+
+       /**
+        * Called when a timer set using {@link TimerService} fires.
+        *
+        * @param timestamp The timestamp of the firing timer.
+        * @param timeDomain The {@link TimeDomain} of the firing timer.
+        * @param timerService A {@link TimerService} that allows setting timers and querying the
+        *                        current time.
+        * @param out The collector for returning result values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+        *                   operation to fail and may trigger recovery.
+        */
+       void onTimer(long timestamp, TimeDomain timeDomain, TimerService 
timerService, Collector<OUT> out) throws Exception ;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
index 09c9b01..7522a61 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -76,9 +75,4 @@ abstract class AbstractQueryableStateOperator<S extends 
State, IN>
                super.open();
                state = getPartitionedState(stateDescriptor);
        }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               // Nothing to do
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index be22677..4cc5206 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -387,6 +387,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
        @Override
        public void snapshotState(FSDataOutputStream os, long checkpointId, 
long timestamp) throws Exception {
+               super.snapshotState(os, checkpointId, timestamp);
 
                final ObjectOutputStream oos = new ObjectOutputStream(os);
 
@@ -409,6 +410,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
        @Override
        public void restoreState(FSDataInputStream is) throws Exception {
+               super.restoreState(is);
 
                final ObjectInputStream ois = new ObjectInputStream(is);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index b789c95..82ce493 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -21,12 +21,17 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -82,7 +87,7 @@ import java.util.Map;
  */
 @PublicEvolving
 public abstract class AbstractStreamOperator<OUT>
-               implements StreamOperator<OUT>, java.io.Serializable {
+               implements StreamOperator<OUT>, java.io.Serializable, 
KeyContext, StreamCheckpointedOperator {
 
        private static final long serialVersionUID = 1L;
        
@@ -99,7 +104,7 @@ public abstract class AbstractStreamOperator<OUT>
        /** The task that contains this operator (and other operators in the 
same chain) */
        private transient StreamTask<?, ?> container;
        
-       private transient StreamConfig config;
+       protected transient StreamConfig config;
 
        protected transient Output<StreamRecord<OUT>> output;
 
@@ -107,7 +112,6 @@ public abstract class AbstractStreamOperator<OUT>
        private transient StreamingRuntimeContext runtimeContext;
 
 
-
        // ---------------- key/value state ------------------
 
        /** key selector used to get the key for the state. Non-null only is 
the operator uses key/value state */
@@ -131,6 +135,20 @@ public abstract class AbstractStreamOperator<OUT>
 
        protected LatencyGauge latencyGauge;
 
+       // ---------------- timers ------------------
+
+       /**
+        * Timer services handed out by {@link #getInternalTimerService}, keyed by the name they
+        * were requested under. The map has no initializer; it is created in {@link #open()}.
+        */
+       private transient Map<String, HeapInternalTimerService<?, ?>> 
timerServices;
+
+       /**
+        * Timers read back by {@link #restoreState}, keyed by timer service name. An entry is
+        * removed again when {@link #getInternalTimerService} re-creates the service of that name.
+        * In the code shown here the field is only assigned by {@link #restoreState}.
+        */
+       private transient Map<String, 
HeapInternalTimerService.RestoredTimers<?, ?>> restoredServices;
+
+
+       // ---------------- two-input operator watermarks ------------------
+
+       // We keep track of the watermarks of both inputs; the combined watermark is their minimum.
+       // Once the minimum advances we emit a new watermark for downstream operators.
+       // All three start at Long.MIN_VALUE, i.e. "no watermark seen yet".
+       private long combinedWatermark = Long.MIN_VALUE;
+       private long input1Watermark = Long.MIN_VALUE;
+       private long input2Watermark = Long.MIN_VALUE;
+
+
        // 
------------------------------------------------------------------------
        //  Life Cycle
        // 
------------------------------------------------------------------------
@@ -230,7 +248,9 @@ public abstract class AbstractStreamOperator<OUT>
         */
        @Override
        public void open() throws Exception {
-
+               if (timerServices == null) {
+                       timerServices = new HashMap<>();
+               }
        }
 
        private void initKeyedState() {
@@ -449,12 +469,12 @@ public abstract class AbstractStreamOperator<OUT>
        private <T> void setKeyContextElement(StreamRecord<T> record, 
KeySelector<T, ?> selector) throws Exception {
                if (selector != null) {
                        Object key = selector.getKey(record.getValue());
-                       setKeyContext(key);
+                       setCurrentKey(key);
                }
        }
 
        @SuppressWarnings({"unchecked", "rawtypes"})
-       public void setKeyContext(Object key) {
+       public void setCurrentKey(Object key) {
                if (keyedStateBackend != null) {
                        try {
                                // need to work around type restrictions
@@ -468,6 +488,15 @@ public abstract class AbstractStreamOperator<OUT>
                }
        }
 
+       /**
+        * Returns the key that is currently set on the keyed state backend.
+        *
+        * @return The current key of the keyed state backend.
+        * @throws UnsupportedOperationException If this operator has no keyed state backend.
+        */
+       @Override
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public Object getCurrentKey() {
+               if (keyedStateBackend == null) {
+                       throw new UnsupportedOperationException("Key can only be retrieved on KeyedStream.");
+               }
+               return keyedStateBackend.getCurrentKey();
+       }
+
+
        public KeyedStateStore getKeyedStateStore() {
                return keyedStateStore;
        }
@@ -666,4 +695,140 @@ public abstract class AbstractStreamOperator<OUT>
                }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Watermark handling
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Looks up, or lazily creates, the {@link InternalTimerService} registered under the given
+        * name. The service can be used to query the current processing time and event time and to
+        * set timers. An operator may own several timer services, each with its own namespace
+        * serializer; requesting the same name repeatedly yields the same instance.
+        *
+        * <p>Timers are always scoped to a key, the currently active key of a keyed stream
+        * operation. When a timer fires, this key will also be set as the currently active key.
+        *
+        * <p>Each timer has attached metadata, the namespace. Different timer services can have a
+        * different namespace type. If you don't need namespace differentiation you can use
+        * {@link VoidNamespaceSerializer} as the namespace serializer.
+        *
+        * <p>If timers were restored for {@code name}, the newly created service is seeded with
+        * them and the restored entry is consumed.
+        *
+        * @param name The name of the requested timer service. If no service exists under the
+        *             given name a new one will be created and returned.
+        * @param keySerializer {@code TypeSerializer} for the keys of the timers.
+        * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
+        * @param triggerable The {@link Triggerable} that should be invoked when timers fire
+        *
+        * @param <K> The type of the timer keys.
+        * @param <N> The type of the timer namespace.
+        *
+        * @return The timer service registered under {@code name}.
+        */
+       public <K, N> InternalTimerService<N> getInternalTimerService(
+                       String name,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       Triggerable<K, N> triggerable) {
+
+               @SuppressWarnings("unchecked")
+               HeapInternalTimerService<K, N> existing =
+                               (HeapInternalTimerService<K, N>) timerServices.get(name);
+               if (existing != null) {
+                       return existing;
+               }
+
+               final boolean hasRestoredTimers =
+                               restoredServices != null && restoredServices.containsKey(name);
+
+               final HeapInternalTimerService<K, N> created;
+               if (!hasRestoredTimers) {
+                       created = new HeapInternalTimerService<>(
+                                       keySerializer,
+                                       namespaceSerializer,
+                                       triggerable,
+                                       this,
+                                       getRuntimeContext().getProcessingTimeService());
+               } else {
+                       // hand the restored timers over to the new service; they are used only once
+                       @SuppressWarnings("unchecked")
+                       HeapInternalTimerService.RestoredTimers<K, N> restored =
+                                       (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
+
+                       created = new HeapInternalTimerService<>(
+                                       keySerializer,
+                                       namespaceSerializer,
+                                       triggerable,
+                                       this,
+                                       getRuntimeContext().getProcessingTimeService(),
+                                       restored);
+               }
+
+               timerServices.put(name, created);
+               return created;
+       }
+
+       /**
+        * Advances every timer service of this operator to the given watermark and then forwards
+        * the watermark to the output.
+        *
+        * @param mark The watermark to process.
+        * @throws Exception Propagated from the timer services or from emitting the watermark.
+        */
+       public void processWatermark(Watermark mark) throws Exception {
+               // timerServices is only created in open(). If a subclass overrides open() without
+               // calling super.open(), no timer service can have been handed out, so there is
+               // nothing to advance and the watermark is just forwarded.
+               if (timerServices != null) {
+                       for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+                               service.advanceWatermark(mark.getTimestamp());
+                       }
+               }
+               output.emitWatermark(mark);
+       }
+
+       /**
+        * Records the watermark of the first input and, if the minimum over both inputs has
+        * advanced, processes a watermark carrying that new minimum.
+        *
+        * @param mark The watermark received on the first input.
+        * @throws Exception Propagated from {@link #processWatermark(Watermark)}.
+        */
+       public void processWatermark1(Watermark mark) throws Exception {
+               input1Watermark = mark.getTimestamp();
+
+               final long minOfInputs = Math.min(input1Watermark, input2Watermark);
+               if (minOfInputs <= combinedWatermark) {
+                       // the slower input still holds the combined watermark back
+                       return;
+               }
+
+               combinedWatermark = minOfInputs;
+               processWatermark(new Watermark(combinedWatermark));
+       }
+
+       /**
+        * Records the watermark of the second input and, if the minimum over both inputs has
+        * advanced, processes a watermark carrying that new minimum.
+        *
+        * @param mark The watermark received on the second input.
+        * @throws Exception Propagated from {@link #processWatermark(Watermark)}.
+        */
+       public void processWatermark2(Watermark mark) throws Exception {
+               input2Watermark = mark.getTimestamp();
+
+               final long minOfInputs = Math.min(input1Watermark, input2Watermark);
+               if (minOfInputs <= combinedWatermark) {
+                       // the slower input still holds the combined watermark back
+                       return;
+               }
+
+               combinedWatermark = minOfInputs;
+               processWatermark(new Watermark(combinedWatermark));
+       }
+
+       /**
+        * Writes the timers of all timer services to the checkpoint stream: first the number of
+        * services, then for each service its name followed by its timers.
+        *
+        * @param out The stream to write the snapshot to.
+        * @param checkpointId The id of the checkpoint (not used here).
+        * @param timestamp The timestamp of the checkpoint (not used here).
+        * @throws Exception If writing to the stream fails.
+        */
+       @Override
+       public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+               DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper(out);
+
+               if (timerServices == null) {
+                       // open() of this class was never run, hence no timer service exists; still
+                       // write the count so that restoreState() finds the layout it expects
+                       dataOutputView.writeInt(0);
+                       return;
+               }
+
+               dataOutputView.writeInt(timerServices.size());
+
+               for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : timerServices.entrySet()) {
+                       dataOutputView.writeUTF(entry.getKey());
+                       entry.getValue().snapshotTimers(dataOutputView);
+               }
+       }
+
+       /**
+        * Reads back what {@link #snapshotState} wrote: the number of timer services and, per
+        * service, its name and timers. The timers are parked in {@code restoredServices} until
+        * the service of that name is requested via {@link #getInternalTimerService}.
+        *
+        * @param in The stream to read the snapshot from.
+        * @throws Exception If reading from the stream fails.
+        */
+       @Override
+       public void restoreState(FSDataInputStream in) throws Exception {
+               DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(in);
+
+               restoredServices = new HashMap<>();
+
+               int numServices = dataInputView.readInt();
+
+               for (int i = 0; i < numServices; i++) {
+                       String name = dataInputView.readUTF();
+                       // NOTE(review): the timers are read from the raw stream, not from the wrapper;
+                       // this presumes the wrapper does not read ahead - confirm.
+                       HeapInternalTimerService.RestoredTimers<?, ?> restoredService =
+                                       new HeapInternalTimerService.RestoredTimers<>(in, getUserCodeClassloader());
+                       restoredServices.put(name, restoredService);
+               }
+       }
+
+       /**
+        * Returns the number of processing-time timers summed over all timer services of this
+        * operator.
+        *
+        * @return The total count, or 0 if no timer service exists (yet).
+        */
+       @VisibleForTesting
+       public int numProcessingTimeTimers() {
+               if (timerServices == null) {
+                       // the map is only created in open(); without it there cannot be any timers
+                       return 0;
+               }
+               int count = 0;
+               for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) {
+                       count += timerService.numProcessingTimeTimers();
+               }
+               return count;
+       }
+
+       /**
+        * Returns the number of event-time timers summed over all timer services of this operator.
+        *
+        * @return The total count, or 0 if no timer service exists (yet).
+        */
+       @VisibleForTesting
+       public int numEventTimeTimers() {
+               if (timerServices == null) {
+                       // the map is only created in open(); without it there cannot be any timers
+                       return 0;
+               }
+               int count = 0;
+               for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) {
+                       count += timerService.numEventTimeTimers();
+               }
+               return count;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 5e1a252..67d204a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -176,6 +176,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
        
        @Override
        public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
+               super.snapshotState(out, checkpointId, timestamp);
 
 
                if (userFunction instanceof Checkpointed) {
@@ -199,6 +200,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
 
        @Override
        public void restoreState(FSDataInputStream in) throws Exception {
+               super.restoreState(in);
 
                if (userFunction instanceof CheckpointedRestoring) {
                        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
new file mode 100644
index 0000000..c77b634
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link InternalTimerService} that stores timers on the Java heap.
+ */
+public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>, Triggerable {
+
+       /** Serializer for timer keys; written into the snapshot and used to serialize each key. */
+       private final TypeSerializer<K> keySerializer;
+
+       /** Serializer for timer namespaces; written into the snapshot alongside the key serializer. */
+       private final TypeSerializer<N> namespaceSerializer;
+
+       /** Supplies the current processing time and schedules the processing-time callback. */
+       private final ProcessingTimeService processingTimeService;
+
+       /** The watermark most recently passed to {@code advanceWatermark}. */
+       private long currentWatermark = Long.MIN_VALUE;
+
+       /** The target that is notified when an event-time or processing-time timer fires. */
+       private final org.apache.flink.streaming.api.operators.Triggerable<K, 
N> triggerTarget;
+
+       /** Used to read the current key when a timer is set and to set the key before it fires. */
+       private final KeyContext keyContext;
+
+       /**
+        * Processing time timers that are currently in-flight. The queue is read from its head when
+        * firing; the set holds the same timers and is used to reject duplicate registrations.
+        */
+       private final PriorityQueue<InternalTimer<K, N>> 
processingTimeTimersQueue;
+       private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+       /**
+        * Handle of the callback currently registered with the processing time service, or
+        * {@code null} if none is registered.
+        */
+       protected ScheduledFuture<?> nextTimer = null;
+
+       /**
+        * Currently waiting watermark callbacks. Set and queue hold the same timers, analogous to
+        * the processing-time pair above.
+        */
+       private final Set<InternalTimer<K, N>> eventTimeTimers;
+       private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;
+
+       /**
+        * Creates a timer service without any timers.
+        *
+        * @param keySerializer Serializer for the timer keys, must not be null.
+        * @param namespaceSerializer Serializer for the timer namespaces, must not be null.
+        * @param triggerTarget The target to notify when timers fire, must not be null.
+        * @param keyContext Used to get and set the current key, must not be null.
+        * @param processingTimeService Source of processing time and scheduler for the
+        *                              processing-time callback, must not be null.
+        */
+       public HeapInternalTimerService(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+                       KeyContext keyContext,
+                       ProcessingTimeService processingTimeService) {
+               this.keySerializer = checkNotNull(keySerializer);
+               this.namespaceSerializer = checkNotNull(namespaceSerializer);
+               this.triggerTarget = checkNotNull(triggerTarget);
+               // every timer operation dereferences the key context, so fail fast like for the
+               // other collaborators instead of on the first registered timer
+               this.keyContext = checkNotNull(keyContext);
+               this.processingTimeService = checkNotNull(processingTimeService);
+
+               eventTimeTimers = new HashSet<>();
+               eventTimeTimersQueue = new PriorityQueue<>(100);
+
+               processingTimeTimers = new HashSet<>();
+               processingTimeTimersQueue = new PriorityQueue<>(100);
+       }
+
+       /**
+        * Creates a timer service that takes over the timers of a restored snapshot. If restored
+        * processing-time timers exist, a callback for the earliest one is registered right away.
+        *
+        * @param keySerializer Serializer for the timer keys, must not be null.
+        * @param namespaceSerializer Serializer for the timer namespaces, must not be null.
+        * @param triggerTarget The target to notify when timers fire, must not be null.
+        * @param keyContext Used to get and set the current key, must not be null.
+        * @param processingTimeService Source of processing time and scheduler for the
+        *                              processing-time callback, must not be null.
+        * @param restoredTimers The restored timers; its sets and queues are adopted, not copied.
+        */
+       public HeapInternalTimerService(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
+                       KeyContext keyContext,
+                       ProcessingTimeService processingTimeService,
+                       RestoredTimers<K, N> restoredTimers) {
+
+               this.keySerializer = checkNotNull(keySerializer);
+               this.namespaceSerializer = checkNotNull(namespaceSerializer);
+               this.triggerTarget = checkNotNull(triggerTarget);
+               // every timer operation dereferences the key context, so fail fast like for the
+               // other collaborators instead of when the first restored timer fires
+               this.keyContext = checkNotNull(keyContext);
+               this.processingTimeService = checkNotNull(processingTimeService);
+
+               eventTimeTimers = restoredTimers.watermarkTimers;
+               eventTimeTimersQueue = restoredTimers.watermarkTimersQueue;
+
+               processingTimeTimers = restoredTimers.processingTimeTimers;
+               processingTimeTimersQueue = restoredTimers.processingTimeTimersQueue;
+
+               // re-register the restored timers (if any)
+               // NOTE(review): this hands out "this" before the constructor has returned.
+               if (!processingTimeTimersQueue.isEmpty()) {
+                       nextTimer = processingTimeService.registerTimer(
+                                       processingTimeTimersQueue.peek().getTimestamp(), this);
+               }
+       }
+
+
+       /**
+        * Returns the current processing time as reported by the {@code ProcessingTimeService}.
+        */
+       @Override
+       public long currentProcessingTime() {
+               return processingTimeService.getCurrentProcessingTime();
+       }
+
+       /**
+        * Returns the watermark most recently passed to {@code advanceWatermark}, or
+        * {@code Long.MIN_VALUE} if none has been seen yet.
+        */
+       @Override
+       public long currentWatermark() {
+               return currentWatermark;
+       }
+
+       /**
+        * Registers a processing-time timer for the current key of the key context and the given
+        * namespace. A timer that is already contained in the timer set is not added again. If the
+        * new timer is due before the previous head of the queue, the scheduled callback is moved
+        * forward to the new timer's timestamp.
+        *
+        * @param namespace The namespace the timer belongs to.
+        * @param time The processing time at which the timer should fire.
+        */
+       @Override
+       public void registerProcessingTimeTimer(N namespace, long time) {
+               final K currentKey = (K) keyContext.getCurrentKey();
+               final InternalTimer<K, N> newTimer = new InternalTimer<>(time, currentKey, namespace);
+
+               // make sure we only put one timer per key into the queue
+               if (!processingTimeTimers.add(newTimer)) {
+                       return;
+               }
+
+               // look at the head *before* enqueueing, so we know when the callback is currently due
+               final InternalTimer<K, N> previousHead = processingTimeTimersQueue.peek();
+               final long previousTriggerTime =
+                               previousHead == null ? Long.MAX_VALUE : previousHead.getTimestamp();
+
+               processingTimeTimersQueue.add(newTimer);
+
+               if (time >= previousTriggerTime) {
+                       // the callback that is already scheduled fires early enough
+                       return;
+               }
+
+               // re-schedule our callback to the earlier time
+               if (nextTimer != null) {
+                       nextTimer.cancel(false);
+               }
+               nextTimer = processingTimeService.registerTimer(time, this);
+       }
+
+       /**
+        * Registers an event-time timer for the current key of the key context and the given
+        * namespace. A timer that is already contained in the timer set is not queued again.
+        *
+        * @param namespace The namespace the timer belongs to.
+        * @param time The event time at which the timer should fire.
+        */
+       @Override
+       public void registerEventTimeTimer(N namespace, long time) {
+               final K currentKey = (K) keyContext.getCurrentKey();
+               final InternalTimer<K, N> newTimer = new InternalTimer<>(time, currentKey, namespace);
+
+               final boolean isNew = eventTimeTimers.add(newTimer);
+               if (isNew) {
+                       eventTimeTimersQueue.add(newTimer);
+               }
+       }
+
+       /**
+        * Removes the processing-time timer for the current key of the key context, the given
+        * namespace and the given time, if such a timer is registered. A callback that is already
+        * scheduled with the processing time service is left untouched.
+        *
+        * @param namespace The namespace of the timer to delete.
+        * @param time The timestamp of the timer to delete.
+        */
+       @Override
+       public void deleteProcessingTimeTimer(N namespace, long time) {
+               final K currentKey = (K) keyContext.getCurrentKey();
+               final InternalTimer<K, N> probe = new InternalTimer<>(time, currentKey, namespace);
+
+               final boolean wasRegistered = processingTimeTimers.remove(probe);
+               if (wasRegistered) {
+                       processingTimeTimersQueue.remove(probe);
+               }
+       }
+
+       /**
+        * Removes the event-time timer for the current key of the key context, the given namespace
+        * and the given time, if such a timer is registered.
+        *
+        * @param namespace The namespace of the timer to delete.
+        * @param time The timestamp of the timer to delete.
+        */
+       @Override
+       public void deleteEventTimeTimer(N namespace, long time) {
+               final K currentKey = (K) keyContext.getCurrentKey();
+               final InternalTimer<K, N> probe = new InternalTimer<>(time, currentKey, namespace);
+
+               final boolean wasRegistered = eventTimeTimers.remove(probe);
+               if (wasRegistered) {
+                       eventTimeTimersQueue.remove(probe);
+               }
+       }
+
+       /**
+        * Callback of the processing time service. Fires every queued processing-time timer whose
+        * timestamp is not after {@code time} and then, if timers remain and no callback has been
+        * registered in the meantime, registers a callback for the new head of the queue.
+        *
+        * @param time The processing time up to which (inclusive) timers are fired.
+        * @throws Exception Propagated from the trigger target.
+        */
+       @Override
+       public void trigger(long time) throws Exception {
+               // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
+               // inside the callback.
+               nextTimer = null;
+
+               InternalTimer<K, N> timer;
+
+               // the queue is re-inspected on every iteration, so timers that the callback adds
+               // and that are already due are picked up by this same loop
+               while ((timer  = processingTimeTimersQueue.peek()) != null && 
timer.getTimestamp() <= time) {
+
+                       // remove the timer from both structures before firing it
+                       processingTimeTimers.remove(timer);
+                       processingTimeTimersQueue.remove();
+
+                       // the timer fires in the context of the key it was registered for
+                       keyContext.setCurrentKey(timer.getKey());
+                       triggerTarget.onProcessingTime(timer);
+               }
+
+               // here timer is either null (queue empty) or the head that is not yet due
+               if (timer != null) {
+                       // a callback may already have been registered from inside onProcessingTime()
+                       if (nextTimer == null) {
+                               nextTimer = 
processingTimeService.registerTimer(timer.getTimestamp(), this);
+                       }
+               }
+       }
+
+       /**
+        * Sets the current watermark to {@code time} and fires every event-time timer whose
+        * timestamp is less than or equal to it, in the order the queue yields them.
+        *
+        * @param time the new watermark
+        * @throws Exception forwarded from the trigger target's {@code onEventTime} callback
+        */
+       public void advanceWatermark(long time) throws Exception {
+               currentWatermark = time;
+
+               InternalTimer<K, N> timer;
+
+               // The loop condition re-reads the queue head on every iteration, so no explicit
+               // re-peek is needed at the end of the body; timers that the callback registers at
+               // or before the watermark are seen by the next iteration as well.
+               while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
+
+                       eventTimeTimers.remove(timer);
+                       eventTimeTimersQueue.remove();
+
+                       // make the timer's key current before handing it to the target
+                       keyContext.setCurrentKey(timer.getKey());
+                       triggerTarget.onEventTime(timer);
+               }
+       }
+
+       /**
+        * Writes the key serializer, the namespace serializer and all registered timers to the
+        * given stream. Event-time timers are written first, then processing-time timers; each
+        * group is prefixed with its size, and each timer is written as key, namespace, timestamp.
+        *
+        * @param outStream the stream to write to
+        * @throws IOException if writing to the stream fails
+        */
+       public void snapshotTimers(OutputStream outStream) throws IOException {
+               // the serializers themselves go first
+               InstantiationUtil.serializeObject(outStream, keySerializer);
+               InstantiationUtil.serializeObject(outStream, namespaceSerializer);
+
+               DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(outStream);
+
+               // event-time timers
+               int eventTimeCount = eventTimeTimers.size();
+               view.writeInt(eventTimeCount);
+               for (InternalTimer<K, N> eventTimer : eventTimeTimers) {
+                       keySerializer.serialize(eventTimer.getKey(), view);
+                       namespaceSerializer.serialize(eventTimer.getNamespace(), view);
+                       view.writeLong(eventTimer.getTimestamp());
+               }
+
+               // processing-time timers
+               int processingTimeCount = processingTimeTimers.size();
+               view.writeInt(processingTimeCount);
+               for (InternalTimer<K, N> processingTimer : processingTimeTimers) {
+                       keySerializer.serialize(processingTimer.getKey(), view);
+                       namespaceSerializer.serialize(processingTimer.getNamespace(), view);
+                       view.writeLong(processingTimer.getTimestamp());
+               }
+       }
+
+       /** Returns the number of registered processing-time timers, regardless of namespace. */
+       public int numProcessingTimeTimers() {
+               int total = processingTimeTimers.size();
+               return total;
+       }
+
+       /** Returns the number of registered event-time timers, regardless of namespace. */
+       public int numEventTimeTimers() {
+               int total = eventTimeTimers.size();
+               return total;
+       }
+
+       /**
+        * Returns the number of registered processing-time timers whose namespace equals the
+        * given namespace.
+        */
+       public int numProcessingTimeTimers(N namespace) {
+               int matching = 0;
+               for (InternalTimer<K, N> candidate : processingTimeTimers) {
+                       N timerNamespace = candidate.getNamespace();
+                       matching += timerNamespace.equals(namespace) ? 1 : 0;
+               }
+               return matching;
+       }
+
+       /**
+        * Returns the number of registered event-time timers whose namespace equals the given
+        * namespace.
+        */
+       public int numEventTimeTimers(N namespace) {
+               int matching = 0;
+               for (InternalTimer<K, N> candidate : eventTimeTimers) {
+                       N timerNamespace = candidate.getNamespace();
+                       matching += timerNamespace.equals(namespace) ? 1 : 0;
+               }
+               return matching;
+       }
+
+       /**
+        * Timers and serializers read back from a stream. The constructor expects the layout
+        * that {@code snapshotTimers} writes: key serializer, namespace serializer, the
+        * event-time (watermark) timers and then the processing-time timers, each group
+        * prefixed with its size.
+        */
+       public static class RestoredTimers<K, N> {
+
+               private final TypeSerializer<K> keySerializer;
+
+               private final TypeSerializer<N> namespaceSerializer;
+
+               /** Processing-time timers, once ordered in a queue and once as a set. */
+               private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
+               private final Set<InternalTimer<K, N>> processingTimeTimers;
+
+               /** Timers waiting for a watermark, once as a set and once ordered in a queue. */
+               private final Set<InternalTimer<K, N>> watermarkTimers;
+               private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
+
+               /**
+                * Reads serializers and timers from the given stream.
+                *
+                * @param inputStream the stream to read from
+                * @param userCodeClassLoader class loader used to deserialize the two serializers
+                * @throws Exception if deserializing the serializers or reading the timers fails
+                */
+               public RestoredTimers(InputStream inputStream, ClassLoader userCodeClassLoader) throws Exception {
+
+                       watermarkTimers = new HashSet<>();
+                       watermarkTimersQueue = new PriorityQueue<>(100);
+
+                       processingTimeTimers = new HashSet<>();
+                       processingTimeTimersQueue = new PriorityQueue<>(100);
+
+                       keySerializer = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+                       namespaceSerializer = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+
+                       DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inputStream);
+
+                       // same order as written: watermark timers first, processing-time timers second
+                       readTimers(inView, watermarkTimers, watermarkTimersQueue);
+                       readTimers(inView, processingTimeTimers, processingTimeTimersQueue);
+               }
+
+               /** Reads a size-prefixed group of timers and adds each one to both collections. */
+               private void readTimers(
+                               DataInputViewStreamWrapper inView,
+                               Set<InternalTimer<K, N>> timerSet,
+                               PriorityQueue<InternalTimer<K, N>> timerQueue) throws IOException {
+
+                       int remaining = inView.readInt();
+                       while (remaining-- > 0) {
+                               K key = keySerializer.deserialize(inView);
+                               N namespace = namespaceSerializer.deserialize(inView);
+                               long timestamp = inView.readLong();
+
+                               InternalTimer<K, N> restored = new InternalTimer<>(timestamp, key, namespace);
+                               timerSet.add(restored);
+                               timerQueue.add(restored);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
new file mode 100644
index 0000000..c74ac2e
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Internal class for keeping track of in-flight timers.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public class InternalTimer<K, N> implements Comparable<InternalTimer<K, N>> {
+       private final long timestamp;
+       private final K key;
+       private final N namespace;
+
+       /** Creates a timer for the given timestamp, key and namespace. */
+       public InternalTimer(long timestamp, K key, N namespace) {
+               this.timestamp = timestamp;
+               this.key = key;
+               this.namespace = namespace;
+       }
+
+       /** Returns the timestamp of this timer. */
+       public long getTimestamp() {
+               return timestamp;
+       }
+
+       /** Returns the key this timer is scoped to. */
+       public K getKey() {
+               return key;
+       }
+
+       /** Returns the namespace this timer is scoped to. */
+       public N getNamespace() {
+               return namespace;
+       }
+
+       /**
+        * Orders timers by timestamp only; key and namespace are ignored, so two timers that
+        * compare as 0 are not necessarily {@link #equals(Object) equal}.
+        */
+       @Override
+       public int compareTo(InternalTimer<K, N> o) {
+               long mine = this.timestamp;
+               long theirs = o.timestamp;
+               return Long.compare(mine, theirs);
+       }
+
+       /** Two timers are equal when timestamp, key and namespace are all equal. */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               InternalTimer<?, ?> other = (InternalTimer<?, ?>) o;
+
+               if (timestamp != other.timestamp) {
+                       return false;
+               }
+               if (!key.equals(other.key)) {
+                       return false;
+               }
+               return namespace.equals(other.namespace);
+       }
+
+       @Override
+       public int hashCode() {
+               // fold the 64-bit timestamp into 32 bits, then mix in key and namespace
+               int timestampHash = (int) (timestamp ^ (timestamp >>> 32));
+               return 31 * (31 * timestampHash + key.hashCode()) + namespace.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder text = new StringBuilder("Timer{");
+               text.append("timestamp=").append(timestamp);
+               text.append(", key=").append(key);
+               text.append(", namespace=").append(namespace);
+               text.append('}');
+               return text.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
new file mode 100644
index 0000000..805f9d4
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for working with time and timers.
+ *
+ * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
+ * that allows specifying a key and a namespace to which timers should be scoped.
+ *
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface InternalTimerService<N> {
+
+       /** Returns the current processing time. */
+       long currentProcessingTime();
+
+       /** Returns the current event-time watermark. */
+       long currentWatermark();
+
+       /**
+        * Registers a timer to be fired when processing time passes the given time. The namespace
+        * you pass here will be provided when the timer fires.
+        *
+        * @param namespace the namespace the timer is scoped to
+        * @param time the processing-time timestamp at which the timer should fire
+        */
+       void registerProcessingTimeTimer(N namespace, long time);
+
+       /**
+        * Deletes the processing-time timer that was registered for the given namespace and time.
+        *
+        * @param namespace the namespace the timer was registered with
+        * @param time the timestamp the timer was registered with
+        */
+       void deleteProcessingTimeTimer(N namespace, long time);
+
+       /**
+        * Registers a timer to be fired when the event-time watermark passes the given time. The
+        * namespace you pass here will be provided when the timer fires.
+        *
+        * @param namespace the namespace the timer is scoped to
+        * @param time the event-time timestamp at which the timer should fire
+        */
+       void registerEventTimeTimer(N namespace, long time);
+
+       /**
+        * Deletes the event-time timer that was registered for the given namespace and time.
+        *
+        * @param namespace the namespace the timer was registered with
+        * @param time the timestamp the timer was registered with
+        */
+       void deleteEventTimeTimer(N namespace, long time);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java
new file mode 100644
index 0000000..e0fd493
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+/**
+ * Interface for setting and querying the current key of keyed operations.
+ *
+ * <p>This is mainly used by the timer system to query the key when creating timers
+ * and to set the correct key context when firing a timer.
+ */
+public interface KeyContext {
+
+       /**
+        * Makes the given key the current key.
+        *
+        * @param key the key to set as current
+        */
+       void setCurrentKey(Object key);
+
+       /**
+        * Returns the current key.
+        *
+        * @return the key that is currently set
+        */
+       Object getCurrentKey();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
index de1f8d3..2df95ca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -38,9 +37,4 @@ public class StreamFilter<IN> extends 
AbstractUdfStreamOperator<IN, FilterFuncti
                        output.collect(element);
                }
        }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index ec7b713..c3ad260 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -39,7 +38,7 @@ public class StreamFlatMap<IN, OUT>
        @Override
        public void open() throws Exception {
                super.open();
-               collector = new TimestampedCollector<OUT>(output);
+               collector = new TimestampedCollector<>(output);
        }
 
        @Override
@@ -47,9 +46,4 @@ public class StreamFlatMap<IN, OUT>
                collector.setTimestamp(element);
                userFunction.flatMap(element.getValue(), collector);
        }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 7bd7380..86fd8e4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -92,11 +91,6 @@ public class StreamGroupedFold<IN, OUT, KEY>
        }
 
        @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
-       }
-
-       @Override
        public void setOutputType(TypeInformation<OUT> outTypeInfo, 
ExecutionConfig executionConfig) {
                outTypeSerializer = 
outTypeInfo.createSerializer(executionConfig);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 229c254..48b9c2d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -64,10 +63,4 @@ public class StreamGroupedReduce<IN> extends 
AbstractUdfStreamOperator<IN, Reduc
                        output.collect(element.replace(value));
                }
        }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
index a505001..6755bc0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -38,9 +37,4 @@ public class StreamMap<IN, OUT>
        public void processElement(StreamRecord<IN> element) throws Exception {
                
output.collect(element.replace(userFunction.map(element.getValue())));
        }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
index 9c2242f..ef51d8e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -58,9 +57,4 @@ public class StreamProject<IN, OUT extends Tuple>
                super.open();
                outTuple = outSerializer.createInstance();
        }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index bd0f574..e238566 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -40,11 +39,6 @@ public class StreamSink<IN> extends 
AbstractUdfStreamOperator<Object, SinkFuncti
        }
 
        @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               // ignore it for now, we are a sink, after all
-       }
-
-       @Override
        protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
                // all operators are tracking latencies
                this.latencyGauge.reportLatency(maker, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
new file mode 100644
index 0000000..962f264
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+@Internal
+public class StreamTimelyFlatMap<K, IN, OUT>
+               extends AbstractUdfStreamOperator<OUT, TimelyFlatMapFunction<IN, OUT>>
+               implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Serializer for the keys, handed to the internal timer service in {@link #open()}. */
+       private final TypeSerializer<K> keySerializer;
+
+       /** Collector passed to the user function; created in {@link #open()}. */
+       private transient TimestampedCollector<OUT> collector;
+
+       /** Timer service passed to the user function; created in {@link #open()}. */
+       private transient TimerService timerService;
+
+       public StreamTimelyFlatMap(TypeSerializer<K> keySerializer, TimelyFlatMapFunction<IN, OUT> flatMapper) {
+               super(flatMapper);
+
+               this.keySerializer = keySerializer;
+
+               chainingStrategy = ChainingStrategy.ALWAYS;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               collector = new TimestampedCollector<>(output);
+
+               // this operator is the trigger target of its own timer service
+               InternalTimerService<VoidNamespace> internalService = getInternalTimerService(
+                               "user-timers",
+                               keySerializer,
+                               VoidNamespaceSerializer.INSTANCE,
+                               this);
+
+               this.timerService = new SimpleTimerService(internalService);
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               fireTimer(timer, TimeDomain.EVENT_TIME);
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               fireTimer(timer, TimeDomain.PROCESSING_TIME);
+       }
+
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               collector.setTimestamp(element);
+               IN value = element.getValue();
+               userFunction.flatMap(value, timerService, collector);
+       }
+
+       /** Stamps the collector with the timer's timestamp and calls the user's onTimer(). */
+       private void fireTimer(InternalTimer<K, VoidNamespace> timer, TimeDomain domain) throws Exception {
+               collector.setAbsoluteTimestamp(timer.getTimestamp());
+               userFunction.onTimer(timer.getTimestamp(), domain, timerService, collector);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
new file mode 100644
index 0000000..36e9ad1
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for things that can be called by {@link InternalTimerService}.
+ *
+ * @param <K> Type of the keys to which timers are scoped.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+@Internal
+public interface Triggerable<K, N> {
+
+       /**
+        * Invoked when an event-time timer fires.
+        *
+        * @param timer the timer that fired
+        * @throws Exception if handling the timer fails
+        */
+       void onEventTime(InternalTimer<K, N> timer) throws Exception;
+
+       /**
+        * Invoked when a processing-time timer fires.
+        *
+        * @param timer the timer that fired
+        * @throws Exception if handling the timer fails
+        */
+       void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index 580a860..ee58a0a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -34,12 +33,6 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 
        private transient TimestampedCollector<OUT> collector;
 
-       // We keep track of watermarks from both inputs, the combined input is 
the minimum
-       // Once the minimum advances we emit a new watermark for downstream 
operators
-       private long combinedWatermark = Long.MIN_VALUE;
-       private long input1Watermark = Long.MIN_VALUE;
-       private long input2Watermark = Long.MIN_VALUE;
-
        public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
                super(flatMapper);
        }
@@ -63,26 +56,6 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
                userFunction.flatMap2(element.getValue(), collector);
        }
 
-       @Override
-       public void processWatermark1(Watermark mark) throws Exception {
-               input1Watermark = mark.getTimestamp();
-               long newMin = Math.min(input1Watermark, input2Watermark);
-               if (newMin > combinedWatermark) {
-                       combinedWatermark = newMin;
-                       output.emitWatermark(new Watermark(combinedWatermark));
-               }
-       }
-
-       @Override
-       public void processWatermark2(Watermark mark) throws Exception {
-               input2Watermark = mark.getTimestamp();
-               long newMin = Math.min(input1Watermark, input2Watermark);
-               if (newMin > combinedWatermark) {
-                       combinedWatermark = newMin;
-                       output.emitWatermark(new Watermark(combinedWatermark));
-               }
-       }
-
        protected TimestampedCollector<OUT> getCollector() {
                return collector;
        }

Reply via email to