Repository: flink Updated Branches: refs/heads/master 718f6e4e3 -> 891950eab
[FLINK-4957] Provide API for TimelyCoFlatMapFunction Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891950ea Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891950ea Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891950ea Branch: refs/heads/master Commit: 891950eabaaed1fdfc1c0c88806f1125b085c4b6 Parents: 0b873ac Author: Aljoscha Krettek <[email protected]> Authored: Fri Oct 28 14:36:06 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Nov 7 16:25:57 2016 +0100 ---------------------------------------------------------------------- .../api/datastream/ConnectedStreams.java | 63 ++++++++++++++++++++ .../streaming/api/scala/ConnectedStreams.scala | 30 +++++++++- 2 files changed, 92 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java index 50ef95b..dc763cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -26,9 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap; import org.apache.flink.streaming.api.operators.co.CoStreamMap; +import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import static java.util.Objects.requireNonNull; @@ -230,6 +233,66 @@ public class ConnectedStreams<IN1, IN2> { return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper))); } + /** + * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams, + * thereby creating a transformed output stream. + * + * <p>The function will be called for every element in the streams and can produce + * zero or more output elements. The function can also query the time and set timers. When + * reacting to the firing of set timers, the function can emit further elements. + * + * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction} + * can be used to gain access to features provided by the + * {@link org.apache.flink.api.common.functions.RichFunction} interface. + * + * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element + * in the connected streams. + * + * @param <R> The type of elements emitted by the {@code TimelyCoFlatMapFunction}. + * + * @return The transformed {@link DataStream}.
+ */ + public <R> SingleOutputStreamOperator<R> flatMap( + TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) { + + TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper, + TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(), + Utils.getCallLocationName(), true); + + return flatMap(coFlatMapper, outTypeInfo); + } + + /** + * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams, + * thereby creating a transformed output stream whose element type is given by {@code outputType}. + * + * <p>The function will be called for every element in the streams and can produce + * zero or more output elements. The function can also query the time and set timers. When + * reacting to the firing of set timers, the function can emit further elements. + * + * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction} + * can be used to gain access to features provided by the + * {@link org.apache.flink.api.common.functions.RichFunction} interface. + * + * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element + * in the connected streams. + * + * @param <R> The type of elements emitted by the {@code TimelyCoFlatMapFunction}. + * + * @return The transformed {@link DataStream}.
+ */ + @Internal + public <R> SingleOutputStreamOperator<R> flatMap( + TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper, + TypeInformation<R> outputType) { + + CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new CoStreamTimelyFlatMap<>( + inputStream1.clean(coFlatMapper)); + + return transform("Co-Flat Map", outputType, operator); + } + + @PublicEvolving public <R> SingleOutputStreamOperator<R> transform(String functionName, TypeInformation<R> outTypeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala index 141625e..50526b5 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream} -import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction} +import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, TimelyCoFlatMapFunction} import org.apache.flink.streaming.api.operators.TwoInputStreamOperator import org.apache.flink.util.Collector @@ -101,6 +101,34 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { } /** + * Applies the given [[TimelyCoFlatMapFunction]] on the connected input streams, + * thereby creating a transformed output stream. 
+ * + * The function will be called for every element in the streams and can produce + * zero or more output elements. The function can also query the time and set timers. When + * reacting to the firing of set timers, the function can emit further elements. + * + * A [[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]] + * can be used to gain access to features provided by the + * [[org.apache.flink.api.common.functions.RichFunction]] interface. + * + * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for each element + * in the connected streams. + * + * @return The transformed [[DataStream]]. + */ + def flatMap[R: TypeInformation]( + coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = { + + if (coFlatMapper == null) throw new NullPointerException("FlatMap function must not be null.") + + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + + asScalaStream(javaStream.flatMap(coFlatMapper, outType)) + } + + + /** * Applies a CoFlatMap transformation on these connected streams. * * The transformation calls [[CoFlatMapFunction#flatMap1]] for each element
