[FLINK-4952] [scala] Add KeyedStream.flatMap(TimelyFlatMapFunction)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0ef3703 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0ef3703 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0ef3703 Branch: refs/heads/master Commit: f0ef370399638689c2e1adc54a3acf0afab67a17 Parents: b9173b3 Author: Aljoscha Krettek <[email protected]> Authored: Fri Oct 28 13:47:06 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Nov 7 16:25:57 2016 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/KeyedStream.java | 31 ++++++++++++++++-- .../flink/streaming/api/scala/KeyedStream.scala | 33 +++++++++++++++++++- 2 files changed, 61 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 922ad20..c938f5b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -202,11 +202,38 @@ public class KeyedStream<T, KEY> extends DataStream<T> { Utils.getCallLocationName(), true); + return flatMap(flatMapper, outType); + } + + /** + * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby + * creating a transformed output stream. + * + * <p>The function will be called for every element in the stream and can produce + * zero or more output. The function can also query the time and set timers. 
When + * reacting to the firing of set timers the function can emit yet more elements. + * + * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction} + * can be used to gain access to features provided by the + * {@link org.apache.flink.api.common.functions.RichFunction} interface. + * + * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element + * in the stream. + * @param outputType {@link TypeInformation} for the result type of the function. + * + * @param <R> The type of elements emitted by the {@code TimelyFlatMapFunction}. + * + * @return The transformed {@link DataStream}. + */ + @Internal + public <R> SingleOutputStreamOperator<R> flatMap( + TimelyFlatMapFunction<T, R> flatMapper, + TypeInformation<R> outputType) { + StreamTimelyFlatMap<KEY, T, R> operator = new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper)); - return transform("Flat Map", outType, operator); - + return transform("Flat Map", outputType, operator); } http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 68eebea..1971359 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer 
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, QueryableStateStream, WindowedStream => WindowedJavaStream} +import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream} +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator} @@ -46,6 +47,36 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] */ @Internal def getKeyType = javaStream.getKeyType() + + + // ------------------------------------------------------------------------ + // basic transformations + // ------------------------------------------------------------------------ + + /** + * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby + * creating a transformed output stream. + * + * The function will be called for every element in the stream and can produce + * zero or more output. The function can also query the time and set timers. When + * reacting to the firing of set timers the function can emit yet more elements. + * + * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]] + * can be used to gain access to features provided by the + * [[org.apache.flink.api.common.functions.RichFunction]] interface. + * + * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element + * in the stream. 
+ */ + def flatMap[R: TypeInformation]( + flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = { + + if (flatMapper == null) { + throw new NullPointerException("TimelyFlatMapFunction must not be null.") + } + + asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]])) + } // ------------------------------------------------------------------------ // Windowing
