Repository: flink Updated Branches: refs/heads/master 5d5637b01 -> 99188390f
[FLINK-5012] Expose Timestamp in Timely FlatMap Functions This also adds a Context parameter that holds the timestamp, time domain and TimerService to declutter the parameter list of the functions. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99188390 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99188390 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99188390 Branch: refs/heads/master Commit: 99188390f9fcd63f249e25f0e847ed1bb29c0d5c Parents: 5d5637b Author: Aljoscha Krettek <[email protected]> Authored: Tue Nov 8 16:52:21 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Nov 11 10:17:59 2016 +0100 ---------------------------------------------------------------------- .../api/functions/TimelyFlatMapFunction.java | 46 +++++++-- .../functions/co/TimelyCoFlatMapFunction.java | 56 +++++++++-- .../api/operators/StreamTimelyFlatMap.java | 83 ++++++++++++++- .../api/operators/co/CoStreamTimelyFlatMap.java | 88 +++++++++++++++- .../flink/streaming/api/DataStreamTest.java | 5 +- .../api/operators/TimelyFlatMapTest.java | 56 +++++------ .../api/operators/co/TimelyCoFlatMapTest.java | 100 +++++++++---------- .../streaming/api/scala/DataStreamTest.scala | 7 +- 8 files changed, 324 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java index 77fe35e..5f039c4 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java @@ -52,27 +52,59 @@ public interface TimelyFlatMapFunction<I, O> extends Function, Serializable { * it into zero, one, or more elements. * * @param value The input value. - * @param timerService A {@link TimerService} that allows setting timers and querying the - * current time. + * @param ctx A {@link Context} that allows querying the timestamp of the element and getting + * a {@link TimerService} for registering timers and querying the time. The + * context is only valid during the invocation of this method, do not store it. * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ - void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception; + void flatMap(I value, Context ctx, Collector<O> out) throws Exception; /** * Called when a timer set using {@link TimerService} fires. * * @param timestamp The timestamp of the firing timer. - * @param timeDomain The {@link TimeDomain} of the firing timer. - * @param timerService A {@link TimerService} that allows setting timers and querying the - * current time. + * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer, + * querying the {@link TimeDomain} of the firing timer and getting a + * {@link TimerService} for registering timers and querying the time. + * The context is only valid during the invocation of this method, do not store it. * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. 
*/ - void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ; + void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ; + + /** + * Information available in an invocation of {@link #flatMap(Object, Context, Collector)} + * or {@link #onTimer(long, OnTimerContext, Collector)}. + */ + interface Context { + + /** + * Timestamp of the element currently being processed or timestamp of a firing timer. + * + * <p>This might be {@code null}, for example if the time characteristic of your program + * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. + */ + Long timestamp(); + + /** + * A {@link TimerService} for querying time and registering timers. + */ + TimerService timerService(); + } + + /** + * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}. + */ + interface OnTimerContext extends Context { + /** + * The {@link TimeDomain} of the firing timer. 
+ */ + TimeDomain timeDomain(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java index 87355c6..89c7d79 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; import org.apache.flink.util.Collector; import java.io.Serializable; @@ -55,38 +56,73 @@ public interface TimelyCoFlatMapFunction<IN1, IN2, OUT> extends Function, Serial * This method is called for each element in the first of the connected streams. * * @param value The stream element - * @param timerService A {@link TimerService} that allows setting timers and querying the - * current time. + * @param ctx A {@link Context} that allows querying the timestamp of the element + * and getting a {@link TimerService} for registering timers and + * querying the time. + * The context is only valid during the invocation of this method, do not store it. * @param out The collector to emit resulting elements to * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. 
*/ - void flatMap1(IN1 value, TimerService timerService, Collector<OUT> out) throws Exception; + void flatMap1(IN1 value, Context ctx, Collector<OUT> out) throws Exception; /** * This method is called for each element in the second of the connected streams. * * @param value The stream element - * @param timerService A {@link TimerService} that allows setting timers and querying the - * current time. + * @param ctx A {@link Context} that allows querying the timestamp of the element + * and getting a {@link TimerService} for registering timers and + * querying the time. + * The context is only valid during the invocation of this method, do not store it. * @param out The collector to emit resulting elements to * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ - void flatMap2(IN2 value, TimerService timerService, Collector<OUT> out) throws Exception; + void flatMap2(IN2 value, Context ctx, Collector<OUT> out) throws Exception; /** * Called when a timer set using {@link TimerService} fires. * * @param timestamp The timestamp of the firing timer. - * @param timeDomain The {@link TimeDomain} of the firing timer. - * @param timerService A {@link TimerService} that allows setting timers and querying the - * current time. + * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer, + * querying the {@link TimeDomain} of the firing timer and getting a + * {@link TimerService} for registering timers and querying the time. + * The context is only valid during the invocation of this method, do not store it. * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. 
*/ - void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<OUT> out) throws Exception ; + void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ; + /** + * Information available in an invocation of {@link #flatMap1(Object, Context, Collector)}/ + * {@link #flatMap2(Object, Context, Collector)} + * or {@link #onTimer(long, OnTimerContext, Collector)}. + */ + interface Context { + + /** + * Timestamp of the element currently being processed or timestamp of a firing timer. + * + * <p>This might be {@code null}, for example if the time characteristic of your program + * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. + */ + Long timestamp(); + + /** + * A {@link TimerService} for querying time and registering timers. + */ + TimerService timerService(); + } + + /** + * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}. + */ + interface OnTimerContext extends TimelyFlatMapFunction.Context { + /** + * The {@link TimeDomain} of the firing timer. 
+ */ + TimeDomain timeDomain(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java index d507ba6..bafc435 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java @@ -26,6 +26,9 @@ import org.apache.flink.streaming.api.TimerService; import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + @Internal public class StreamTimelyFlatMap<K, IN, OUT> extends AbstractUdfStreamOperator<OUT, TimelyFlatMapFunction<IN, OUT>> @@ -37,6 +40,10 @@ public class StreamTimelyFlatMap<K, IN, OUT> private transient TimerService timerService; + private transient ContextImpl<IN> context; + + private transient OnTimerContextImpl onTimerContext; + public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) { super(flatMapper); @@ -52,23 +59,93 @@ public class StreamTimelyFlatMap<K, IN, OUT> getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); this.timerService = new SimpleTimerService(internalTimerService); + + context = new ContextImpl<>(timerService); + onTimerContext = new OnTimerContextImpl(timerService); } @Override public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception { collector.setAbsoluteTimestamp(timer.getTimestamp()); - 
userFunction.onTimer(timer.getTimestamp(), TimeDomain.EVENT_TIME, timerService, collector); + onTimerContext.timeDomain = TimeDomain.EVENT_TIME; + onTimerContext.timer = timer; + userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); + onTimerContext.timeDomain = null; + onTimerContext.timer = null; } @Override public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception { collector.setAbsoluteTimestamp(timer.getTimestamp()); - userFunction.onTimer(timer.getTimestamp(), TimeDomain.PROCESSING_TIME, timerService, collector); + onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME; + onTimerContext.timer = timer; + userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); + onTimerContext.timeDomain = null; + onTimerContext.timer = null; } @Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); - userFunction.flatMap(element.getValue(), timerService, collector); + context.element = element; + userFunction.flatMap(element.getValue(), context, collector); + context.element = null; + } + + private static class ContextImpl<T> implements TimelyFlatMapFunction.Context { + + private final TimerService timerService; + + private StreamRecord<T> element; + + ContextImpl(TimerService timerService) { + this.timerService = checkNotNull(timerService); + } + + @Override + public Long timestamp() { + checkState(element != null); + + if (element.hasTimestamp()) { + return element.getTimestamp(); + } else { + return null; + } + } + + @Override + public TimerService timerService() { + return timerService; + } + } + + private static class OnTimerContextImpl implements TimelyFlatMapFunction.OnTimerContext{ + + private final TimerService timerService; + + private TimeDomain timeDomain; + + private InternalTimer<?, VoidNamespace> timer; + + OnTimerContextImpl(TimerService timerService) { + this.timerService = checkNotNull(timerService); + } + + @Override + public TimeDomain 
timeDomain() { + checkState(timeDomain != null); + return timeDomain; + } + + @Override + public Long timestamp() { + checkState(timer != null); + return timer.getTimestamp(); + } + + @Override + public TimerService timerService() { + return timerService; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java index 212aafd..75e4c14 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java @@ -32,6 +32,9 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + @Internal public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT> extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>> @@ -43,6 +46,10 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT> private transient TimerService timerService; + private transient ContextImpl context; + + private transient OnTimerContextImpl onTimerContext; + public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) { super(flatMapper); } @@ -56,34 +63,105 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT> getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); this.timerService = new 
SimpleTimerService(internalTimerService); + + context = new ContextImpl(timerService); + onTimerContext = new OnTimerContextImpl(timerService); } @Override public void processElement1(StreamRecord<IN1> element) throws Exception { collector.setTimestamp(element); - userFunction.flatMap1(element.getValue(), timerService, collector); - + context.element = element; + userFunction.flatMap1(element.getValue(), context, collector); + context.element = null; } @Override public void processElement2(StreamRecord<IN2> element) throws Exception { collector.setTimestamp(element); - userFunction.flatMap2(element.getValue(), timerService, collector); + context.element = element; + userFunction.flatMap2(element.getValue(), context, collector); + context.element = null; } @Override public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception { collector.setAbsoluteTimestamp(timer.getTimestamp()); - userFunction.onTimer(timer.getTimestamp(), TimeDomain.EVENT_TIME, timerService, collector); + onTimerContext.timeDomain = TimeDomain.EVENT_TIME; + onTimerContext.timer = timer; + userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); + onTimerContext.timeDomain = null; + onTimerContext.timer = null; } @Override public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception { collector.setAbsoluteTimestamp(timer.getTimestamp()); - userFunction.onTimer(timer.getTimestamp(), TimeDomain.PROCESSING_TIME, timerService, collector); + onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME; + onTimerContext.timer = timer; + userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); + onTimerContext.timeDomain = null; + onTimerContext.timer = null; } protected TimestampedCollector<OUT> getCollector() { return collector; } + + private static class ContextImpl implements TimelyCoFlatMapFunction.Context { + + private final TimerService timerService; + + private StreamRecord<?> element; + + ContextImpl(TimerService timerService) { + 
this.timerService = checkNotNull(timerService); + } + + @Override + public Long timestamp() { + checkState(element != null); + + if (element.hasTimestamp()) { + return element.getTimestamp(); + } else { + return null; + } + } + + @Override + public TimerService timerService() { + return timerService; + } + } + + private static class OnTimerContextImpl implements TimelyCoFlatMapFunction.OnTimerContext { + + private final TimerService timerService; + + private TimeDomain timeDomain; + + private InternalTimer<?, VoidNamespace> timer; + + OnTimerContextImpl(TimerService timerService) { + this.timerService = checkNotNull(timerService); + } + + @Override + public TimeDomain timeDomain() { + checkState(timeDomain != null); + return timeDomain; + } + + @Override + public Long timestamp() { + checkState(timer != null); + return timer.getTimestamp(); + } + + @Override + public TimerService timerService() { + return timerService; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 5e43120..8f002ba 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -560,7 +560,7 @@ public class DataStreamTest { @Override public void flatMap( Long value, - TimerService timerService, + Context ctx, Collector<Integer> out) throws Exception { } @@ -568,8 +568,7 @@ public class DataStreamTest { @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<Integer> out) throws Exception { } 
http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java index 46b52ee..6080ddc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.TimeDomain; -import org.apache.flink.streaming.api.TimerService; import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; import org.apache.flink.streaming.api.watermark.Watermark; @@ -46,7 +45,7 @@ import static org.junit.Assert.assertEquals; public class TimelyFlatMapTest extends TestLogger { @Test - public void testCurrentEventTime() throws Exception { + public void testTimestampAndWatermarkQuerying() throws Exception { StreamTimelyFlatMap<Integer, Integer, String> operator = new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)); @@ -66,9 +65,9 @@ public class TimelyFlatMapTest extends TestLogger { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); expectedOutput.add(new Watermark(17L)); - expectedOutput.add(new StreamRecord<>("5TIME:17", 12L)); + expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L)); expectedOutput.add(new Watermark(42L)); - expectedOutput.add(new StreamRecord<>("6TIME:42", 13L)); + 
expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -76,7 +75,7 @@ public class TimelyFlatMapTest extends TestLogger { } @Test - public void testCurrentProcessingTime() throws Exception { + public void testTimestampAndProcessingTimeQuerying() throws Exception { StreamTimelyFlatMap<Integer, Integer, String> operator = new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)); @@ -95,8 +94,8 @@ public class TimelyFlatMapTest extends TestLogger { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - expectedOutput.add(new StreamRecord<>("5TIME:17")); - expectedOutput.add(new StreamRecord<>("6TIME:42")); + expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null")); + expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null")); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -298,19 +297,18 @@ public class TimelyFlatMapTest extends TestLogger { } @Override - public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception { if (timeDomain.equals(TimeDomain.EVENT_TIME)) { - out.collect(value + "TIME:" + timerService.currentWatermark()); + out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); } else { - out.collect(value + "TIME:" + timerService.currentProcessingTime()); + out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); } } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { } } @@ -326,23 +324,22 @@ public class TimelyFlatMapTest extends TestLogger { } @Override - public void flatMap(Integer value, TimerService 
timerService, Collector<Integer> out) throws Exception { + public void flatMap(Integer value, Context ctx, Collector<Integer> out) throws Exception { out.collect(value); if (timeDomain.equals(TimeDomain.EVENT_TIME)) { - timerService.registerEventTimeTimer(timerService.currentWatermark() + 5); + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); } else { - timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5); + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); } } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<Integer> out) throws Exception { - assertEquals(this.timeDomain, timeDomain); + assertEquals(this.timeDomain, ctx.timeDomain()); out.collect(1777); } } @@ -361,23 +358,22 @@ public class TimelyFlatMapTest extends TestLogger { } @Override - public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception { out.collect("INPUT:" + value); getRuntimeContext().getState(state).update(value); if (timeDomain.equals(TimeDomain.EVENT_TIME)) { - timerService.registerEventTimeTimer(timerService.currentWatermark() + 5); + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); } else { - timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5); + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); } } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { - assertEquals(this.timeDomain, timeDomain); + assertEquals(this.timeDomain, ctx.timeDomain()); out.collect("STATE:" + getRuntimeContext().getState(state).value()); } } @@ -387,19 +383,17 @@ 
public class TimelyFlatMapTest extends TestLogger { private static final long serialVersionUID = 1L; @Override - public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception { - timerService.registerProcessingTimeTimer(5); - timerService.registerEventTimeTimer(6); - + public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception { + ctx.timerService().registerProcessingTimeTimer(5); + ctx.timerService().registerEventTimeTimer(6); } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { - if (TimeDomain.EVENT_TIME.equals(timeDomain)) { + if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { out.collect("EVENT:1777"); } else { out.collect("PROC:1777"); http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java index cb5d6c2..7c29631 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.TimeDomain; -import org.apache.flink.streaming.api.TimerService; import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction; import 
org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction; import org.apache.flink.streaming.api.watermark.Watermark; @@ -46,7 +45,7 @@ import static org.junit.Assert.assertEquals; public class TimelyCoFlatMapTest extends TestLogger { @Test - public void testCurrentEventTime() throws Exception { + public void testTimestampAndWatermarkQuerying() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction()); @@ -72,9 +71,9 @@ public class TimelyCoFlatMapTest extends TestLogger { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); expectedOutput.add(new Watermark(17L)); - expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L)); expectedOutput.add(new Watermark(42L)); - expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -82,7 +81,7 @@ public class TimelyCoFlatMapTest extends TestLogger { } @Test - public void testCurrentProcessingTime() throws Exception { + public void testTimestampAndProcessingTimeQuerying() throws Exception { CoStreamTimelyFlatMap<String, Integer, String, String> operator = new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction()); @@ -105,8 +104,8 @@ public class TimelyCoFlatMapTest extends TestLogger { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - expectedOutput.add(new StreamRecord<>("5PT:17")); - expectedOutput.add(new StreamRecord<>("6PT:42")); + expectedOutput.add(new StreamRecord<>("5PT:17 TS:null")); + expectedOutput.add(new StreamRecord<>("6PT:42 TS:null")); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ -350,20 +349,19 @@ public class TimelyCoFlatMapTest extends TestLogger { 
private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { - out.collect(value + "WM:" + timerService.currentWatermark()); + public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); } @Override - public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { - out.collect(value + "WM:" + timerService.currentWatermark()); + public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { + out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { } } @@ -373,25 +371,24 @@ public class TimelyCoFlatMapTest extends TestLogger { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { out.collect("INPUT1:" + value); - timerService.registerEventTimeTimer(5); + ctx.timerService().registerEventTimeTimer(5); } @Override - public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { out.collect("INPUT2:" + value); - timerService.registerEventTimeTimer(6); + ctx.timerService().registerEventTimeTimer(6); } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { - assertEquals(TimeDomain.EVENT_TIME, timeDomain); + 
assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); out.collect("" + 1777); } } @@ -404,27 +401,26 @@ public class TimelyCoFlatMapTest extends TestLogger { new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); @Override - public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { out.collect("INPUT1:" + value); getRuntimeContext().getState(state).update("" + value); - timerService.registerEventTimeTimer(timerService.currentWatermark() + 5); + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); } @Override - public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { out.collect("INPUT2:" + value); getRuntimeContext().getState(state).update(value); - timerService.registerEventTimeTimer(timerService.currentWatermark() + 5); + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { - assertEquals(TimeDomain.EVENT_TIME, timeDomain); + assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); out.collect("STATE:" + getRuntimeContext().getState(state).value()); } } @@ -434,25 +430,24 @@ public class TimelyCoFlatMapTest extends TestLogger { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { out.collect("INPUT1:" + value); - timerService.registerProcessingTimeTimer(5); + ctx.timerService().registerProcessingTimeTimer(5); } @Override - public void 
flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { out.collect("INPUT2:" + value); - timerService.registerProcessingTimeTimer(6); + ctx.timerService().registerProcessingTimeTimer(6); } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { - assertEquals(TimeDomain.PROCESSING_TIME, timeDomain); + assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); out.collect("" + 1777); } } @@ -462,20 +457,19 @@ public class TimelyCoFlatMapTest extends TestLogger { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { - out.collect(value + "PT:" + timerService.currentProcessingTime()); + public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); } @Override - public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { - out.collect(value + "PT:" + timerService.currentProcessingTime()); + public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { + out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { } } @@ -488,27 +482,26 @@ public class TimelyCoFlatMapTest extends TestLogger { new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); @Override - public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap1(Integer value, 
Context ctx, Collector<String> out) throws Exception { out.collect("INPUT1:" + value); getRuntimeContext().getState(state).update("" + value); - timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5); + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); } @Override - public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { out.collect("INPUT2:" + value); getRuntimeContext().getState(state).update(value); - timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5); + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, + OnTimerContext ctx, Collector<String> out) throws Exception { - assertEquals(TimeDomain.PROCESSING_TIME, timeDomain); + assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); out.collect("STATE:" + getRuntimeContext().getState(state).value()); } } @@ -518,23 +511,22 @@ public class TimelyCoFlatMapTest extends TestLogger { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { - timerService.registerEventTimeTimer(6); + public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { + ctx.timerService().registerEventTimeTimer(6); } @Override - public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { - timerService.registerProcessingTimeTimer(5); + public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { + ctx.timerService().registerProcessingTimeTimer(5); } @Override public void onTimer( long timestamp, - TimeDomain timeDomain, - TimerService timerService, 
+ OnTimerContext ctx, Collector<String> out) throws Exception { - if (TimeDomain.EVENT_TIME.equals(timeDomain)) { + if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { out.collect("EVENT:1777"); } else { out.collect("PROC:1777"); http://git-wip-us.apache.org/repos/asf/flink/blob/99188390/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index de8b388..967142b 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -22,9 +22,9 @@ import java.lang import org.apache.flink.api.common.functions._ import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.streaming.api.{TimeDomain, TimerService} import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction.{Context, OnTimerContext} import org.apache.flink.streaming.api.functions.co.CoMapFunction import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph} import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap} @@ -327,11 +327,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { val src = env.generateSequence(0, 0) val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] { - override def flatMap(value: Long, timerService: TimerService, out: Collector[Int]): Unit = ??? + override def flatMap(value: Long, ctx: Context, out: Collector[Int]): Unit = ??? 
override def onTimer( timestamp: Long, - timeDomain: TimeDomain, - timerService: TimerService, + ctx: OnTimerContext, out: Collector[Int]): Unit = ??? }
