Repository: flink
Updated Branches:
  refs/heads/master 3a27f55cf -> 910f733f5


http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
new file mode 100644
index 0000000..74fd044
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.RichProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link ProcessOperator}.
+ */
+public class ProcessOperatorTest extends TestLogger {
+
+       @Test
+       public void testTimestampAndWatermarkQuerying() throws Exception {
+
+               ProcessOperator<Integer, Integer, String> operator =
+                               new ProcessOperator<>(new 
QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark(new Watermark(17));
+               testHarness.processElement(new StreamRecord<>(5, 12L));
+
+               testHarness.processWatermark(new Watermark(42));
+               testHarness.processElement(new StreamRecord<>(6, 13L));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new Watermark(17L));
+               expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
+               expectedOutput.add(new Watermark(42L));
+               expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+               ProcessOperator<Integer, Integer, String> operator =
+                               new ProcessOperator<>(new 
QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.setProcessingTime(17);
+               testHarness.processElement(new StreamRecord<>(5));
+
+               testHarness.setProcessingTime(42);
+               testHarness.processElement(new StreamRecord<>(6));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
+               expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testEventTimeTimers() throws Exception {
+
+               ProcessOperator<Integer, Integer, Integer> operator =
+                               new ProcessOperator<>(new 
TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark(new Watermark(0));
+
+               testHarness.processElement(new StreamRecord<>(17, 42L));
+
+               testHarness.processWatermark(new Watermark(5));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new Watermark(0L));
+               expectedOutput.add(new StreamRecord<>(17, 42L));
+               expectedOutput.add(new StreamRecord<>(1777, 5L));
+               expectedOutput.add(new Watermark(5L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testProcessingTimeTimers() throws Exception {
+
+               ProcessOperator<Integer, Integer, Integer> operator =
+                               new ProcessOperator<>(new 
TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(17));
+
+               testHarness.setProcessingTime(5);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>(17));
+               expectedOutput.add(new StreamRecord<>(1777, 5L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       /**
+        * Verifies that we don't have leakage between different keys.
+        */
+       @Test
+       public void testEventTimeTimerWithState() throws Exception {
+
+               ProcessOperator<Integer, Integer, String> operator =
+                               new ProcessOperator<>(new 
TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark(new Watermark(1));
+               testHarness.processElement(new StreamRecord<>(17, 0L)); // 
should set timer for 6
+
+               testHarness.processWatermark(new Watermark(2));
+               testHarness.processElement(new StreamRecord<>(42, 1L)); // 
should set timer for 7
+
+               testHarness.processWatermark(new Watermark(6));
+               testHarness.processWatermark(new Watermark(7));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new Watermark(1L));
+               expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
+               expectedOutput.add(new Watermark(2L));
+               expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
+               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+               expectedOutput.add(new Watermark(6L));
+               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+               expectedOutput.add(new Watermark(7L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       /**
+        * Verifies that we don't have leakage between different keys.
+        */
+       @Test
+       public void testProcessingTimeTimerWithState() throws Exception {
+
+               ProcessOperator<Integer, Integer, String> operator =
+                               new ProcessOperator<>(new 
TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.setProcessingTime(1);
+               testHarness.processElement(new StreamRecord<>(17)); // should 
set timer for 6
+
+               testHarness.setProcessingTime(2);
+               testHarness.processElement(new StreamRecord<>(42)); // should 
set timer for 7
+
+               testHarness.setProcessingTime(6);
+               testHarness.setProcessingTime(7);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("INPUT:17"));
+               expectedOutput.add(new StreamRecord<>("INPUT:42"));
+               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testSnapshotAndRestore() throws Exception {
+
+               ProcessOperator<Integer, Integer, String> operator =
+                               new ProcessOperator<>(new 
BothTriggeringFlatMapFunction());
+
+               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement(new StreamRecord<>(5, 12L));
+
+               // snapshot and restore from scratch
+               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+               testHarness.close();
+
+               operator = new ProcessOperator<>(new 
BothTriggeringFlatMapFunction());
+
+               testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               testHarness.setProcessingTime(5);
+               testHarness.processWatermark(new Watermark(6));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+               expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+               expectedOutput.add(new Watermark(6));
+
+               System.out.println("GOT: " + testHarness.getOutput());
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       private static class IdentityKeySelector<T> implements KeySelector<T, 
T> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public T getKey(T value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static class QueryingFlatMapFunction implements 
ProcessFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final TimeDomain timeDomain;
+
+               public QueryingFlatMapFunction(TimeDomain timeDomain) {
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               out.collect(value + "TIME:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+                       } else {
+                               out.collect(value + "TIME:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+                       }
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+               }
+       }
+
+       private static class TriggeringFlatMapFunction implements 
ProcessFunction<Integer, Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final TimeDomain timeDomain;
+
+               public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<Integer> out) throws Exception {
+                       out.collect(value);
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
+                       } else {
+                               
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
+                       }
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<Integer> out) throws Exception {
+
+                       assertEquals(this.timeDomain, ctx.timeDomain());
+                       out.collect(1777);
+               }
+       }
+
+       private static class TriggeringStatefulFlatMapFunction extends 
RichProcessFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final ValueStateDescriptor<Integer> state =
+                               new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE,  null);
+
+               private final TimeDomain timeDomain;
+
+               public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) 
{
+                       this.timeDomain = timeDomain;
+               }
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT:" + value);
+                       getRuntimeContext().getState(state).update(value);
+                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+                               
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
+                       } else {
+                               
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
+                       }
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+                       assertEquals(this.timeDomain, ctx.timeDomain());
+                       out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
+               }
+       }
+
+       private static class BothTriggeringFlatMapFunction implements 
ProcessFunction<Integer, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void processElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       ctx.timerService().registerProcessingTimeTimer(5);
+                       ctx.timerService().registerEventTimeTimer(6);
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+                       if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+                               out.collect("EVENT:1777");
+                       } else {
+                               out.collect("PROC:1777");
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
deleted file mode 100644
index 6080ddc..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators;
-
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests {@link StreamTimelyFlatMap}.
- */
-public class TimelyFlatMapTest extends TestLogger {
-
-       @Test
-       public void testTimestampAndWatermarkQuerying() throws Exception {
-
-               StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new StreamTimelyFlatMap<>(new 
QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
-
-               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processWatermark(new Watermark(17));
-               testHarness.processElement(new StreamRecord<>(5, 12L));
-
-               testHarness.processWatermark(new Watermark(42));
-               testHarness.processElement(new StreamRecord<>(6, 13L));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new Watermark(17L));
-               expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
-               expectedOutput.add(new Watermark(42L));
-               expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testTimestampAndProcessingTimeQuerying() throws Exception {
-
-               StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new StreamTimelyFlatMap<>(new 
QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
-               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.setProcessingTime(17);
-               testHarness.processElement(new StreamRecord<>(5));
-
-               testHarness.setProcessingTime(42);
-               testHarness.processElement(new StreamRecord<>(6));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
-               expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testEventTimeTimers() throws Exception {
-
-               StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-                               new StreamTimelyFlatMap<>(new 
TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
-
-               OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processWatermark(new Watermark(0));
-
-               testHarness.processElement(new StreamRecord<>(17, 42L));
-
-               testHarness.processWatermark(new Watermark(5));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new Watermark(0L));
-               expectedOutput.add(new StreamRecord<>(17, 42L));
-               expectedOutput.add(new StreamRecord<>(1777, 5L));
-               expectedOutput.add(new Watermark(5L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testProcessingTimeTimers() throws Exception {
-
-               StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-                               new StreamTimelyFlatMap<>(new 
TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
-               OneInputStreamOperatorTestHarness<Integer, Integer> testHarness 
=
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processElement(new StreamRecord<>(17));
-
-               testHarness.setProcessingTime(5);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>(17));
-               expectedOutput.add(new StreamRecord<>(1777, 5L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       /**
-        * Verifies that we don't have leakage between different keys.
-        */
-       @Test
-       public void testEventTimeTimerWithState() throws Exception {
-
-               StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new StreamTimelyFlatMap<>(new 
TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
-
-               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processWatermark(new Watermark(1));
-               testHarness.processElement(new StreamRecord<>(17, 0L)); // 
should set timer for 6
-
-               testHarness.processWatermark(new Watermark(2));
-               testHarness.processElement(new StreamRecord<>(42, 1L)); // 
should set timer for 7
-
-               testHarness.processWatermark(new Watermark(6));
-               testHarness.processWatermark(new Watermark(7));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new Watermark(1L));
-               expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
-               expectedOutput.add(new Watermark(2L));
-               expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
-               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-               expectedOutput.add(new Watermark(6L));
-               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-               expectedOutput.add(new Watermark(7L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       /**
-        * Verifies that we don't have leakage between different keys.
-        */
-       @Test
-       public void testProcessingTimeTimerWithState() throws Exception {
-
-               StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new StreamTimelyFlatMap<>(new 
TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
-               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.setProcessingTime(1);
-               testHarness.processElement(new StreamRecord<>(17)); // should 
set timer for 6
-
-               testHarness.setProcessingTime(2);
-               testHarness.processElement(new StreamRecord<>(42)); // should 
set timer for 7
-
-               testHarness.setProcessingTime(6);
-               testHarness.setProcessingTime(7);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>("INPUT:17"));
-               expectedOutput.add(new StreamRecord<>("INPUT:42"));
-               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testSnapshotAndRestore() throws Exception {
-
-               StreamTimelyFlatMap<Integer, Integer, String> operator =
-                               new StreamTimelyFlatMap<>(new 
BothTriggeringFlatMapFunction());
-
-               OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-                               new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processElement(new StreamRecord<>(5, 12L));
-
-               // snapshot and restore from scratch
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
-               testHarness.close();
-
-               operator = new StreamTimelyFlatMap<>(new 
BothTriggeringFlatMapFunction());
-
-               testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new 
IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.initializeState(snapshot);
-               testHarness.open();
-
-               testHarness.setProcessingTime(5);
-               testHarness.processWatermark(new Watermark(6));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
-               expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
-               expectedOutput.add(new Watermark(6));
-
-               System.out.println("GOT: " + testHarness.getOutput());
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       private static class IdentityKeySelector<T> implements KeySelector<T, 
T> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public T getKey(T value) throws Exception {
-                       return value;
-               }
-       }
-
-       private static class QueryingFlatMapFunction implements 
TimelyFlatMapFunction<Integer, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final TimeDomain timeDomain;
-
-               public QueryingFlatMapFunction(TimeDomain timeDomain) {
-                       this.timeDomain = timeDomain;
-               }
-
-               @Override
-               public void flatMap(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-                               out.collect(value + "TIME:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
-                       } else {
-                               out.collect(value + "TIME:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
-                       }
-               }
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-               }
-       }
-
-       private static class TriggeringFlatMapFunction implements 
TimelyFlatMapFunction<Integer, Integer> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final TimeDomain timeDomain;
-
-               public TriggeringFlatMapFunction(TimeDomain timeDomain) {
-                       this.timeDomain = timeDomain;
-               }
-
-               @Override
-               public void flatMap(Integer value, Context ctx, 
Collector<Integer> out) throws Exception {
-                       out.collect(value);
-                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-                               
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
-                       } else {
-                               
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
-                       }
-               }
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<Integer> out) throws Exception {
-
-                       assertEquals(this.timeDomain, ctx.timeDomain());
-                       out.collect(1777);
-               }
-       }
-
-       private static class TriggeringStatefulFlatMapFunction extends 
RichTimelyFlatMapFunction<Integer, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final ValueStateDescriptor<Integer> state =
-                               new ValueStateDescriptor<>("seen-element", 
IntSerializer.INSTANCE,  null);
-
-               private final TimeDomain timeDomain;
-
-               public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) 
{
-                       this.timeDomain = timeDomain;
-               }
-
-               @Override
-               public void flatMap(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT:" + value);
-                       getRuntimeContext().getState(state).update(value);
-                       if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-                               
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
-                       } else {
-                               
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
-                       }
-               }
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-                       assertEquals(this.timeDomain, ctx.timeDomain());
-                       out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
-               }
-       }
-
-       private static class BothTriggeringFlatMapFunction implements 
TimelyFlatMapFunction<Integer, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       ctx.timerService().registerProcessingTimeTimer(5);
-                       ctx.timerService().registerEventTimeTimer(6);
-               }
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-                       if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
-                               out.collect("EVENT:1777");
-                       } else {
-                               out.collect("PROC:1777");
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
new file mode 100644
index 0000000..a449359
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators.co;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link CoProcessOperator}.
+ */
+public class CoProcessOperatorTest extends TestLogger {
+
+       @Test
+       public void testTimestampAndWatermarkQuerying() throws Exception {
+
+               CoProcessOperator<String, Integer, String, String> operator =
+                               new CoProcessOperator<>(new 
WatermarkQueryingProcessFunction());
+
+               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
+                               new KeyedTwoInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               new IntToStringKeySelector<>(),
+                                               new 
IdentityKeySelector<String>(),
+                                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark1(new Watermark(17));
+               testHarness.processWatermark2(new Watermark(17));
+               testHarness.processElement1(new StreamRecord<>(5, 12L));
+
+               testHarness.processWatermark1(new Watermark(42));
+               testHarness.processWatermark2(new Watermark(42));
+               testHarness.processElement2(new StreamRecord<>("6", 13L));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new Watermark(17L));
+               expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
+               expectedOutput.add(new Watermark(42L));
+               expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+               CoProcessOperator<String, Integer, String, String> operator =
+                               new CoProcessOperator<>(new 
ProcessingTimeQueryingProcessFunction());
+
+               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
+                               new KeyedTwoInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               new IntToStringKeySelector<>(),
+                                               new 
IdentityKeySelector<String>(),
+                                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.setProcessingTime(17);
+               testHarness.processElement1(new StreamRecord<>(5));
+
+               testHarness.setProcessingTime(42);
+               testHarness.processElement2(new StreamRecord<>("6"));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
+               expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testEventTimeTimers() throws Exception {
+
+               CoProcessOperator<String, Integer, String, String> operator =
+                               new CoProcessOperator<>(new 
EventTimeTriggeringProcessFunction());
+
+               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
+                               new KeyedTwoInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               new IntToStringKeySelector<>(),
+                                               new 
IdentityKeySelector<String>(),
+                                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement1(new StreamRecord<>(17, 42L));
+               testHarness.processElement2(new StreamRecord<>("18", 42L));
+
+               testHarness.processWatermark1(new Watermark(5));
+               testHarness.processWatermark2(new Watermark(5));
+
+               testHarness.processWatermark1(new Watermark(6));
+               testHarness.processWatermark2(new Watermark(6));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
+               expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
+               expectedOutput.add(new StreamRecord<>("1777", 5L));
+               expectedOutput.add(new Watermark(5L));
+               expectedOutput.add(new StreamRecord<>("1777", 6L));
+               expectedOutput.add(new Watermark(6L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testProcessingTimeTimers() throws Exception {
+
+               CoProcessOperator<String, Integer, String, String> operator =
+                               new CoProcessOperator<>(new 
ProcessingTimeTriggeringProcessFunction());
+
+               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
+                               new KeyedTwoInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               new IntToStringKeySelector<>(),
+                                               new 
IdentityKeySelector<String>(),
+                                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement1(new StreamRecord<>(17));
+               testHarness.processElement2(new StreamRecord<>("18"));
+
+               testHarness.setProcessingTime(5);
+               testHarness.setProcessingTime(6);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+               expectedOutput.add(new StreamRecord<>("INPUT2:18"));
+               expectedOutput.add(new StreamRecord<>("1777", 5L));
+               expectedOutput.add(new StreamRecord<>("1777", 6L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       /**
+        * Verifies that we don't have leakage between different keys.
+        */
+       @Test
+       public void testEventTimeTimerWithState() throws Exception {
+
+               CoProcessOperator<String, Integer, String, String> operator =
+                               new CoProcessOperator<>(new 
EventTimeTriggeringStatefulProcessFunction());
+
+               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
+                               new KeyedTwoInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               new IntToStringKeySelector<>(),
+                                               new 
IdentityKeySelector<String>(),
+                                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark1(new Watermark(1));
+               testHarness.processWatermark2(new Watermark(1));
+               testHarness.processElement1(new StreamRecord<>(17, 0L)); // 
should set timer for 6
+
+               testHarness.processWatermark1(new Watermark(2));
+               testHarness.processWatermark2(new Watermark(2));
+               testHarness.processElement2(new StreamRecord<>("42", 1L)); // 
should set timer for 7
+
+               testHarness.processWatermark1(new Watermark(6));
+               testHarness.processWatermark2(new Watermark(6));
+
+               testHarness.processWatermark1(new Watermark(7));
+               testHarness.processWatermark2(new Watermark(7));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new Watermark(1L));
+               expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
+               expectedOutput.add(new Watermark(2L));
+               expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
+               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+               expectedOutput.add(new Watermark(6L));
+               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+               expectedOutput.add(new Watermark(7L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       /**
+        * Verifies that we don't have leakage between different keys.
+        */
+       @Test
+       public void testProcessingTimeTimerWithState() throws Exception {
+
+               CoProcessOperator<String, Integer, String, String> operator =
+                               new CoProcessOperator<>(new 
ProcessingTimeTriggeringStatefulProcessFunction());
+
+               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
+                               new KeyedTwoInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               new IntToStringKeySelector<>(),
+                                               new 
IdentityKeySelector<String>(),
+                                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.setProcessingTime(1);
+               testHarness.processElement1(new StreamRecord<>(17)); // should 
set timer for 6
+
+               testHarness.setProcessingTime(2);
+               testHarness.processElement2(new StreamRecord<>("42")); // 
should set timer for 7
+
+               testHarness.setProcessingTime(6);
+               testHarness.setProcessingTime(7);
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+               expectedOutput.add(new StreamRecord<>("INPUT2:42"));
+               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testSnapshotAndRestore() throws Exception {
+
+               CoProcessOperator<String, Integer, String, String> operator =
+                               new CoProcessOperator<>(new 
BothTriggeringProcessFunction());
+
+               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
+                               new KeyedTwoInputStreamOperatorTestHarness<>(
+                                               operator,
+                                               new IntToStringKeySelector<>(),
+                                               new 
IdentityKeySelector<String>(),
+                                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processElement1(new StreamRecord<>(5, 12L));
+               testHarness.processElement2(new StreamRecord<>("5", 12L));
+
+               // snapshot and restore from scratch
+               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+               testHarness.close();
+
+               operator = new CoProcessOperator<>(new 
BothTriggeringProcessFunction());
+
+               testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
+                               operator,
+                               new IntToStringKeySelector<>(),
+                               new IdentityKeySelector<String>(),
+                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               testHarness.setProcessingTime(5);
+               testHarness.processWatermark1(new Watermark(6));
+               testHarness.processWatermark2(new Watermark(6));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+               expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+               expectedOutput.add(new Watermark(6));
+
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.close();
+       }
+
+
+       private static class IntToStringKeySelector<T> implements 
KeySelector<Integer, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String getKey(Integer value) throws Exception {
+                       return "" + value;
+               }
+       }
+
+       private static class IdentityKeySelector<T> implements KeySelector<T, 
T> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public T getKey(T value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static class WatermarkQueryingProcessFunction implements 
CoProcessFunction<Integer, String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect(value + "WM:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+               }
+
+               @Override
+               public void processElement2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect(value + "WM:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+               }
+       }
+
+       private static class EventTimeTriggeringProcessFunction implements 
CoProcessFunction<Integer, String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT1:" + value);
+                       ctx.timerService().registerEventTimeTimer(5);
+               }
+
+               @Override
+               public void processElement2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT2:" + value);
+                       ctx.timerService().registerEventTimeTimer(6);
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+
+                       assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+                       out.collect("" + 1777);
+               }
+       }
+
+       private static class EventTimeTriggeringStatefulProcessFunction extends 
RichCoProcessFunction<Integer, String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final ValueStateDescriptor<String> state =
+                               new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE, null);
+
+               @Override
+               public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT1:" + value);
+                       getRuntimeContext().getState(state).update("" + value);
+                       
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
+               }
+
+               @Override
+               public void processElement2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT2:" + value);
+                       getRuntimeContext().getState(state).update(value);
+                       
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
+               }
+
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+                       assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+                       out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
+               }
+       }
+
+       private static class ProcessingTimeTriggeringProcessFunction implements 
CoProcessFunction<Integer, String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT1:" + value);
+                       ctx.timerService().registerProcessingTimeTimer(5);
+               }
+
+               @Override
+               public void processElement2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT2:" + value);
+                       ctx.timerService().registerProcessingTimeTimer(6);
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+
+                       assertEquals(TimeDomain.PROCESSING_TIME, 
ctx.timeDomain());
+                       out.collect("" + 1777);
+               }
+       }
+
+       private static class ProcessingTimeQueryingProcessFunction implements 
CoProcessFunction<Integer, String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect(value + "PT:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+               }
+
+               @Override
+               public void processElement2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect(value + "PT:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+               }
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+               }
+       }
+
+       private static class ProcessingTimeTriggeringStatefulProcessFunction 
extends RichCoProcessFunction<Integer, String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final ValueStateDescriptor<String> state =
+                               new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE, null);
+
+               @Override
+               public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT1:" + value);
+                       getRuntimeContext().getState(state).update("" + value);
+                       
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
+               }
+
+               @Override
+               public void processElement2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect("INPUT2:" + value);
+                       getRuntimeContext().getState(state).update(value);
+                       
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
+               }
+
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+                       assertEquals(TimeDomain.PROCESSING_TIME, 
ctx.timeDomain());
+                       out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
+               }
+       }
+
+       private static class BothTriggeringProcessFunction implements 
CoProcessFunction<Integer, String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void processElement1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       ctx.timerService().registerEventTimeTimer(6);
+               }
+
+               @Override
+               public void processElement2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       ctx.timerService().registerProcessingTimeTimer(5);
+               }
+
+
+               @Override
+               public void onTimer(
+                               long timestamp,
+                               OnTimerContext ctx,
+                               Collector<String> out) throws Exception {
+                       if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+                               out.collect("EVENT:1777");
+                       } else {
+                               out.collect("PROC:1777");
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
deleted file mode 100644
index 7c29631..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ /dev/null
@@ -1,536 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators.co;
-
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests {@link CoStreamTimelyFlatMap}.
- */
-public class TimelyCoFlatMapTest extends TestLogger {
-
-       @Test
-       public void testTimestampAndWatermarkQuerying() throws Exception {
-
-               CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new CoStreamTimelyFlatMap<>(new 
WatermarkQueryingFlatMapFunction());
-
-               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
-                               new KeyedTwoInputStreamOperatorTestHarness<>(
-                                               operator,
-                                               new IntToStringKeySelector<>(),
-                                               new 
IdentityKeySelector<String>(),
-                                               BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processWatermark1(new Watermark(17));
-               testHarness.processWatermark2(new Watermark(17));
-               testHarness.processElement1(new StreamRecord<>(5, 12L));
-
-               testHarness.processWatermark1(new Watermark(42));
-               testHarness.processWatermark2(new Watermark(42));
-               testHarness.processElement2(new StreamRecord<>("6", 13L));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new Watermark(17L));
-               expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
-               expectedOutput.add(new Watermark(42L));
-               expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testTimestampAndProcessingTimeQuerying() throws Exception {
-
-               CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new CoStreamTimelyFlatMap<>(new 
ProcessingTimeQueryingFlatMapFunction());
-
-               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
-                               new KeyedTwoInputStreamOperatorTestHarness<>(
-                                               operator,
-                                               new IntToStringKeySelector<>(),
-                                               new 
IdentityKeySelector<String>(),
-                                               BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.setProcessingTime(17);
-               testHarness.processElement1(new StreamRecord<>(5));
-
-               testHarness.setProcessingTime(42);
-               testHarness.processElement2(new StreamRecord<>("6"));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
-               expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testEventTimeTimers() throws Exception {
-
-               CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new CoStreamTimelyFlatMap<>(new 
EventTimeTriggeringFlatMapFunction());
-
-               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
-                               new KeyedTwoInputStreamOperatorTestHarness<>(
-                                               operator,
-                                               new IntToStringKeySelector<>(),
-                                               new 
IdentityKeySelector<String>(),
-                                               BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processElement1(new StreamRecord<>(17, 42L));
-               testHarness.processElement2(new StreamRecord<>("18", 42L));
-
-               testHarness.processWatermark1(new Watermark(5));
-               testHarness.processWatermark2(new Watermark(5));
-
-               testHarness.processWatermark1(new Watermark(6));
-               testHarness.processWatermark2(new Watermark(6));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
-               expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
-               expectedOutput.add(new StreamRecord<>("1777", 5L));
-               expectedOutput.add(new Watermark(5L));
-               expectedOutput.add(new StreamRecord<>("1777", 6L));
-               expectedOutput.add(new Watermark(6L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testProcessingTimeTimers() throws Exception {
-
-               CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new CoStreamTimelyFlatMap<>(new 
ProcessingTimeTriggeringFlatMapFunction());
-
-               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
-                               new KeyedTwoInputStreamOperatorTestHarness<>(
-                                               operator,
-                                               new IntToStringKeySelector<>(),
-                                               new 
IdentityKeySelector<String>(),
-                                               BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processElement1(new StreamRecord<>(17));
-               testHarness.processElement2(new StreamRecord<>("18"));
-
-               testHarness.setProcessingTime(5);
-               testHarness.setProcessingTime(6);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>("INPUT1:17"));
-               expectedOutput.add(new StreamRecord<>("INPUT2:18"));
-               expectedOutput.add(new StreamRecord<>("1777", 5L));
-               expectedOutput.add(new StreamRecord<>("1777", 6L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       /**
-        * Verifies that we don't have leakage between different keys.
-        */
-       @Test
-       public void testEventTimeTimerWithState() throws Exception {
-
-               CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new CoStreamTimelyFlatMap<>(new 
EventTimeTriggeringStatefulFlatMapFunction());
-
-               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
-                               new KeyedTwoInputStreamOperatorTestHarness<>(
-                                               operator,
-                                               new IntToStringKeySelector<>(),
-                                               new 
IdentityKeySelector<String>(),
-                                               BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processWatermark1(new Watermark(1));
-               testHarness.processWatermark2(new Watermark(1));
-               testHarness.processElement1(new StreamRecord<>(17, 0L)); // 
should set timer for 6
-
-               testHarness.processWatermark1(new Watermark(2));
-               testHarness.processWatermark2(new Watermark(2));
-               testHarness.processElement2(new StreamRecord<>("42", 1L)); // 
should set timer for 7
-
-               testHarness.processWatermark1(new Watermark(6));
-               testHarness.processWatermark2(new Watermark(6));
-
-               testHarness.processWatermark1(new Watermark(7));
-               testHarness.processWatermark2(new Watermark(7));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new Watermark(1L));
-               expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
-               expectedOutput.add(new Watermark(2L));
-               expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
-               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-               expectedOutput.add(new Watermark(6L));
-               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-               expectedOutput.add(new Watermark(7L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       /**
-        * Verifies that we don't have leakage between different keys.
-        */
-       @Test
-       public void testProcessingTimeTimerWithState() throws Exception {
-
-               CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new CoStreamTimelyFlatMap<>(new 
ProcessingTimeTriggeringStatefulFlatMapFunction());
-
-               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
-                               new KeyedTwoInputStreamOperatorTestHarness<>(
-                                               operator,
-                                               new IntToStringKeySelector<>(),
-                                               new 
IdentityKeySelector<String>(),
-                                               BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.setProcessingTime(1);
-               testHarness.processElement1(new StreamRecord<>(17)); // should 
set timer for 6
-
-               testHarness.setProcessingTime(2);
-               testHarness.processElement2(new StreamRecord<>("42")); // 
should set timer for 7
-
-               testHarness.setProcessingTime(6);
-               testHarness.setProcessingTime(7);
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>("INPUT1:17"));
-               expectedOutput.add(new StreamRecord<>("INPUT2:42"));
-               expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-               expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testSnapshotAndRestore() throws Exception {
-
-               CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
-                               new CoStreamTimelyFlatMap<>(new 
BothTriggeringFlatMapFunction());
-
-               TwoInputStreamOperatorTestHarness<Integer, String, String> 
testHarness =
-                               new KeyedTwoInputStreamOperatorTestHarness<>(
-                                               operator,
-                                               new IntToStringKeySelector<>(),
-                                               new 
IdentityKeySelector<String>(),
-                                               BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.processElement1(new StreamRecord<>(5, 12L));
-               testHarness.processElement2(new StreamRecord<>("5", 12L));
-
-               // snapshot and restore from scratch
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
-               testHarness.close();
-
-               operator = new CoStreamTimelyFlatMap<>(new 
BothTriggeringFlatMapFunction());
-
-               testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
-                               operator,
-                               new IntToStringKeySelector<>(),
-                               new IdentityKeySelector<String>(),
-                               BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setup();
-               testHarness.initializeState(snapshot);
-               testHarness.open();
-
-               testHarness.setProcessingTime(5);
-               testHarness.processWatermark1(new Watermark(6));
-               testHarness.processWatermark2(new Watermark(6));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
-               expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
-               expectedOutput.add(new Watermark(6));
-
-               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
-
-               testHarness.close();
-       }
-
-
-       private static class IntToStringKeySelector<T> implements 
KeySelector<Integer, String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String getKey(Integer value) throws Exception {
-                       return "" + value;
-               }
-       }
-
-       private static class IdentityKeySelector<T> implements KeySelector<T, 
T> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public T getKey(T value) throws Exception {
-                       return value;
-               }
-       }
-
-       private static class WatermarkQueryingFlatMapFunction implements 
TimelyCoFlatMapFunction<Integer, String, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect(value + "WM:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
-               }
-
-               @Override
-               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect(value + "WM:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
-               }
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-               }
-       }
-
-       private static class EventTimeTriggeringFlatMapFunction implements 
TimelyCoFlatMapFunction<Integer, String, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT1:" + value);
-                       ctx.timerService().registerEventTimeTimer(5);
-               }
-
-               @Override
-               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT2:" + value);
-                       ctx.timerService().registerEventTimeTimer(6);
-               }
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-
-                       assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
-                       out.collect("" + 1777);
-               }
-       }
-
-       private static class EventTimeTriggeringStatefulFlatMapFunction extends 
RichTimelyCoFlatMapFunction<Integer, String, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final ValueStateDescriptor<String> state =
-                               new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE, null);
-
-               @Override
-               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT1:" + value);
-                       getRuntimeContext().getState(state).update("" + value);
-                       
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
-               }
-
-               @Override
-               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT2:" + value);
-                       getRuntimeContext().getState(state).update(value);
-                       
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
-               }
-
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-                       assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
-                       out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
-               }
-       }
-
-       private static class ProcessingTimeTriggeringFlatMapFunction implements 
TimelyCoFlatMapFunction<Integer, String, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT1:" + value);
-                       ctx.timerService().registerProcessingTimeTimer(5);
-               }
-
-               @Override
-               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT2:" + value);
-                       ctx.timerService().registerProcessingTimeTimer(6);
-               }
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-
-                       assertEquals(TimeDomain.PROCESSING_TIME, 
ctx.timeDomain());
-                       out.collect("" + 1777);
-               }
-       }
-
-       private static class ProcessingTimeQueryingFlatMapFunction implements 
TimelyCoFlatMapFunction<Integer, String, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect(value + "PT:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
-               }
-
-               @Override
-               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect(value + "PT:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
-               }
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-               }
-       }
-
-       private static class ProcessingTimeTriggeringStatefulFlatMapFunction 
extends RichTimelyCoFlatMapFunction<Integer, String, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final ValueStateDescriptor<String> state =
-                               new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE, null);
-
-               @Override
-               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT1:" + value);
-                       getRuntimeContext().getState(state).update("" + value);
-                       
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
-               }
-
-               @Override
-               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
-                       out.collect("INPUT2:" + value);
-                       getRuntimeContext().getState(state).update(value);
-                       
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
-               }
-
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-                       assertEquals(TimeDomain.PROCESSING_TIME, 
ctx.timeDomain());
-                       out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
-               }
-       }
-
-       private static class BothTriggeringFlatMapFunction implements 
TimelyCoFlatMapFunction<Integer, String, String> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
-                       ctx.timerService().registerEventTimeTimer(6);
-               }
-
-               @Override
-               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
-                       ctx.timerService().registerProcessingTimeTimer(5);
-               }
-
-
-               @Override
-               public void onTimer(
-                               long timestamp,
-                               OnTimerContext ctx,
-                               Collector<String> out) throws Exception {
-                       if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
-                               out.collect("EVENT:1777");
-                       } else {
-                               out.collect("PROC:1777");
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 50526b5..a7325a4 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{ConnectedStreams => 
JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, 
CoMapFunction, TimelyCoFlatMapFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, 
CoMapFunction, CoProcessFunction, RichCoProcessFunction}
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.util.Collector
 
@@ -101,30 +101,33 @@ class ConnectedStreams[IN1, IN2](javaStream: 
JavaCStream[IN1, IN2]) {
   }
 
   /**
-   * Applies the given [[TimelyCoFlatMapFunction]] on the connected input 
streams,
+   * Applies the given [[CoProcessFunction]] on the connected input streams,
    * thereby creating a transformed output stream.
    *
-   * The function will be called for every element in the streams and can 
produce
-   * zero or more output. The function can also query the time and set timers. 
When
-   * reacting to the firing of set timers the function can emit yet more 
elements.
+   * The function will be called for every element in the input streams and 
can produce zero
+   * or more output elements. Contrary to the [[flatMap(CoFlatMapFunction)]] 
function,
+   * this function can also query the time and set timers. When reacting to 
the firing of set
+   * timers the function can directly emit elements and/or register yet more 
timers.
    *
-   * A 
[[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]]
+   * A [[RichCoProcessFunction]]
    * can be used to gain access to features provided by the
    * [[org.apache.flink.api.common.functions.RichFunction]] interface.
    *
-   * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for 
each element
-    *                     in the stream.
-    *
-   * @return The transformed { @link DataStream}.
+   * @param coProcessFunction The [[CoProcessFunction]] that is called for 
each element
+    *                    in the stream.
+   * @return The transformed [[DataStream]].
    */
-  def flatMap[R: TypeInformation](
-      coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = {
+  @PublicEvolving
+  def process[R: TypeInformation](
+      coProcessFunction: CoProcessFunction[IN1, IN2, R]) : DataStream[R] = {
 
-    if (coFlatMapper == null) throw new NullPointerException("FlatMap function 
must not be null.")
+    if (coProcessFunction == null) {
+      throw new NullPointerException("CoProcessFunction function must not be 
null.")
+    }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
 
-    asScalaStream(javaStream.flatMap(coFlatMapper, outType))
+    asScalaStream(javaStream.process(coProcessFunction, outType))
   }
 
 
@@ -144,14 +147,14 @@ class ConnectedStreams[IN1, IN2](javaStream: 
JavaCStream[IN1, IN2]) {
    * @return
     *        The resulting data stream.
    */
-  def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, 
R]): 
+  def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, 
R]):
           DataStream[R] = {
-    
+
     if (coFlatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
-    
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
+
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
     
asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]])
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 66d80c2..f2999b3 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescr
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.streaming.api.datastream.{QueryableStateStream, 
SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => 
KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
+import org.apache.flink.streaming.api.functions.{ProcessFunction, 
RichProcessFunction}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
 import 
org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator,
 QueryableValueStateOperator}
@@ -54,28 +54,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
   // ------------------------------------------------------------------------
 
   /**
-    * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby
+    * Applies the given [[ProcessFunction]] on the input stream, thereby
     * creating a transformed output stream.
     *
     * The function will be called for every element in the stream and can 
produce
     * zero or more output. The function can also query the time and set 
timers. When
     * reacting to the firing of set timers the function can emit yet more 
elements.
     *
-    * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]]
+    * The function will be called for every element in the input streams and 
can produce zero
+    * or more output elements. Contrary to the 
[[DataStream#flatMap(FlatMapFunction)]]
+    * function, this function can also query the time and set timers. When 
reacting to the firing
+    * of set timers the function can directly emit elements and/or register 
yet more timers.
+    *
+    * A [[RichProcessFunction]]
     * can be used to gain access to features provided by the
     * [[org.apache.flink.api.common.functions.RichFunction]]
     *
-    * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each 
element
+    * @param processFunction The [[ProcessFunction]] that is called for each 
element
     *                   in the stream.
     */
-  def flatMap[R: TypeInformation](
-      flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = {
+  @PublicEvolving
+  def process[R: TypeInformation](
+    processFunction: ProcessFunction[T, R]): DataStream[R] = {
 
-    if (flatMapper == null) {
-      throw new NullPointerException("TimelyFlatMapFunction must not be null.")
+    if (processFunction == null) {
+      throw new NullPointerException("ProcessFunction must not be null.")
     }
 
-    asScalaStream(javaStream.flatMap(flatMapper, 
implicitly[TypeInformation[R]]))
+    asScalaStream(javaStream.process(processFunction, 
implicitly[TypeInformation[R]]))
   }
   
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 967142b..adb59f2 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -23,11 +23,11 @@ import java.lang
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
-import 
org.apache.flink.streaming.api.functions.TimelyFlatMapFunction.{Context, 
OnTimerContext}
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, 
OnTimerContext}
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, 
StreamOperator, StreamTimelyFlatMap}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, 
ProcessOperator, StreamOperator}
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, 
PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -318,26 +318,26 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
   }
 
   /**
-   * Verify that a timely flat map call is correctly translated to an operator.
+   * Verify that a [[KeyedStream.process()]] call is correctly translated to 
an operator.
    */
   @Test
-  def testTimelyFlatMapTranslation(): Unit = {
+  def testProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val src = env.generateSequence(0, 0)
 
-    val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
-      override def flatMap(value: Long, ctx: Context, out: Collector[Int]): 
Unit = ???
+    val processFunction = new ProcessFunction[Long, Int] {
+      override def processElement(value: Long, ctx: Context, out: 
Collector[Int]): Unit = ???
       override def onTimer(
           timestamp: Long,
           ctx: OnTimerContext,
           out: Collector[Int]): Unit = ???
     }
 
-    val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction)
+    val flatMapped = src.keyBy(x => x).process(processFunction)
 
-    assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped))
-    
assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, 
_, _]])
+    assert(processFunction == getFunctionForDataStream(flatMapped))
+    
assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _, 
_]])
   }
 
   @Test def operatorTest() {

Reply via email to