Repository: flink
Updated Branches:
  refs/heads/master 20884c07d -> adddcd839


[FLINK-3527] Add Scala DataStream.transform()

This implicitly adds KeyedStream.transform() and also explicitly
ConnectedStreams.transform().

This also removes the transform exclusions from the API completeness
tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adddcd83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adddcd83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adddcd83

Branch: refs/heads/master
Commit: adddcd8394b81bf1965480b1085b9bfa3696ac1b
Parents: 20884c0
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Feb 26 20:56:40 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Sat Feb 27 00:38:45 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/scala/ConnectedStreams.scala | 13 +++++++++++--
 .../flink/streaming/api/scala/DataStream.scala       | 15 +++++++++++++++
 .../scala/StreamingScalaAPICompletenessTest.scala    |  3 ---
 3 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/adddcd83/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index a80937c..669f12e 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Internal, Public}
+import org.apache.flink.annotation.{PublicEvolving, Internal, Public}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => 
JavaCStream, DataStream => JavaStream}
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => 
JavaCStream, DataStream => JavaStream, SingleOutputStreamOperator, KeyedStream}
 import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, 
CoMapFunction}
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation
 import org.apache.flink.util.Collector
 
 /**
@@ -269,6 +271,13 @@ class ConnectedStreams[IN1, IN2](javaStream: 
JavaCStream[IN1, IN2]) {
   private[flink] def clean[F <: AnyRef](f: F): F = {
     new 
StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
   }
+
+  @PublicEvolving
+  def transform[R: TypeInformation](
+      functionName: String,
+      operator: TwoInputStreamOperator[IN1, IN2, R]): DataStream[R] = {
+    asScalaStream(javaStream.transform(functionName, 
implicitly[TypeInformation[R]], operator))
+  }
 }
 
 @Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/adddcd83/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 04a8a5f..9c0675f 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -32,6 +32,8 @@ import 
org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => 
JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => 
JavaKeyedStream, _}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import 
org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, 
AssignerWithPeriodicWatermarks, AscendingTimestampExtractor, TimestampExtractor}
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, 
TimeWindow, Window}
@@ -953,4 +955,17 @@ class DataStream[T](stream: JavaStream[T]) {
     new 
StreamExecutionEnvironment(stream.getExecutionEnvironment).scalaClean(f)
   }
 
+  /**
+    * Transforms the [[DataStream]] by using a custom 
[[OneInputStreamOperator]].
+    *
+    * @param operatorName name of the operator, for logging purposes
+    * @param operator the object containing the transformation logic
+    * @tparam R the type of elements emitted by the operator
+    */
+  @PublicEvolving
+  def transform[R: TypeInformation](
+      operatorName: String,
+      operator: OneInputStreamOperator[T, R]): DataStream[R] = {
+    asScalaStream(stream.transform(operatorName, 
implicitly[TypeInformation[R]], operator))
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/adddcd83/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 7ba3194..415f057 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -39,7 +39,6 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
       // private[flink].
       "org.apache.flink.streaming.api.datastream.DataStream.getType",
       "org.apache.flink.streaming.api.datastream.DataStream.copy",
-      "org.apache.flink.streaming.api.datastream.DataStream.transform",
       "org.apache.flink.streaming.api.datastream.DataStream.getTransformation",
       
"org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
       
"org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
@@ -49,7 +48,6 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1",
       "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
       
"org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform",
 
       "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
       
"org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
@@ -59,7 +57,6 @@ class StreamingScalaAPICompletenessTest extends 
ScalaAPICompletenessTestBase {
       
"org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment",
       
"org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType",
 
-      "org.apache.flink.streaming.api.datastream.KeyedStream.transform",
       "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",
 
       
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled",

Reply via email to