[FLINK-4955] Add Translation Tests for KeyedStream.flatMap(TimelyFlatMapFunction)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06fb9f1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06fb9f1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06fb9f1b

Branch: refs/heads/master
Commit: 06fb9f1b4f97ade67a23cd3adc8212e7d848de48
Parents: f0ef370
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Oct 28 13:58:29 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/DataStreamTest.java     | 50 ++++++++++++++++++++
 .../streaming/api/scala/DataStreamTest.scala    | 28 ++++++++++-
 2 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 17bea68..5e43120 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -544,6 +546,45 @@ public class DataStreamTest {
                assertEquals(TypeExtractor.getForClass(CustomPOJO.class), 
flatten.getType());
        }
 
+       /**
+        * Verify that a timely flat map call is correctly translated to an operator.
+        */
+       @Test
+       public void testTimelyFlatMapTranslation() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               DataStreamSource<Long> src = env.generateSequence(0, 0);
+
+               TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = 
new TimelyFlatMapFunction<Long, Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void flatMap(
+                                       Long value,
+                                       TimerService timerService,
+                                       Collector<Integer> out) throws 
Exception {
+
+                       }
+
+                       @Override
+                       public void onTimer(
+                                       long timestamp,
+                                       TimeDomain timeDomain,
+                                       TimerService timerService,
+                                       Collector<Integer> out) throws 
Exception {
+
+                       }
+               };
+
+               DataStream<Integer> flatMapped = src
+                               .keyBy(new IdentityKeySelector<Long>())
+                               .flatMap(timelyFlatMapFunction);
+
+               flatMapped.addSink(new DiscardingSink<Integer>());
+
+               assertEquals(timelyFlatMapFunction, 
getFunctionForDataStream(flatMapped));
+               assertTrue(getOperatorForDataStream(flatMapped) instanceof 
StreamTimelyFlatMap);
+       }
+
        @Test
        public void operatorTest() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -809,6 +850,15 @@ public class DataStreamTest {
                }
        }
 
+       private static class IdentityKeySelector<T> implements KeySelector<T, 
T> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public T getKey(T value) throws Exception {
+                       return value;
+               }
+       }
+
        public static class CustomPOJO {
                private String s;
                private int i;

http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index b73eae8..de8b388 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -22,10 +22,12 @@ import java.lang
 
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.{TimeDomain, TimerService}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, 
StreamOperator}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, 
StreamOperator, StreamTimelyFlatMap}
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, 
PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -315,6 +317,30 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
     // TODO check for custom case class
   }
 
+  /**
+   * Verify that a timely flat map call is correctly translated to an operator.
+   */
+  @Test
+  def testTimelyFlatMapTranslation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val src = env.generateSequence(0, 0)
+
+    val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
+      override def flatMap(value: Long, timerService: TimerService, out: 
Collector[Int]): Unit = ???
+      override def onTimer(
+          timestamp: Long,
+          timeDomain: TimeDomain,
+          timerService: TimerService,
+          out: Collector[Int]): Unit = ???
+    }
+
+    val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction)
+
+    assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped))
+    
assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, 
_, _]])
+  }
+
   @Test def operatorTest() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 

Reply via email to