Repository: flink
Updated Branches:
  refs/heads/master 5d5637b01 -> 99188390f


[FLINK-5012] Expose Timestamp in Timely FlatMap Functions

This also adds a Context parameter that holds the timestamp, time domain
and TimerService to declutter the parameter list of the functions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99188390
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99188390
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99188390

Branch: refs/heads/master
Commit: 99188390f9fcd63f249e25f0e847ed1bb29c0d5c
Parents: 5d5637b
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Nov 8 16:52:21 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Nov 11 10:17:59 2016 +0100

----------------------------------------------------------------------
 .../api/functions/TimelyFlatMapFunction.java    |  46 +++++++--
 .../functions/co/TimelyCoFlatMapFunction.java   |  56 +++++++++--
 .../api/operators/StreamTimelyFlatMap.java      |  83 ++++++++++++++-
 .../api/operators/co/CoStreamTimelyFlatMap.java |  88 +++++++++++++++-
 .../flink/streaming/api/DataStreamTest.java     |   5 +-
 .../api/operators/TimelyFlatMapTest.java        |  56 +++++------
 .../api/operators/co/TimelyCoFlatMapTest.java   | 100 +++++++++----------
 .../streaming/api/scala/DataStreamTest.scala    |   7 +-
 8 files changed, 324 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
index 77fe35e..5f039c4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
@@ -52,27 +52,59 @@ public interface TimelyFlatMapFunction<I, O> extends 
Function, Serializable {
         * it into zero, one, or more elements.
         *
         * @param value The input value.
-        * @param timerService A {@link TimerService} that allows setting 
timers and querying the
-        *                        current time.
+        * @param ctx A {@link Context} that allows querying the timestamp of 
the element and getting
+        *            a {@link TimerService} for registering timers and 
querying the time. The
+        *            context is only valid during the invocation of this 
method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
         *                   to fail and may trigger recovery.
         */
-       void flatMap(I value, TimerService timerService, Collector<O> out) 
throws Exception;
+       void flatMap(I value, Context ctx, Collector<O> out) throws Exception;
 
        /**
         * Called when a timer set using {@link TimerService} fires.
         *
         * @param timestamp The timestamp of the firing timer.
-        * @param timeDomain The {@link TimeDomain} of the firing timer.
-        * @param timerService A {@link TimerService} that allows setting 
timers and querying the
-        *                        current time.
+        * @param ctx An {@link OnTimerContext} that allows querying the 
timestamp of the firing timer,
+        *            querying the {@link TimeDomain} of the firing timer and 
getting a
+        *            {@link TimerService} for registering timers and querying 
the time.
+        *            The context is only valid during the invocation of this 
method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
         *                   to fail and may trigger recovery.
         */
-       void onTimer(long timestamp, TimeDomain timeDomain, TimerService 
timerService, Collector<O> out) throws Exception ;
+       void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
throws Exception ;
+
+       /**
+        * Information available in an invocation of {@link #flatMap(Object, 
Context, Collector)}
+        * or {@link #onTimer(long, OnTimerContext, Collector)}.
+        */
+       interface Context {
+
+               /**
+                * Timestamp of the element currently being processed or 
timestamp of a firing timer.
+                *
+                * <p>This might be {@code null}, for example if the time 
characteristic of your program
+                * is set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+                */
+               Long timestamp();
+
+               /**
+                * A {@link TimerService} for querying time and registering 
timers.
+                */
+               TimerService timerService();
+       }
+
+       /**
+        * Information available in an invocation of {@link #onTimer(long, 
OnTimerContext, Collector)}.
+        */
+       interface OnTimerContext extends Context {
+               /**
+                * The {@link TimeDomain} of the firing timer.
+                */
+               TimeDomain timeDomain();
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
index 87355c6..89c7d79 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
@@ -55,38 +56,73 @@ public interface TimelyCoFlatMapFunction<IN1, IN2, OUT> 
extends Function, Serial
         * This method is called for each element in the first of the connected 
streams.
         * 
         * @param value The stream element
-        * @param timerService A {@link TimerService} that allows setting 
timers and querying the
-        *                        current time.
+        * @param ctx An {@link OnTimerContext} that allows querying the 
timestamp of the element,
+        *            querying the {@link TimeDomain} of the firing timer and 
getting a
+        *            {@link TimerService} for registering timers and querying 
the time.
+        *            The context is only valid during the invocation of this 
method, do not store it.
         * @param out The collector to emit resulting elements to
         * @throws Exception The function may throw exceptions which cause the 
streaming program
         *                   to fail and go into recovery.
         */
-       void flatMap1(IN1 value, TimerService timerService, Collector<OUT> out) 
throws Exception;
+       void flatMap1(IN1 value, Context ctx, Collector<OUT> out) throws 
Exception;
 
        /**
         * This method is called for each element in the second of the 
connected streams.
         * 
         * @param value The stream element
-        * @param timerService A {@link TimerService} that allows setting 
timers and querying the
-        *                        current time.
+        * @param ctx An {@link OnTimerContext} that allows querying the 
timestamp of the element,
+        *            querying the {@link TimeDomain} of the firing timer and 
getting a
+        *            {@link TimerService} for registering timers and querying 
the time.
+        *            The context is only valid during the invocation of this 
method, do not store it.
         * @param out The collector to emit resulting elements to
         * @throws Exception The function may throw exceptions which cause the 
streaming program
         *                   to fail and go into recovery.
         */
-       void flatMap2(IN2 value, TimerService timerService, Collector<OUT> out) 
throws Exception;
+       void flatMap2(IN2 value, Context ctx, Collector<OUT> out) throws 
Exception;
 
        /**
         * Called when a timer set using {@link TimerService} fires.
         *
         * @param timestamp The timestamp of the firing timer.
-        * @param timeDomain The {@link TimeDomain} of the firing timer.
-        * @param timerService A {@link TimerService} that allows setting 
timers and querying the
-        *                        current time.
+        * @param ctx An {@link OnTimerContext} that allows querying the 
timestamp of the firing timer,
+        *            querying the {@link TimeDomain} of the firing timer and 
getting a
+        *            {@link TimerService} for registering timers and querying 
the time.
+        *            The context is only valid during the invocation of this 
method, do not store it.
         * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
         *                   to fail and may trigger recovery.
         */
-       void onTimer(long timestamp, TimeDomain timeDomain, TimerService 
timerService, Collector<OUT> out) throws Exception ;
+       void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) 
throws Exception ;
 
+       /**
+        * Information available in an invocation of {@link #flatMap1(Object, 
Context, Collector)}/
+        * {@link #flatMap2(Object, Context, Collector)}
+        * or {@link #onTimer(long, OnTimerContext, Collector)}.
+        */
+       interface Context {
+
+               /**
+                * Timestamp of the element currently being processed or 
timestamp of a firing timer.
+                *
+                * <p>This might be {@code null}, for example if the time 
characteristic of your program
+                * is set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+                */
+               Long timestamp();
+
+               /**
+                * A {@link TimerService} for querying time and registering 
timers.
+                */
+               TimerService timerService();
+       }
+
+       /**
+        * Information available in an invocation of {@link #onTimer(long, 
OnTimerContext, Collector)}.
+        */
+       interface OnTimerContext extends TimelyFlatMapFunction.Context {
+               /**
+                * The {@link TimeDomain} of the firing timer.
+                */
+               TimeDomain timeDomain();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
index d507ba6..bafc435 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
@@ -26,6 +26,9 @@ import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 @Internal
 public class StreamTimelyFlatMap<K, IN, OUT>
                extends AbstractUdfStreamOperator<OUT, 
TimelyFlatMapFunction<IN, OUT>>
@@ -37,6 +40,10 @@ public class StreamTimelyFlatMap<K, IN, OUT>
 
        private transient TimerService timerService;
 
+       private transient ContextImpl<IN> context;
+
+       private transient OnTimerContextImpl onTimerContext;
+
        public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
                super(flatMapper);
 
@@ -52,23 +59,93 @@ public class StreamTimelyFlatMap<K, IN, OUT>
                                getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
 
                this.timerService = new 
SimpleTimerService(internalTimerService);
+
+               context = new ContextImpl<>(timerService);
+               onTimerContext = new OnTimerContextImpl(timerService);
        }
 
        @Override
        public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
                collector.setAbsoluteTimestamp(timer.getTimestamp());
-               userFunction.onTimer(timer.getTimestamp(), 
TimeDomain.EVENT_TIME, timerService, collector);
+               onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+               onTimerContext.timer = timer;
+               userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
+               onTimerContext.timeDomain = null;
+               onTimerContext.timer = null;
        }
 
        @Override
        public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
                collector.setAbsoluteTimestamp(timer.getTimestamp());
-               userFunction.onTimer(timer.getTimestamp(), 
TimeDomain.PROCESSING_TIME, timerService, collector);
+               onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+               onTimerContext.timer = timer;
+               userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
+               onTimerContext.timeDomain = null;
+               onTimerContext.timer = null;
        }
 
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
                collector.setTimestamp(element);
-               userFunction.flatMap(element.getValue(), timerService, 
collector);
+               context.element = element;
+               userFunction.flatMap(element.getValue(), context, collector);
+               context.element = null;
+       }
+
+       private static class ContextImpl<T> implements 
TimelyFlatMapFunction.Context {
+
+               private final TimerService timerService;
+
+               private StreamRecord<T> element;
+
+               ContextImpl(TimerService timerService) {
+                       this.timerService = checkNotNull(timerService);
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(element != null);
+
+                       if (element.hasTimestamp()) {
+                               return element.getTimestamp();
+                       } else {
+                               return null;
+                       }
+               }
+
+               @Override
+               public TimerService timerService() {
+                       return timerService;
+               }
+       }
+
+       private static class OnTimerContextImpl implements 
TimelyFlatMapFunction.OnTimerContext{
+
+               private final TimerService timerService;
+
+               private TimeDomain timeDomain;
+
+               private InternalTimer<?, VoidNamespace> timer;
+
+               OnTimerContextImpl(TimerService timerService) {
+                       this.timerService = checkNotNull(timerService);
+               }
+
+               @Override
+               public TimeDomain timeDomain() {
+                       checkState(timeDomain != null);
+                       return timeDomain;
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(timer != null);
+                       return timer.getTimestamp();
+               }
+
+               @Override
+               public TimerService timerService() {
+                       return timerService;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
index 212aafd..75e4c14 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -32,6 +32,9 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 @Internal
 public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
                extends AbstractUdfStreamOperator<OUT, 
TimelyCoFlatMapFunction<IN1, IN2, OUT>>
@@ -43,6 +46,10 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
 
        private transient TimerService timerService;
 
+       private transient ContextImpl context;
+
+       private transient OnTimerContextImpl onTimerContext;
+
        public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> 
flatMapper) {
                super(flatMapper);
        }
@@ -56,34 +63,105 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
                                getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
 
                this.timerService = new 
SimpleTimerService(internalTimerService);
+
+               context = new ContextImpl(timerService);
+               onTimerContext = new OnTimerContextImpl(timerService);
        }
 
        @Override
        public void processElement1(StreamRecord<IN1> element) throws Exception 
{
                collector.setTimestamp(element);
-               userFunction.flatMap1(element.getValue(), timerService, 
collector);
-
+               context.element = element;
+               userFunction.flatMap1(element.getValue(), context, collector);
+               context.element = null;
        }
 
        @Override
        public void processElement2(StreamRecord<IN2> element) throws Exception 
{
                collector.setTimestamp(element);
-               userFunction.flatMap2(element.getValue(), timerService, 
collector);
+               context.element = element;
+               userFunction.flatMap2(element.getValue(), context, collector);
+               context.element = null;
        }
 
        @Override
        public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
                collector.setAbsoluteTimestamp(timer.getTimestamp());
-               userFunction.onTimer(timer.getTimestamp(), 
TimeDomain.EVENT_TIME, timerService, collector);
+               onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+               onTimerContext.timer = timer;
+               userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
+               onTimerContext.timeDomain = null;
+               onTimerContext.timer = null;
        }
 
        @Override
        public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
                collector.setAbsoluteTimestamp(timer.getTimestamp());
-               userFunction.onTimer(timer.getTimestamp(), 
TimeDomain.PROCESSING_TIME, timerService, collector);
+               onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+               onTimerContext.timer = timer;
+               userFunction.onTimer(timer.getTimestamp(), onTimerContext, 
collector);
+               onTimerContext.timeDomain = null;
+               onTimerContext.timer = null;
        }
 
        protected TimestampedCollector<OUT> getCollector() {
                return collector;
        }
+
+       private static class ContextImpl implements 
TimelyCoFlatMapFunction.Context {
+
+               private final TimerService timerService;
+
+               private StreamRecord<?> element;
+
+               ContextImpl(TimerService timerService) {
+                       this.timerService = checkNotNull(timerService);
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(element != null);
+
+                       if (element.hasTimestamp()) {
+                               return element.getTimestamp();
+                       } else {
+                               return null;
+                       }
+               }
+
+               @Override
+               public TimerService timerService() {
+                       return timerService;
+               }
+       }
+
+       private static class OnTimerContextImpl implements 
TimelyCoFlatMapFunction.OnTimerContext {
+
+               private final TimerService timerService;
+
+               private TimeDomain timeDomain;
+
+               private InternalTimer<?, VoidNamespace> timer;
+
+               OnTimerContextImpl(TimerService timerService) {
+                       this.timerService = checkNotNull(timerService);
+               }
+
+               @Override
+               public TimeDomain timeDomain() {
+                       checkState(timeDomain != null);
+                       return timeDomain;
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(timer != null);
+                       return timer.getTimestamp();
+               }
+
+               @Override
+               public TimerService timerService() {
+                       return timerService;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 5e43120..8f002ba 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -560,7 +560,7 @@ public class DataStreamTest {
                        @Override
                        public void flatMap(
                                        Long value,
-                                       TimerService timerService,
+                                       Context ctx,
                                        Collector<Integer> out) throws 
Exception {
 
                        }
@@ -568,8 +568,7 @@ public class DataStreamTest {
                        @Override
                        public void onTimer(
                                        long timestamp,
-                                       TimeDomain timeDomain,
-                                       TimerService timerService,
+                                       OnTimerContext ctx,
                                        Collector<Integer> out) throws 
Exception {
 
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
index 46b52ee..6080ddc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
 import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -46,7 +45,7 @@ import static org.junit.Assert.assertEquals;
 public class TimelyFlatMapTest extends TestLogger {
 
        @Test
-       public void testCurrentEventTime() throws Exception {
+       public void testTimestampAndWatermarkQuerying() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, String> operator =
                                new StreamTimelyFlatMap<>(new 
QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
@@ -66,9 +65,9 @@ public class TimelyFlatMapTest extends TestLogger {
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                expectedOutput.add(new Watermark(17L));
-               expectedOutput.add(new StreamRecord<>("5TIME:17", 12L));
+               expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
                expectedOutput.add(new Watermark(42L));
-               expectedOutput.add(new StreamRecord<>("6TIME:42", 13L));
+               expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
@@ -76,7 +75,7 @@ public class TimelyFlatMapTest extends TestLogger {
        }
 
        @Test
-       public void testCurrentProcessingTime() throws Exception {
+       public void testTimestampAndProcessingTimeQuerying() throws Exception {
 
                StreamTimelyFlatMap<Integer, Integer, String> operator =
                                new StreamTimelyFlatMap<>(new 
QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
@@ -95,8 +94,8 @@ public class TimelyFlatMapTest extends TestLogger {
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
-               expectedOutput.add(new StreamRecord<>("5TIME:17"));
-               expectedOutput.add(new StreamRecord<>("6TIME:42"));
+               expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
+               expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
@@ -298,19 +297,18 @@ public class TimelyFlatMapTest extends TestLogger {
                }
 
                @Override
-               public void flatMap(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-                               out.collect(value + "TIME:" + 
timerService.currentWatermark());
+                               out.collect(value + "TIME:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
                        } else {
-                               out.collect(value + "TIME:" + 
timerService.currentProcessingTime());
+                               out.collect(value + "TIME:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
                        }
                }
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
                }
        }
@@ -326,23 +324,22 @@ public class TimelyFlatMapTest extends TestLogger {
                }
 
                @Override
-               public void flatMap(Integer value, TimerService timerService, 
Collector<Integer> out) throws Exception {
+               public void flatMap(Integer value, Context ctx, 
Collector<Integer> out) throws Exception {
                        out.collect(value);
                        if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-                               
timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+                               
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
                        } else {
-                               
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 
5);
+                               
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
                        }
                }
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<Integer> out) throws Exception {
 
-                       assertEquals(this.timeDomain, timeDomain);
+                       assertEquals(this.timeDomain, ctx.timeDomain());
                        out.collect(1777);
                }
        }
@@ -361,23 +358,22 @@ public class TimelyFlatMapTest extends TestLogger {
                }
 
                @Override
-               public void flatMap(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT:" + value);
                        getRuntimeContext().getState(state).update(value);
                        if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-                               
timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+                               
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
                        } else {
-                               
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 
5);
+                               
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
                        }
                }
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
-                       assertEquals(this.timeDomain, timeDomain);
+                       assertEquals(this.timeDomain, ctx.timeDomain());
                        out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
                }
        }
@@ -387,19 +383,17 @@ public class TimelyFlatMapTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void flatMap(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
-                       timerService.registerProcessingTimeTimer(5);
-                       timerService.registerEventTimeTimer(6);
-
+               public void flatMap(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       ctx.timerService().registerProcessingTimeTimer(5);
+                       ctx.timerService().registerEventTimeTimer(6);
                }
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
-                       if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+                       if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
                                out.collect("EVENT:1777");
                        } else {
                                out.collect("PROC:1777");

http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
index cb5d6c2..7c29631 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -46,7 +45,7 @@ import static org.junit.Assert.assertEquals;
 public class TimelyCoFlatMapTest extends TestLogger {
 
        @Test
-       public void testCurrentEventTime() throws Exception {
+       public void testTimestampAndWatermarkQuerying() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
                                new CoStreamTimelyFlatMap<>(new 
WatermarkQueryingFlatMapFunction());
@@ -72,9 +71,9 @@ public class TimelyCoFlatMapTest extends TestLogger {
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                expectedOutput.add(new Watermark(17L));
-               expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
+               expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
                expectedOutput.add(new Watermark(42L));
-               expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
+               expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
@@ -82,7 +81,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
        }
 
        @Test
-       public void testCurrentProcessingTime() throws Exception {
+       public void testTimestampAndProcessingTimeQuerying() throws Exception {
 
                CoStreamTimelyFlatMap<String, Integer, String, String> operator 
=
                                new CoStreamTimelyFlatMap<>(new 
ProcessingTimeQueryingFlatMapFunction());
@@ -105,8 +104,8 @@ public class TimelyCoFlatMapTest extends TestLogger {
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
-               expectedOutput.add(new StreamRecord<>("5PT:17"));
-               expectedOutput.add(new StreamRecord<>("6PT:42"));
+               expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
+               expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
 
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
@@ -350,20 +349,19 @@ public class TimelyCoFlatMapTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void flatMap1(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
-                       out.collect(value + "WM:" + 
timerService.currentWatermark());
+               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect(value + "WM:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
                }
 
                @Override
-               public void flatMap2(String value, TimerService timerService, 
Collector<String> out) throws Exception {
-                       out.collect(value + "WM:" + 
timerService.currentWatermark());
+               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect(value + "WM:" + 
ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
                }
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
                }
        }
@@ -373,25 +371,24 @@ public class TimelyCoFlatMapTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void flatMap1(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT1:" + value);
-                       timerService.registerEventTimeTimer(5);
+                       ctx.timerService().registerEventTimeTimer(5);
                }
 
                @Override
-               public void flatMap2(String value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT2:" + value);
-                       timerService.registerEventTimeTimer(6);
+                       ctx.timerService().registerEventTimeTimer(6);
                }
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
 
-                       assertEquals(TimeDomain.EVENT_TIME, timeDomain);
+                       assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
                        out.collect("" + 1777);
                }
        }
@@ -404,27 +401,26 @@ public class TimelyCoFlatMapTest extends TestLogger {
                                new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE, null);
 
                @Override
-               public void flatMap1(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT1:" + value);
                        getRuntimeContext().getState(state).update("" + value);
-                       
timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+                       
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
                }
 
                @Override
-               public void flatMap2(String value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT2:" + value);
                        getRuntimeContext().getState(state).update(value);
-                       
timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+                       
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() 
+ 5);
                }
 
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
-                       assertEquals(TimeDomain.EVENT_TIME, timeDomain);
+                       assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
                        out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
                }
        }
@@ -434,25 +430,24 @@ public class TimelyCoFlatMapTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void flatMap1(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT1:" + value);
-                       timerService.registerProcessingTimeTimer(5);
+                       ctx.timerService().registerProcessingTimeTimer(5);
                }
 
                @Override
-               public void flatMap2(String value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT2:" + value);
-                       timerService.registerProcessingTimeTimer(6);
+                       ctx.timerService().registerProcessingTimeTimer(6);
                }
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
 
-                       assertEquals(TimeDomain.PROCESSING_TIME, timeDomain);
+                       assertEquals(TimeDomain.PROCESSING_TIME, 
ctx.timeDomain());
                        out.collect("" + 1777);
                }
        }
@@ -462,20 +457,19 @@ public class TimelyCoFlatMapTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void flatMap1(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
-                       out.collect(value + "PT:" + 
timerService.currentProcessingTime());
+               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect(value + "PT:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
                }
 
                @Override
-               public void flatMap2(String value, TimerService timerService, 
Collector<String> out) throws Exception {
-                       out.collect(value + "PT:" + 
timerService.currentProcessingTime());
+               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       out.collect(value + "PT:" + 
ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
                }
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
                }
        }
@@ -488,27 +482,26 @@ public class TimelyCoFlatMapTest extends TestLogger {
                                new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE, null);
 
                @Override
-               public void flatMap1(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT1:" + value);
                        getRuntimeContext().getState(state).update("" + value);
-                       
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 
5);
+                       
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
                }
 
                @Override
-               public void flatMap2(String value, TimerService timerService, 
Collector<String> out) throws Exception {
+               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("INPUT2:" + value);
                        getRuntimeContext().getState(state).update(value);
-                       
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 
5);
+                       
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
 + 5);
                }
 
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
-                       assertEquals(TimeDomain.PROCESSING_TIME, timeDomain);
+                       assertEquals(TimeDomain.PROCESSING_TIME, 
ctx.timeDomain());
                        out.collect("STATE:" + 
getRuntimeContext().getState(state).value());
                }
        }
@@ -518,23 +511,22 @@ public class TimelyCoFlatMapTest extends TestLogger {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void flatMap1(Integer value, TimerService timerService, 
Collector<String> out) throws Exception {
-                       timerService.registerEventTimeTimer(6);
+               public void flatMap1(Integer value, Context ctx, 
Collector<String> out) throws Exception {
+                       ctx.timerService().registerEventTimeTimer(6);
                }
 
                @Override
-               public void flatMap2(String value, TimerService timerService, 
Collector<String> out) throws Exception {
-                       timerService.registerProcessingTimeTimer(5);
+               public void flatMap2(String value, Context ctx, 
Collector<String> out) throws Exception {
+                       ctx.timerService().registerProcessingTimeTimer(5);
                }
 
 
                @Override
                public void onTimer(
                                long timestamp,
-                               TimeDomain timeDomain,
-                               TimerService timerService,
+                               OnTimerContext ctx,
                                Collector<String> out) throws Exception {
-                       if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+                       if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
                                out.collect("EVENT:1777");
                        } else {
                                out.collect("PROC:1777");

http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index de8b388..967142b 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -22,9 +22,9 @@ import java.lang
 
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.streaming.api.{TimeDomain, TimerService}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
+import 
org.apache.flink.streaming.api.functions.TimelyFlatMapFunction.{Context, 
OnTimerContext}
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
 import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, 
StreamOperator, StreamTimelyFlatMap}
@@ -327,11 +327,10 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
     val src = env.generateSequence(0, 0)
 
     val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
-      override def flatMap(value: Long, timerService: TimerService, out: 
Collector[Int]): Unit = ???
+      override def flatMap(value: Long, ctx: Context, out: Collector[Int]): 
Unit = ???
       override def onTimer(
           timestamp: Long,
-          timeDomain: TimeDomain,
-          timerService: TimerService,
+          ctx: OnTimerContext,
           out: Collector[Int]): Unit = ???
     }
 

Reply via email to