[FLINK-4955] Add Translation Tests for KeyedStream.flatMap(TimelyFlatMapFunction)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06fb9f1b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06fb9f1b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06fb9f1b Branch: refs/heads/master Commit: 06fb9f1b4f97ade67a23cd3adc8212e7d848de48 Parents: f0ef370 Author: Aljoscha Krettek <[email protected]> Authored: Fri Oct 28 13:58:29 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Nov 7 16:25:57 2016 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/DataStreamTest.java | 50 ++++++++++++++++++++ .../streaming/api/scala/DataStreamTest.scala | 28 ++++++++++- 2 files changed, 77 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 17bea68..5e43120 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import 
org.apache.flink.streaming.api.functions.sink.DiscardingSink; @@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; @@ -544,6 +546,45 @@ public class DataStreamTest { assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType()); } + /** + * Verify that a timely flat map call is correctly translated to an operator. + */ + @Test + public void testTimelyFlatMapTranslation() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStreamSource<Long> src = env.generateSequence(0, 0); + + TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap( + Long value, + TimerService timerService, + Collector<Integer> out) throws Exception { + + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<Integer> out) throws Exception { + + } + }; + + DataStream<Integer> flatMapped = src + .keyBy(new IdentityKeySelector<Long>()) + .flatMap(timelyFlatMapFunction); + + flatMapped.addSink(new DiscardingSink<Integer>()); + + assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped)); + assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap); + } + @Test public void operatorTest() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -809,6 +850,15 @@ public class 
DataStreamTest { } } + private static class IdentityKeySelector<T> implements KeySelector<T, T> { + private static final long serialVersionUID = 1L; + + @Override + public T getKey(T value) throws Exception { + return value; + } + } + public static class CustomPOJO { private String s; private int i; http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index b73eae8..de8b388 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -22,10 +22,12 @@ import java.lang import org.apache.flink.api.common.functions._ import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.streaming.api.{TimeDomain, TimerService} import org.apache.flink.streaming.api.collector.selector.OutputSelector +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction import org.apache.flink.streaming.api.functions.co.CoMapFunction import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph} -import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator} +import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap} import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger} import org.apache.flink.streaming.api.windowing.windows.GlobalWindow @@ -315,6 +317,30 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { // TODO check for custom case 
class } + /** + * Verify that a timely flat map call is correctly translated to an operator. + */ + @Test + def testTimelyFlatMapTranslation(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val src = env.generateSequence(0, 0) + + val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] { + override def flatMap(value: Long, timerService: TimerService, out: Collector[Int]): Unit = ??? + override def onTimer( + timestamp: Long, + timeDomain: TimeDomain, + timerService: TimerService, + out: Collector[Int]): Unit = ??? + } + + val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction) + + assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped)) + assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]]) + } + @Test def operatorTest() { val env = StreamExecutionEnvironment.getExecutionEnvironment
