[FLINK-4952] [scala] Add KeyedStream.flatMap(TimelyFlatMapFunction)

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0ef3703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0ef3703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0ef3703

Branch: refs/heads/master
Commit: f0ef370399638689c2e1adc54a3acf0afab67a17
Parents: b9173b3
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Oct 28 13:47:06 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java   | 31 ++++++++++++++++--
 .../flink/streaming/api/scala/KeyedStream.scala | 33 +++++++++++++++++++-
 2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 922ad20..c938f5b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -202,11 +202,38 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
                                Utils.getCallLocationName(),
                                true);
 
+               return flatMap(flatMapper, outType);
+       }
+
+       /**
+        * Applies the given {@link TimelyFlatMapFunction} on the input stream, 
thereby
+        * creating a transformed output stream.
+        *
+        * <p>The function will be called for every element in the stream and 
can produce
+        * zero or more output elements. The function can also query the time and set 
timers. When
+        * reacting to the firing of set timers the function can emit yet more 
elements.
+        *
+        * <p>A {@link 
org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+        * can be used to gain access to features provided by the
+        * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+        *
+        * @param flatMapper The {@link TimelyFlatMapFunction} that is called 
for each element
+        *                      in the stream.
+        * @param outputType {@link TypeInformation} for the result type of the 
function.
+        *
+        * @param <R> The type of elements emitted by the {@code 
TimelyFlatMapFunction}.
+        *
+        * @return The transformed {@link DataStream}.
+        */
+       @Internal
+       public <R> SingleOutputStreamOperator<R> flatMap(
+                       TimelyFlatMapFunction<T, R> flatMapper,
+                       TypeInformation<R> outputType) {
+
                StreamTimelyFlatMap<KEY, T, R> operator =
                                new 
StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), 
clean(flatMapper));
 
-               return transform("Flat Map", outType, operator);
-
+               return transform("Flat Map", outputType, operator);
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 68eebea..1971359 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.state.{FoldingStateDescriptor, 
ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, 
KeyedStream => KeyedJavaStream, QueryableStateStream, WindowedStream => 
WindowedJavaStream}
+import org.apache.flink.streaming.api.datastream.{QueryableStateStream, 
SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => 
KeyedJavaStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
 import 
org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator,
 QueryableValueStateOperator}
@@ -46,6 +47,36 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
    */
   @Internal
   def getKeyType = javaStream.getKeyType()
+
+
+  // ------------------------------------------------------------------------
+  //  basic transformations
+  // ------------------------------------------------------------------------
+
+  /**
+    * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby
+    * creating a transformed output stream.
+    *
+    * The function will be called for every element in the stream and can 
produce
+    * zero or more output elements. The function can also query the time and set 
timers. When
+    * reacting to the firing of set timers the function can emit yet more 
elements.
+    *
+    * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]]
+    * can be used to gain access to features provided by the
+    * [[org.apache.flink.api.common.functions.RichFunction]] interface.
+    *
+    * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each 
element
+    *                   in the stream.
+    */
+  def flatMap[R: TypeInformation](
+      flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = {
+
+    if (flatMapper == null) {
+      throw new NullPointerException("TimelyFlatMapFunction must not be null.")
+    }
+
+    asScalaStream(javaStream.flatMap(flatMapper, 
implicitly[TypeInformation[R]]))
+  }
   
   // ------------------------------------------------------------------------
   //  Windowing

Reply via email to