Repository: flink
Updated Branches:
  refs/heads/master 718f6e4e3 -> 891950eab


[FLINK-4957] Provide API for TimelyCoFlatMapFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891950ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891950ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891950ea

Branch: refs/heads/master
Commit: 891950eabaaed1fdfc1c0c88806f1125b085c4b6
Parents: 0b873ac
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Oct 28 14:36:06 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedStreams.java        | 63 ++++++++++++++++++++
 .../streaming/api/scala/ConnectedStreams.scala  | 30 +++++++++-
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 50ef95b..dc763cb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -26,9 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
 import static java.util.Objects.requireNonNull;
@@ -230,6 +233,66 @@ public class ConnectedStreams<IN1, IN2> {
                return transform("Co-Flat Map", outTypeInfo, new 
CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
        }
 
+       /**
+        * Applies the given {@link TimelyCoFlatMapFunction} on the connected 
input streams,
+        * thereby creating a transformed output stream.
+        *
+        * <p>The function will be called for every element in the streams and 
can produce
+        * zero or more output. The function can also query the time and set 
timers. When
+        * reacting to the firing of set timers the function can emit yet more 
elements.
+        *
+        * <p>A {@link 
org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+        * can be used to gain access to features provided by the
+        * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+        *
+        * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is 
called for each element
+        *                      in the stream.
+        *
+        * @param <R> The of elements emitted by the {@code 
TimelyCoFlatMapFunction}.
+        *
+        * @return The transformed {@link DataStream}.
+        */
+       public <R> SingleOutputStreamOperator<R> flatMap(
+                       TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
+
+               TypeInformation<R> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+                               TimelyCoFlatMapFunction.class, false, true, 
getType1(), getType2(),
+                               Utils.getCallLocationName(), true);
+
+               return flatMap(coFlatMapper, outTypeInfo);
+       }
+
+       /**
+        * Applies the given {@link TimelyCoFlatMapFunction} on the connected 
input streams,
+        * thereby creating a transformed output stream.
+        *
+        * <p>The function will be called for every element in the streams and 
can produce
+        * zero or more output. The function can also query the time and set 
timers. When
+        * reacting to the firing of set timers the function can emit yet more 
elements.
+        *
+        * <p>A {@link 
org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+        * can be used to gain access to features provided by the
+        * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+        *
+        * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is 
called for each element
+        *                      in the stream.
+        *
+        * @param <R> The of elements emitted by the {@code 
TimelyCoFlatMapFunction}.
+        *
+        * @return The transformed {@link DataStream}.
+        */
+       @Internal
+       public <R> SingleOutputStreamOperator<R> flatMap(
+                       TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper,
+                       TypeInformation<R> outputType) {
+
+               CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new 
CoStreamTimelyFlatMap<>(
+                               inputStream1.clean(coFlatMapper));
+
+               return transform("Co-Flat Map", outputType, operator);
+       }
+
+
        @PublicEvolving
        public <R> SingleOutputStreamOperator<R> transform(String functionName,
                        TypeInformation<R> outTypeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 141625e..50526b5 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{ConnectedStreams => 
JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, 
CoMapFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, 
CoMapFunction, TimelyCoFlatMapFunction}
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.util.Collector
 
@@ -101,6 +101,34 @@ class ConnectedStreams[IN1, IN2](javaStream: 
JavaCStream[IN1, IN2]) {
   }
 
   /**
+   * Applies the given [[TimelyCoFlatMapFunction]] on the connected input 
streams,
+   * thereby creating a transformed output stream.
+   *
+   * The function will be called for every element in the streams and can 
produce
+   * zero or more output. The function can also query the time and set timers. 
When
+   * reacting to the firing of set timers the function can emit yet more 
elements.
+   *
+   * A 
[[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]]
+   * can be used to gain access to features provided by the
+   * [[org.apache.flink.api.common.functions.RichFunction]] interface.
+   *
+   * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for 
each element
+    *                     in the stream.
+    *
+   * @return The transformed { @link DataStream}.
+   */
+  def flatMap[R: TypeInformation](
+      coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = {
+
+    if (coFlatMapper == null) throw new NullPointerException("FlatMap function 
must not be null.")
+
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.flatMap(coFlatMapper, outType))
+  }
+
+
+  /**
    * Applies a CoFlatMap transformation on these connected streams.
    *
    * The transformation calls [[CoFlatMapFunction#flatMap1]] for each element

Reply via email to