Repository: flink
Updated Branches:
  refs/heads/master 38785a007 -> 159986292


[FLINK-8560] Add KeyedProcessFunction exposing key in onTimer().

This closes #5481.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15998629
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15998629
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15998629

Branch: refs/heads/master
Commit: 159986292e35a71737bcc434d5f20f385973fafa
Parents: 38785a0
Author: Bowen Li <bowenl...@gmail.com>
Authored: Thu Feb 15 21:37:44 2018 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Tue Mar 6 16:52:50 2018 +0100

----------------------------------------------------------------------
 docs/dev/stream/operators/process_function.md   |  29 +-
 ...KeyedProcessOperatorWithWatermarkDelay.scala |   6 +-
 .../runtime/harness/NonWindowHarnessTest.scala  |   6 +-
 .../runtime/harness/OverWindowHarnessTest.scala |  16 +-
 .../SortProcessFunctionHarnessTest.scala        |   6 +-
 .../streaming/api/datastream/KeyedStream.java   |  72 ++-
 .../api/functions/KeyedProcessFunction.java     | 130 +++++
 .../api/operators/KeyedProcessOperator.java     |  46 +-
 .../operators/LegacyKeyedProcessOperator.java   | 178 +++++++
 .../flink/streaming/api/DataStreamTest.java     |  43 +-
 .../api/operators/KeyedProcessOperatorTest.java |  82 ++--
 .../LegacyKeyedProcessOperatorTest.java         | 483 +++++++++++++++++++
 .../flink/streaming/api/scala/KeyedStream.scala |  37 +-
 .../streaming/api/scala/DataStreamTest.scala    |  41 +-
 14 files changed, 1078 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/docs/dev/stream/operators/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/process_function.md 
b/docs/dev/stream/operators/process_function.md
index a52c5bf..d967983 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -242,4 +242,31 @@ class CountWithTimeoutFunction extends 
ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very 
subtle and might not be noticed by users. Well, it's
 harmful because processing-time timestamps are indeterministic and not aligned 
with watermarks. Besides, user-implemented logic
 depends on this wrong timestamp highly likely is unintendedly faulty. So we've 
decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users should 
adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users should 
adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to 
the key of timers in its `onTimer(...)`
+method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) 
throws Exception {
+    K key = ctx.getCurrentKey();
+    // ...
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+override def onTimer(timestamp: Long, ctx: OnTimerContext, out: 
Collector[OUT]): Unit = {
+  var key = ctx.getCurrentKey
+  // ...
+}
+{% endhighlight %}
+</div>
+</div>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
index 74b4773..f63bdb5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelay.scala
@@ -19,16 +19,16 @@
 package org.apache.flink.table.runtime.operators
 
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 
 /**
-  * A [[KeyedProcessOperator]] that supports holding back watermarks with a 
static delay.
+  * A [[LegacyKeyedProcessOperator]] that supports holding back watermarks 
with a static delay.
   */
 class KeyedProcessOperatorWithWatermarkDelay[KEY, IN, OUT](
     private val function: ProcessFunction[IN, OUT],
     private var watermarkDelay: Long = 0L)
-  extends KeyedProcessOperator[KEY, IN, OUT](function) {
+  extends LegacyKeyedProcessOperator[KEY, IN, OUT](function) {
 
   /** emits watermark without delay */
   def emitWithoutDelay(mark: Watermark): Unit = output.emitWatermark(mark)

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index ad50761..5c31cb2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.runtime.aggregate._
@@ -39,7 +39,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindow(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new GroupAggProcessFunction(
         genSumAggFunction,
         sumAggregationStateType,
@@ -99,7 +99,7 @@ class NonWindowHarnessTest extends HarnessTestBase {
   @Test
   def testNonWindowWithRetract(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new GroupAggProcessFunction(
         genSumAggFunction,
         sumAggregationStateType,

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index def1972..6f6fc0e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.runtime.aggregate._
@@ -40,7 +40,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeBoundedRowsOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new ProcTimeBoundedRowsOver(
         genMinMaxAggFunction,
         2,
@@ -141,7 +141,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeBoundedRangeOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new ProcTimeBoundedRangeOver(
         genMinMaxAggFunction,
         4000,
@@ -250,7 +250,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeUnboundedOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new ProcTimeUnboundedOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
@@ -342,7 +342,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeBoundedRangeOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new RowTimeBoundedRangeOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
@@ -492,7 +492,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeBoundedRowsOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new RowTimeBoundedRowsOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
@@ -640,7 +640,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeUnboundedRangeOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new RowTimeUnboundedRangeOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
@@ -776,7 +776,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeUnboundedRowsOver(): Unit = {
 
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
       new RowTimeUnboundedRowsOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
index 9490039..457bde2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.typeutils.runtime.RowComparator
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import 
org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, 
TestHarnessUtil}
@@ -71,7 +71,7 @@ class SortProcessFunctionHarnessTest {
     
     val inputCRowType = CRowTypeInfo(rT)
     
-    val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+    val processFunction = new LegacyKeyedProcessOperator[Integer,CRow,CRow](
       new ProcTimeSortProcessFunction(
         inputCRowType,
         collectionRowComparator))
@@ -170,7 +170,7 @@ class SortProcessFunctionHarnessTest {
 
     val inputCRowType = CRowTypeInfo(rT)
 
-    val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+    val processFunction = new LegacyKeyedProcessOperator[Integer,CRow,CRow](
       new RowTimeSortProcessFunction(
         inputCRowType,
         4,

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 7beaa03..a948ae2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -46,6 +47,7 @@ import 
org.apache.flink.streaming.api.functions.query.QueryableValueStateOperato
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
@@ -272,8 +274,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
        }
 
        /**
-        * Applies the given {@link ProcessFunction} on the input stream, 
thereby
-        * creating a transformed output stream.
+        * Applies the given {@link ProcessFunction} on the input stream, 
thereby creating a transformed output stream.
         *
         * <p>The function will be called for every element in the input 
streams and can produce zero
         * or more output elements. Contrary to the {@link 
DataStream#flatMap(FlatMapFunction)}
@@ -286,7 +287,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         * @param <R> The type of elements emitted by the {@code 
ProcessFunction}.
         *
         * @return The transformed {@link DataStream}.
+        *
+        * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction)}
         */
+       @Deprecated
        @Override
        @PublicEvolving
        public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> 
processFunction) {
@@ -306,8 +310,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
        }
 
        /**
-        * Applies the given {@link ProcessFunction} on the input stream, 
thereby
-        * creating a transformed output stream.
+        * Applies the given {@link ProcessFunction} on the input stream, 
thereby creating a transformed output stream.
         *
         * <p>The function will be called for every element in the input 
streams and can produce zero
         * or more output elements. Contrary to the {@link 
DataStream#flatMap(FlatMapFunction)}
@@ -321,19 +324,76 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         * @param <R> The type of elements emitted by the {@code 
ProcessFunction}.
         *
         * @return The transformed {@link DataStream}.
+        *
+        * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, 
TypeInformation)}
         */
+       @Deprecated
        @Override
        @Internal
        public <R> SingleOutputStreamOperator<R> process(
                        ProcessFunction<T, R> processFunction,
                        TypeInformation<R> outputType) {
 
-               KeyedProcessOperator<KEY, T, R> operator =
-                               new 
KeyedProcessOperator<>(clean(processFunction));
+               LegacyKeyedProcessOperator<KEY, T, R> operator = new 
LegacyKeyedProcessOperator<>(clean(processFunction));
 
                return transform("Process", outputType, operator);
        }
 
+       /**
+        * Applies the given {@link KeyedProcessFunction} on the input stream, 
thereby creating a transformed output stream.
+        *
+        * <p>The function will be called for every element in the input 
streams and can produce zero
+        * or more output elements. Contrary to the {@link 
DataStream#flatMap(FlatMapFunction)}
+        * function, this function can also query the time and set timers. When 
reacting to the firing
+        * of set timers the function can directly emit elements and/or 
register yet more timers.
+        *
+        * @param keyedProcessFunction The {@link KeyedProcessFunction} that is 
called for each element in the stream.
+        *
+        * @param <R> The type of elements emitted by the {@code 
KeyedProcessFunction}.
+        *
+        * @return The transformed {@link DataStream}.
+        */
+       @PublicEvolving
+       public <R> SingleOutputStreamOperator<R> 
process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {
+
+               TypeInformation<R> outType = 
TypeExtractor.getUnaryOperatorReturnType(
+                               keyedProcessFunction,
+                               KeyedProcessFunction.class,
+                               1,
+                               2,
+                               TypeExtractor.NO_INDEX,
+                               TypeExtractor.NO_INDEX,
+                               getType(),
+                               Utils.getCallLocationName(),
+                               true);
+
+               return process(keyedProcessFunction, outType);
+       }
+
+       /**
+        * Applies the given {@link KeyedProcessFunction} on the input stream, 
thereby creating a transformed output stream.
+        *
+        * <p>The function will be called for every element in the input 
streams and can produce zero
+        * or more output elements. Contrary to the {@link 
DataStream#flatMap(FlatMapFunction)}
+        * function, this function can also query the time and set timers. When 
reacting to the firing
+        * of set timers the function can directly emit elements and/or 
register yet more timers.
+        *
+        * @param keyedProcessFunction The {@link KeyedProcessFunction} that is 
called for each element in the stream.
+        *
+        * @param outputType {@link TypeInformation} for the result type of the 
function.
+        *
+        * @param <R> The type of elements emitted by the {@code 
KeyedProcessFunction}.
+        *
+        * @return The transformed {@link DataStream}.
+        */
+       @Internal
+       public <R> SingleOutputStreamOperator<R> process(
+                       KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
+                       TypeInformation<R> outputType) {
+
+               KeyedProcessOperator<KEY, T, R> operator = new 
KeyedProcessOperator<>(clean(keyedProcessFunction));
+               return transform("KeyedProcess", outputType, operator);
+       }
 
        // 
------------------------------------------------------------------------
        //  Windowing

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
new file mode 100644
index 0000000..a03480b
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A keyed function that processes elements of a stream.
+ *
+ * <p>For every element in the input stream {@link #processElement(Object, 
Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. 
Implementations can also
+ * query the time and set timers through the provided {@link Context}. For 
firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can 
again produce
+ * zero or more elements as output and register further timers.
+ *
+ * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to 
a key) is only
+ * available if the {@code KeyedProcessFunction} is applied on a {@code 
KeyedStream}.
+ *
+ * <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, 
access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always 
available and setup and
+ * teardown methods can be implemented. See
+ * {@link 
org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <K> Type of the key.
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the output elements.
+ */
+@PublicEvolving
+public abstract class KeyedProcessFunction<K, I, O> extends 
AbstractRichFunction {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Process one element from the input stream.
+        *
+        * <p>This function can output zero or more elements using the {@link 
Collector} parameter
+        * and also update internal state or set timers using the {@link 
Context} parameter.
+        *
+        * @param value The input value.
+        * @param ctx A {@link Context} that allows querying the timestamp of 
the element and getting
+        *            a {@link TimerService} for registering timers and 
querying the time. The
+        *            context is only valid during the invocation of this 
method, do not store it.
+        * @param out The collector for returning result values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       public abstract void processElement(I value, Context ctx, Collector<O> 
out) throws Exception;
+
+       /**
+        * Called when a timer set using {@link TimerService} fires.
+        *
+        * @param timestamp The timestamp of the firing timer.
+        * @param ctx An {@link OnTimerContext} that allows querying the 
timestamp, the {@link TimeDomain}, and the key
+        *            of the firing timer and getting a {@link TimerService} 
for registering timers and querying the time.
+        *            The context is only valid during the invocation of this 
method, do not store it.
+        * @param out The collector for returning result values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> 
out) throws Exception {}
+
+       /**
+        * Information available in an invocation of {@link 
#processElement(Object, Context, Collector)}
+        * or {@link #onTimer(long, OnTimerContext, Collector)}.
+        */
+       public abstract class Context {
+
+               /**
+                * Timestamp of the element currently being processed or 
timestamp of a firing timer.
+                *
+                * <p>This might be {@code null}, for example if the time 
characteristic of your program
+                * is set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+                */
+               public abstract Long timestamp();
+
+               /**
+                * A {@link TimerService} for querying time and registering 
timers.
+                */
+               public abstract TimerService timerService();
+
+               /**
+                * Emits a record to the side output identified by the {@link 
OutputTag}.
+                *
+                * @param outputTag the {@code OutputTag} that identifies the 
side output to emit to.
+                * @param value The record to emit.
+                */
+               public abstract <X> void output(OutputTag<X> outputTag, X 
value);
+       }
+
+       /**
+        * Information available in an invocation of {@link #onTimer(long, 
OnTimerContext, Collector)}.
+        */
+       public abstract class OnTimerContext extends Context {
+               /**
+                * The {@link TimeDomain} of the firing timer.
+                */
+               public abstract TimeDomain timeDomain();
+
+               /**
+                * Get key of the firing timer.
+                */
+               public abstract K getCurrentKey();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index 6501a9d..b74fdf3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.SimpleTimerService;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
 
@@ -31,12 +31,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for 
executing keyed
- * {@link ProcessFunction ProcessFunctions}.
+ * A {@link StreamOperator} for executing {@link KeyedProcessFunction 
KeyedProcessFunctions}.
  */
 @Internal
 public class KeyedProcessOperator<K, IN, OUT>
-               extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+               extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, 
IN, OUT>>
                implements OneInputStreamOperator<IN, OUT>, Triggerable<K, 
VoidNamespace> {
 
        private static final long serialVersionUID = 1L;
@@ -47,7 +46,7 @@ public class KeyedProcessOperator<K, IN, OUT>
 
        private transient OnTimerContextImpl onTimerContext;
 
-       public KeyedProcessOperator(ProcessFunction<IN, OUT> function) {
+       public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
                super(function);
 
                chainingStrategy = ChainingStrategy.ALWAYS;
@@ -70,21 +69,13 @@ public class KeyedProcessOperator<K, IN, OUT>
        @Override
        public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
                collector.setAbsoluteTimestamp(timer.getTimestamp());
-               onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-               onTimerContext.timer = timer;
-               userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
-               onTimerContext.timeDomain = null;
-               onTimerContext.timer = null;
+               invokeUserFunction(TimeDomain.EVENT_TIME, timer);
        }
 
        @Override
        public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
                collector.eraseTimestamp();
-               onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
-               onTimerContext.timer = timer;
-               userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
-               onTimerContext.timeDomain = null;
-               onTimerContext.timer = null;
+               invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
        }
 
        @Override
@@ -95,13 +86,23 @@ public class KeyedProcessOperator<K, IN, OUT>
                context.element = null;
        }
 
-       private class ContextImpl extends ProcessFunction<IN, OUT>.Context {
+       private void invokeUserFunction(
+                       TimeDomain timeDomain,
+                       InternalTimer<K, VoidNamespace> timer) throws Exception 
{
+               onTimerContext.timeDomain = timeDomain;
+               onTimerContext.timer = timer;
+               userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
+               onTimerContext.timeDomain = null;
+               onTimerContext.timer = null;
+       }
+
+       private class ContextImpl extends KeyedProcessFunction<K, IN, 
OUT>.Context {
 
                private final TimerService timerService;
 
                private StreamRecord<IN> element;
 
-               ContextImpl(ProcessFunction<IN, OUT> function, TimerService 
timerService) {
+               ContextImpl(KeyedProcessFunction<K, IN, OUT> function, 
TimerService timerService) {
                        function.super();
                        this.timerService = checkNotNull(timerService);
                }
@@ -132,15 +133,15 @@ public class KeyedProcessOperator<K, IN, OUT>
                }
        }
 
-       private class OnTimerContextImpl extends ProcessFunction<IN, 
OUT>.OnTimerContext{
+       private class OnTimerContextImpl extends KeyedProcessFunction<K, IN, 
OUT>.OnTimerContext {
 
                private final TimerService timerService;
 
                private TimeDomain timeDomain;
 
-               private InternalTimer<?, VoidNamespace> timer;
+               private InternalTimer<K, VoidNamespace> timer;
 
-               OnTimerContextImpl(ProcessFunction<IN, OUT> function, 
TimerService timerService) {
+               OnTimerContextImpl(KeyedProcessFunction<K, IN, OUT> function, 
TimerService timerService) {
                        function.super();
                        this.timerService = checkNotNull(timerService);
                }
@@ -170,5 +171,10 @@ public class KeyedProcessOperator<K, IN, OUT>
                        checkState(timeDomain != null);
                        return timeDomain;
                }
+
+               @Override
+               public K getCurrentKey() {
+                       return timer.getKey();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperator.java
new file mode 100644
index 0000000..8481c46
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link StreamOperator} for executing keyed {@link ProcessFunction 
ProcessFunctions}.
+ *
+ * @deprecated Replaced by {@link KeyedProcessOperator} which takes {@code 
KeyedProcessFunction}
+ */
+@Deprecated
+@Internal
+public class LegacyKeyedProcessOperator<K, IN, OUT>
+               extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+               implements OneInputStreamOperator<IN, OUT>, Triggerable<K, 
VoidNamespace> {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient TimestampedCollector<OUT> collector;
+
+       private transient ContextImpl context;
+
+       private transient OnTimerContextImpl onTimerContext;
+
+       public LegacyKeyedProcessOperator(ProcessFunction<IN, OUT> function) {
+               super(function);
+
+               chainingStrategy = ChainingStrategy.ALWAYS;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               collector = new TimestampedCollector<>(output);
+
+               InternalTimerService<VoidNamespace> internalTimerService =
+                               getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
+
+               TimerService timerService = new 
SimpleTimerService(internalTimerService);
+
+               context = new ContextImpl(userFunction, timerService);
+               onTimerContext = new OnTimerContextImpl(userFunction, 
timerService);
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+               collector.setAbsoluteTimestamp(timer.getTimestamp());
+               invokeUserFunction(TimeDomain.EVENT_TIME, timer);
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               collector.eraseTimestamp();
+               invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
+       }
+
+       @Override
+       public void processElement(StreamRecord<IN> element) throws Exception {
+               collector.setTimestamp(element);
+               context.element = element;
+               userFunction.processElement(element.getValue(), context, 
collector);
+               context.element = null;
+       }
+
+       private void invokeUserFunction(
+                       TimeDomain timeDomain,
+                       InternalTimer<K, VoidNamespace> timer) throws Exception 
{
+               onTimerContext.timeDomain = timeDomain;
+               onTimerContext.timer = timer;
+               userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
+               onTimerContext.timeDomain = null;
+               onTimerContext.timer = null;
+       }
+
+       private class ContextImpl extends ProcessFunction<IN, OUT>.Context {
+
+               private final TimerService timerService;
+
+               private StreamRecord<IN> element;
+
+               ContextImpl(ProcessFunction<IN, OUT> function, TimerService 
timerService) {
+                       function.super();
+                       this.timerService = checkNotNull(timerService);
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(element != null);
+
+                       if (element.hasTimestamp()) {
+                               return element.getTimestamp();
+                       } else {
+                               return null;
+                       }
+               }
+
+               @Override
+               public TimerService timerService() {
+                       return timerService;
+               }
+
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       if (outputTag == null) {
+                               throw new IllegalArgumentException("OutputTag 
must not be null.");
+                       }
+
+                       output.collect(outputTag, new StreamRecord<>(value, 
element.getTimestamp()));
+               }
+       }
+
+       private class OnTimerContextImpl extends ProcessFunction<IN, 
OUT>.OnTimerContext{
+
+               private final TimerService timerService;
+
+               private TimeDomain timeDomain;
+
+               private InternalTimer<?, VoidNamespace> timer;
+
+               OnTimerContextImpl(ProcessFunction<IN, OUT> function, 
TimerService timerService) {
+                       function.super();
+                       this.timerService = checkNotNull(timerService);
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(timer != null);
+                       return timer.getTimestamp();
+               }
+
+               @Override
+               public TimerService timerService() {
+                       return timerService;
+               }
+
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       if (outputTag == null) {
+                               throw new IllegalArgumentException("OutputTag 
must not be null.");
+                       }
+
+                       output.collect(outputTag, new StreamRecord<>(value, 
timer.getTimestamp()));
+               }
+
+               @Override
+               public TimeDomain timeDomain() {
+                       checkState(timeDomain != null);
+                       return timeDomain;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index ec8a134..4fa3fc8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
@@ -61,6 +62,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -689,11 +691,11 @@ public class DataStreamTest extends TestLogger {
        }
 
        /**
-        * Verify that a {@link KeyedStream#process(ProcessFunction)} call is 
correctly translated to
-        * an operator.
+        * Verify that a {@link KeyedStream#process(ProcessFunction)} call is 
correctly translated to an operator.
         */
        @Test
-       public void testKeyedProcessTranslation() {
+       @Deprecated
+       public void testKeyedStreamProcessTranslation() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                DataStreamSource<Long> src = env.generateSequence(0, 0);
 
@@ -724,12 +726,43 @@ public class DataStreamTest extends TestLogger {
                processed.addSink(new DiscardingSink<Integer>());
 
                assertEquals(processFunction, 
getFunctionForDataStream(processed));
+               assertTrue(getOperatorForDataStream(processed) instanceof 
LegacyKeyedProcessOperator);
+       }
+
+       /**
+        * Verify that a {@link KeyedStream#process(KeyedProcessFunction)} call 
is correctly translated to an operator.
+        */
+       @Test
+       public void testKeyedStreamKeyedProcessTranslation() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStreamSource<Long> src = env.generateSequence(0, 0);
+
+               KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction 
= new KeyedProcessFunction<Long, Long, Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void processElement(Long value, Context ctx, 
Collector<Integer> out) throws Exception {
+                               // Do nothing
+                       }
+
+                       @Override
+                       public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Integer> out) throws Exception {
+                               // Do nothing
+                       }
+               };
+
+               DataStream<Integer> processed = src
+                               .keyBy(new IdentityKeySelector<Long>())
+                               .process(keyedProcessFunction);
+
+               processed.addSink(new DiscardingSink<Integer>());
+
+               assertEquals(keyedProcessFunction, 
getFunctionForDataStream(processed));
                assertTrue(getOperatorForDataStream(processed) instanceof 
KeyedProcessOperator);
        }
 
        /**
-        * Verify that a {@link DataStream#process(ProcessFunction)} call is 
correctly translated to
-        * an operator.
+        * Verify that a {@link DataStream#process(ProcessFunction)} call is 
correctly translated to an operator.
         */
        @Test
        public void testProcessTranslation() {

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
index e1986f3..c5f478c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
@@ -111,8 +111,10 @@ public class KeyedProcessOperatorTest extends TestLogger {
        @Test
        public void testEventTimeTimers() throws Exception {
 
+               final int expectedKey = 17;
+
                KeyedProcessOperator<Integer, Integer, Integer> operator =
-                               new KeyedProcessOperator<>(new 
TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+                               new KeyedProcessOperator<>(new 
TriggeringFlatMapFunction(TimeDomain.EVENT_TIME, expectedKey));
 
                OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -122,14 +124,14 @@ public class KeyedProcessOperatorTest extends TestLogger {
 
                testHarness.processWatermark(new Watermark(0));
 
-               testHarness.processElement(new StreamRecord<>(17, 42L));
+               testHarness.processElement(new StreamRecord<>(expectedKey, 
42L));
 
                testHarness.processWatermark(new Watermark(5));
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                expectedOutput.add(new Watermark(0L));
-               expectedOutput.add(new StreamRecord<>(17, 42L));
+               expectedOutput.add(new StreamRecord<>(expectedKey, 42L));
                expectedOutput.add(new StreamRecord<>(1777, 5L));
                expectedOutput.add(new Watermark(5L));
 
@@ -141,8 +143,10 @@ public class KeyedProcessOperatorTest extends TestLogger {
        @Test
        public void testProcessingTimeTimers() throws Exception {
 
+               final int expectedKey = 17;
+
                KeyedProcessOperator<Integer, Integer, Integer> operator =
-                               new KeyedProcessOperator<>(new 
TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+                               new KeyedProcessOperator<>(new 
TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME, expectedKey));
 
                OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -150,13 +154,13 @@ public class KeyedProcessOperatorTest extends TestLogger {
                testHarness.setup();
                testHarness.open();
 
-               testHarness.processElement(new StreamRecord<>(17));
+               testHarness.processElement(new StreamRecord<>(expectedKey));
 
                testHarness.setProcessingTime(5);
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
-               expectedOutput.add(new StreamRecord<>(17));
+               expectedOutput.add(new StreamRecord<>(expectedKey));
                expectedOutput.add(new StreamRecord<>(1777));
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
@@ -243,8 +247,10 @@ public class KeyedProcessOperatorTest extends TestLogger {
        @Test
        public void testSnapshotAndRestore() throws Exception {
 
+               final int expectedKey = 5;
+
                KeyedProcessOperator<Integer, Integer, String> operator =
-                               new KeyedProcessOperator<>(new 
BothTriggeringFlatMapFunction());
+                               new KeyedProcessOperator<>(new 
BothTriggeringFlatMapFunction(expectedKey));
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -252,14 +258,14 @@ public class KeyedProcessOperatorTest extends TestLogger {
                testHarness.setup();
                testHarness.open();
 
-               testHarness.processElement(new StreamRecord<>(5, 12L));
+               testHarness.processElement(new StreamRecord<>(expectedKey, 
12L));
 
                // snapshot and restore from scratch
                OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
                testHarness.close();
 
-               operator = new KeyedProcessOperator<>(new 
BothTriggeringFlatMapFunction());
+               operator = new KeyedProcessOperator<>(new 
BothTriggeringFlatMapFunction(expectedKey));
 
                testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
 
@@ -283,8 +289,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
 
        @Test
        public void testNullOutputTagRefusal() throws Exception {
-               KeyedProcessOperator<Integer, Integer, String> operator =
-                       new KeyedProcessOperator<>(new 
NullOutputTagEmittingProcessFunction());
+               KeyedProcessOperator<Integer, Integer, String> operator = new 
KeyedProcessOperator<>(new NullOutputTagEmittingProcessFunction());
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(
@@ -307,8 +312,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
         */
        @Test
        public void testSideOutput() throws Exception {
-               KeyedProcessOperator<Integer, Integer, String> operator =
-                       new KeyedProcessOperator<>(new 
SideOutputProcessFunction());
+               KeyedProcessOperator<Integer, Integer, String> operator = new 
KeyedProcessOperator<>(new SideOutputProcessFunction());
 
                OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                        new KeyedOneInputStreamOperatorTestHarness<>(
@@ -346,7 +350,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
                testHarness.close();
        }
 
-       private static class NullOutputTagEmittingProcessFunction extends 
ProcessFunction<Integer, String> {
+       private static class NullOutputTagEmittingProcessFunction extends 
KeyedProcessFunction<Integer, Integer, String> {
 
                @Override
                public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
@@ -354,7 +358,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
                }
        }
 
-       private static class SideOutputProcessFunction extends 
ProcessFunction<Integer, String> {
+       private static class SideOutputProcessFunction extends 
KeyedProcessFunction<Integer, Integer, String> {
 
                static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new 
OutputTag<Integer>("int-out") {};
                static final OutputTag<Long> LONG_OUTPUT_TAG = new 
OutputTag<Long>("long-out") {};
@@ -377,19 +381,19 @@ public class KeyedProcessOperatorTest extends TestLogger {
                }
        }
 
-       private static class QueryingFlatMapFunction extends 
ProcessFunction<Integer, String> {
+       private static class QueryingFlatMapFunction extends 
KeyedProcessFunction<Integer, Integer, String> {
 
                private static final long serialVersionUID = 1L;
 
-               private final TimeDomain timeDomain;
+               private final TimeDomain expectedTimeDomain;
 
                public QueryingFlatMapFunction(TimeDomain timeDomain) {
-                       this.timeDomain = timeDomain;
+                       this.expectedTimeDomain = timeDomain;
                }
 
                @Override
                public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                       if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
                                out.collect(value + "TIME:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
                        } else {
                                out.collect(value + "TIME:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
@@ -401,23 +405,26 @@ public class KeyedProcessOperatorTest extends TestLogger {
                                long timestamp,
                                OnTimerContext ctx,
                                Collector<String> out) throws Exception {
+                       // Do nothing
                }
        }
 
-       private static class TriggeringFlatMapFunction extends 
ProcessFunction<Integer, Integer> {
+       private static class TriggeringFlatMapFunction extends 
KeyedProcessFunction<Integer, Integer, Integer> {
 
                private static final long serialVersionUID = 1L;
 
-               private final TimeDomain timeDomain;
+               private final TimeDomain expectedTimeDomain;
+               private final Integer expectedKey;
 
-               public TriggeringFlatMapFunction(TimeDomain timeDomain) {
-                       this.timeDomain = timeDomain;
+               public TriggeringFlatMapFunction(TimeDomain timeDomain, Integer 
expectedKey) {
+                       this.expectedTimeDomain = timeDomain;
+                       this.expectedKey = expectedKey;
                }
 
                @Override
                public void processElement(Integer value, Context ctx, 
Collector<Integer> out) throws Exception {
                        out.collect(value);
-                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                       if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
                                
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
                        } else {
                                
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
@@ -429,30 +436,30 @@ public class KeyedProcessOperatorTest extends TestLogger {
                                long timestamp,
                                OnTimerContext ctx,
                                Collector<Integer> out) throws Exception {
-
-                       assertEquals(this.timeDomain, ctx.timeDomain());
+                       assertEquals(expectedKey, ctx.getCurrentKey());
+                       assertEquals(expectedTimeDomain, ctx.timeDomain());
                        out.collect(1777);
                }
        }
 
-       private static class TriggeringStatefulFlatMapFunction extends 
ProcessFunction<Integer, String> {
+       private static class TriggeringStatefulFlatMapFunction extends 
KeyedProcessFunction<Integer, Integer, String> {
 
                private static final long serialVersionUID = 1L;
 
                private final ValueStateDescriptor<Integer> state =
                                new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE);
 
-               private final TimeDomain timeDomain;
+               private final TimeDomain expectedTimeDomain;
 
                public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) 
{
-                       this.timeDomain = timeDomain;
+                       this.expectedTimeDomain = timeDomain;
                }
 
                @Override
                public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT:" + value);
                        getRuntimeContext().getState(state).update(value);
-                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                       if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
                                
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
                        } else {
                                
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
@@ -464,15 +471,21 @@ public class KeyedProcessOperatorTest extends TestLogger {
                                long timestamp,
                                OnTimerContext ctx,
                                Collector<String> out) throws Exception {
-                       assertEquals(this.timeDomain, ctx.timeDomain());
+                       assertEquals(expectedTimeDomain, ctx.timeDomain());
                        out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
                }
        }
 
-       private static class BothTriggeringFlatMapFunction extends 
ProcessFunction<Integer, String> {
+       private static class BothTriggeringFlatMapFunction extends 
KeyedProcessFunction<Integer, Integer, String> {
 
                private static final long serialVersionUID = 1L;
 
+               private final Integer expectedKey;
+
+               public BothTriggeringFlatMapFunction(Integer expectedKey) {
+                       this.expectedKey = expectedKey;
+               }
+
                @Override
                public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        ctx.timerService().registerProcessingTimeTimer(5);
@@ -484,6 +497,8 @@ public class KeyedProcessOperatorTest extends TestLogger {
                                long timestamp,
                                OnTimerContext ctx,
                                Collector<String> out) throws Exception {
+                       assertEquals(expectedKey, ctx.getCurrentKey());
+
                        if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
                                out.collect("EVENT:1777");
                        } else {
@@ -491,5 +506,4 @@ public class KeyedProcessOperatorTest extends TestLogger {
                        }
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperatorTest.java
new file mode 100644
index 0000000..970bb35
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/LegacyKeyedProcessOperatorTest.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link LegacyKeyedProcessOperator}.
+ */
+@Deprecated
+public class LegacyKeyedProcessOperatorTest extends TestLogger {
+
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
+
+       @Test
+       public void testTimestampAndWatermarkQuerying() throws Exception {
+
+               LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+                               new LegacyKeyedProcessOperator<>(new 
QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark(new Watermark(17));
+               testHarness.processElement(new StreamRecord<>(5, 12L));
+
+               testHarness.processWatermark(new Watermark(42));
+               testHarness.processElement(new StreamRecord<>(6, 13L));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new Watermark(17L));
+               expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
+               expectedOutput.add(new Watermark(42L));
+               expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+               LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+                               new LegacyKeyedProcessOperator<>(new 
QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.setProcessingTime(17);
+               testHarness.processElement(new StreamRecord<>(5));
+
+               testHarness.setProcessingTime(42);
+               testHarness.processElement(new StreamRecord<>(6));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
+               expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testEventTimeTimers() throws Exception {
+
+               LegacyKeyedProcessOperator<Integer, Integer, Integer> operator =
+                               new LegacyKeyedProcessOperator<>(new 
TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark(new Watermark(0));
+
+               testHarness.processElement(new StreamRecord<>(17, 42L));
+
+               testHarness.processWatermark(new Watermark(5));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new Watermark(0L));
+               expectedOutput.add(new StreamRecord<>(17, 42L));
+               expectedOutput.add(new StreamRecord<>(1777, 5L));
+               expectedOutput.add(new Watermark(5L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testProcessingTimeTimers() throws Exception {
+
+               LegacyKeyedProcessOperator<Integer, Integer, Integer> operator =
+                               new LegacyKeyedProcessOperator<>(new 
TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(17));
+
+               testHarness.setProcessingTime(5);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>(17));
+               expectedOutput.add(new StreamRecord<>(1777));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       /**
+        * Verifies that we don't have leakage between different keys.
+        */
+       @Test
+       public void testEventTimeTimerWithState() throws Exception {
+
+               LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+                               new LegacyKeyedProcessOperator<>(new 
TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark(new Watermark(1));
+               testHarness.processElement(new StreamRecord<>(17, 0L)); // 
should set timer for 6
+
+               testHarness.processWatermark(new Watermark(2));
+               testHarness.processElement(new StreamRecord<>(42, 1L)); // 
should set timer for 7
+
+               testHarness.processWatermark(new Watermark(6));
+               testHarness.processWatermark(new Watermark(7));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new Watermark(1L));
+               expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
+               expectedOutput.add(new Watermark(2L));
+               expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
+               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+               expectedOutput.add(new Watermark(6L));
+               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+               expectedOutput.add(new Watermark(7L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       /**
+        * Verifies that we don't have leakage between different keys.
+        */
+       @Test
+       public void testProcessingTimeTimerWithState() throws Exception {
+
+               LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+                               new LegacyKeyedProcessOperator<>(new 
TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.setProcessingTime(1);
+               testHarness.processElement(new StreamRecord<>(17)); // should 
set timer for 6
+
+               testHarness.setProcessingTime(2);
+               testHarness.processElement(new StreamRecord<>(42)); // should 
set timer for 7
+
+               testHarness.setProcessingTime(6);
+               testHarness.setProcessingTime(7);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("INPUT:17"));
+               expectedOutput.add(new StreamRecord<>("INPUT:42"));
+               expectedOutput.add(new StreamRecord<>("STATE:17"));
+               expectedOutput.add(new StreamRecord<>("STATE:42"));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testSnapshotAndRestore() throws Exception {
+
+               LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+                               new LegacyKeyedProcessOperator<>(new 
BothTriggeringFlatMapFunction());
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(5, 12L));
+
+               // snapshot and restore from scratch
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
+
+               testHarness.close();
+
+               operator = new LegacyKeyedProcessOperator<>(new 
BothTriggeringFlatMapFunction());
+
+               testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               testHarness.setProcessingTime(5);
+               testHarness.processWatermark(new Watermark(6));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("PROC:1777"));
+               expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+               expectedOutput.add(new Watermark(6));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testNullOutputTagRefusal() throws Exception {
+               LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+                       new LegacyKeyedProcessOperator<>(new 
NullOutputTagEmittingProcessFunction());
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               operator, new IdentityKeySelector<>(), 
BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.setProcessingTime(17);
+               try {
+                       
expectedException.expect(IllegalArgumentException.class);
+                       testHarness.processElement(new StreamRecord<>(5));
+               } finally {
+                       testHarness.close();
+               }
+       }
+
+       /**
+        * This also verifies that the timestamps ouf side-emitted records is 
correct.
+        */
+       @Test
+       public void testSideOutput() throws Exception {
+               LegacyKeyedProcessOperator<Integer, Integer, String> operator =
+                       new LegacyKeyedProcessOperator<>(new 
SideOutputProcessFunction());
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                       new KeyedOneInputStreamOperatorTestHarness<>(
+                               operator, new IdentityKeySelector<>(), 
BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(42, 17L /* 
timestamp */));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("IN:42", 17L /* timestamp 
*/));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               ConcurrentLinkedQueue<StreamRecord<Integer>> 
expectedIntSideOutput = new ConcurrentLinkedQueue<>();
+               expectedIntSideOutput.add(new StreamRecord<>(42, 17L /* 
timestamp */));
+               ConcurrentLinkedQueue<StreamRecord<Integer>> intSideOutput =
+                       
testHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
+               TestHarnessUtil.assertOutputEquals(
+                       "Side output was not correct.",
+                       expectedIntSideOutput,
+                       intSideOutput);
+
+               ConcurrentLinkedQueue<StreamRecord<Long>> 
expectedLongSideOutput = new ConcurrentLinkedQueue<>();
+               expectedLongSideOutput.add(new StreamRecord<>(42L, 17L /* 
timestamp */));
+               ConcurrentLinkedQueue<StreamRecord<Long>> longSideOutput =
+                       
testHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
+               TestHarnessUtil.assertOutputEquals(
+                       "Side output was not correct.",
+                       expectedLongSideOutput,
+                       longSideOutput);
+
+               testHarness.close();
+       }
+
+       private static class NullOutputTagEmittingProcessFunction extends 
ProcessFunction<Integer, String> {
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       ctx.output(null, value);
+               }
+       }
+
+       private static class SideOutputProcessFunction extends 
ProcessFunction<Integer, String> {
+
+               static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new 
OutputTag<Integer>("int-out") {};
+               static final OutputTag<Long> LONG_OUTPUT_TAG = new 
OutputTag<Long>("long-out") {};
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("IN:" + value);
+
+                       ctx.output(INTEGER_OUTPUT_TAG, value);
+                       ctx.output(LONG_OUTPUT_TAG, value.longValue());
+               }
+       }
+
+       private static class IdentityKeySelector<T> implements KeySelector<T, 
T> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public T getKey(T value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static class QueryingFlatMapFunction extends 
ProcessFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final TimeDomain timeDomain;
+
+               public QueryingFlatMapFunction(TimeDomain timeDomain) {
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               out.collect(value + "TIME:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+                       } else {
+                               out.collect(value + "TIME:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+                       }
+               }
+
+               @Override
+               public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out) throws Exception {
+                       // Do nothing
+               }
+       }
+
+       private static class TriggeringFlatMapFunction extends 
ProcessFunction<Integer, Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final TimeDomain timeDomain;
+
+               public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<Integer> out) throws Exception {
+                       out.collect(value);
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
+                       } else {
+                               
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
+                       }
+               }
+
+               @Override
+               public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Integer> out) throws Exception {
+                       assertEquals(this.timeDomain, ctx.timeDomain());
+                       out.collect(1777);
+               }
+       }
+
+       private static class TriggeringStatefulFlatMapFunction extends 
ProcessFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final ValueStateDescriptor<Integer> state =
+                               new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE);
+
+               private final TimeDomain timeDomain;
+
+               public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) 
{
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT:" + value);
+                       getRuntimeContext().getState(state).update(value);
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
+                       } else {
+                               
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
+                       }
+               }
+
+               @Override
+               public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out) throws Exception {
+                       assertEquals(this.timeDomain, ctx.timeDomain());
+                       out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
+               }
+       }
+
+       private static class BothTriggeringFlatMapFunction extends 
ProcessFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       ctx.timerService().registerProcessingTimeTimer(5);
+                       ctx.timerService().registerEventTimeTimer(6);
+               }
+
+               @Override
+               public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out) throws Exception {
+                       if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+                               out.collect("EVENT:1777");
+                       } else {
+                               out.collect("PROC:1777");
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 49bdbd9..51def98 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateD
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.streaming.api.datastream.{QueryableStateStream, 
DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => 
WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, 
ProcessFunction}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
 import 
org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator,
 QueryableValueStateOperator}
@@ -66,9 +66,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
     * function, this function can also query the time and set timers. When 
reacting to the firing
     * of set timers the function can directly emit elements and/or register 
yet more timers.
     *
-    * @param processFunction The [[ProcessFunction]] that is called for each 
element
-    *                   in the stream.
+    * @param processFunction The [[ProcessFunction]] that is called for each 
element in the stream.
+    *
+    * @deprecated Use [[KeyedStream#process(KeyedProcessFunction)]]
     */
+  @deprecated("will be removed in a future version")
   @PublicEvolving
   override def process[R: TypeInformation](
     processFunction: ProcessFunction[T, R]): DataStream[R] = {
@@ -79,7 +81,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
 
     asScalaStream(javaStream.process(processFunction, 
implicitly[TypeInformation[R]]))
   }
-  
+
+  /**
+   * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can 
produce
+   * zero or more output. The function can also query the time and set timers. 
When
+   * reacting to the firing of set timers the function can emit yet more 
elements.
+   *
+   * The function will be called for every element in the input streams and 
can produce zero
+   * or more output elements. Contrary to the 
[[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When 
reacting to the firing
+   * of set timers the function can directly emit elements and/or register yet 
more timers.
+   *
+   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called 
for each element
+   *                             in the stream.
+   */
+  @PublicEvolving
+  def process[R: TypeInformation](
+    keyedProcessFunction: KeyedProcessFunction[K, T, R]): DataStream[R] = {
+
+    if (keyedProcessFunction == null) {
+      throw new NullPointerException("KeyedProcessFunction must not be null.")
+    }
+
+    asScalaStream(javaStream.process(keyedProcessFunction, 
implicitly[TypeInformation[R]]))
+  }
+
   // ------------------------------------------------------------------------
   //  Windowing
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/15998629/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index e2c5b41..51ec5e3 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -23,13 +23,11 @@ import java.lang
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, 
ProcessFunction}
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, 
KeyedProcessOperator, ProcessOperator, StreamOperator}
+import org.apache.flink.streaming.api.operators._
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
-import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, 
PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.runtime.partitioner._
@@ -430,10 +428,11 @@ class DataStreamTest extends AbstractTestBase {
   }
 
   /**
-   * Verify that a [[KeyedStream.process()]] call is correctly translated to 
an operator.
+   * Verify that a [[KeyedStream.process(ProcessFunction)]] call is correctly
+   * translated to an operator.
    */
   @Test
-  def testKeyedProcessTranslation(): Unit = {
+  def testKeyedStreamProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val src = env.generateSequence(0, 0)
@@ -448,12 +447,36 @@ class DataStreamTest extends AbstractTestBase {
     val flatMapped = src.keyBy(x => x).process(processFunction)
 
     assert(processFunction == getFunctionForDataStream(flatMapped))
+    
assert(getOperatorForDataStream(flatMapped).isInstanceOf[LegacyKeyedProcessOperator[_,
 _, _]])
+  }
+
+  /**
+   * Verify that a [[KeyedStream.process(KeyedProcessFunction)]] call is 
correctly
+   * translated to an operator.
+   */
+  @Test
+  def testKeyedStreamKeyedProcessTranslation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val src = env.generateSequence(0, 0)
+
+    val keyedProcessFunction = new KeyedProcessFunction[Long, Long, Int] {
+      override def processElement(
+                                   value: Long,
+                                   ctx: KeyedProcessFunction[Long, Long, 
Int]#Context,
+                                   out: Collector[Int]): Unit = ???
+    }
+
+    val flatMapped = src.keyBy(x => x).process(keyedProcessFunction)
+
+    assert(keyedProcessFunction == getFunctionForDataStream(flatMapped))
     
assert(getOperatorForDataStream(flatMapped).isInstanceOf[KeyedProcessOperator[_,
 _, _]])
   }
 
   /**
-    * Verify that a [[DataStream.process()]] call is correctly translated to 
an operator.
-    */
+   * Verify that a [[DataStream.process(ProcessFunction)]] call is correctly
+   * translated to an operator.
+   */
   @Test
   def testProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -473,7 +496,6 @@ class DataStreamTest extends AbstractTestBase {
     
assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]])
   }
 
-
   @Test def operatorTest() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
@@ -688,5 +710,4 @@ class DataStreamTest extends AbstractTestBase {
     m.print()
     m.getId
   }
-
 }

Reply via email to