Repository: flink Updated Branches: refs/heads/master 20884c07d -> adddcd839
[FLINK-3527] Add Scala DataStream.transform() This implicitly adds KeyedStream.transform() and also explicitly ConnectedStreams.transform(). This also removes the transform exclusions from the API completeness tests. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adddcd83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adddcd83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adddcd83 Branch: refs/heads/master Commit: adddcd8394b81bf1965480b1085b9bfa3696ac1b Parents: 20884c0 Author: Aljoscha Krettek <[email protected]> Authored: Fri Feb 26 20:56:40 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Sat Feb 27 00:38:45 2016 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/scala/ConnectedStreams.scala | 13 +++++++++++-- .../flink/streaming/api/scala/DataStream.scala | 15 +++++++++++++++ .../scala/StreamingScalaAPICompletenessTest.scala | 3 --- 3 files changed, 26 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/adddcd83/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala index a80937c..669f12e 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala @@ -18,12 +18,14 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{Internal, Public} +import org.apache.flink.annotation.{PublicEvolving, Internal, Public} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream} +import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, SingleOutputStreamOperator, KeyedStream} import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction} +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator +import org.apache.flink.streaming.api.transformations.TwoInputTransformation import org.apache.flink.util.Collector /** @@ -269,6 +271,13 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { private[flink] def clean[F <: AnyRef](f: F): F = { new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f) } + + @PublicEvolving + def transform[R: TypeInformation]( + functionName: String, + operator: TwoInputStreamOperator[IN1, IN2, R]): DataStream[R] = { + asScalaStream(javaStream.transform(functionName, implicitly[TypeInformation[R]], operator)) + } } @Internal http://git-wip-us.apache.org/repos/asf/flink/blob/adddcd83/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 04a8a5f..9c0675f 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _} import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, AssignerWithPeriodicWatermarks, AscendingTimestampExtractor, TimestampExtractor} +import org.apache.flink.streaming.api.operators.OneInputStreamOperator +import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} @@ -953,4 +955,17 @@ class DataStream[T](stream: JavaStream[T]) { new StreamExecutionEnvironment(stream.getExecutionEnvironment).scalaClean(f) } + /** + * Transforms the [[DataStream]] by using a custom [[OneInputStreamOperator]]. + * + * @param operatorName name of the operator, for logging purposes + * @param operator the object containing the transformation logic + * @tparam R the type of elements emitted by the operator + */ + @PublicEvolving + def transform[R: TypeInformation]( + operatorName: String, + operator: OneInputStreamOperator[T, R]): DataStream[R] = { + asScalaStream(stream.transform(operatorName, implicitly[TypeInformation[R]], operator)) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/adddcd83/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index 7ba3194..415f057 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -39,7 +39,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { // private[flink]. "org.apache.flink.streaming.api.datastream.DataStream.getType", "org.apache.flink.streaming.api.datastream.DataStream.copy", - "org.apache.flink.streaming.api.datastream.DataStream.transform", "org.apache.flink.streaming.api.datastream.DataStream.getTransformation", "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy", "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment", @@ -49,7 +48,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1", "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2", "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine", - "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform", "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType", "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig", @@ -59,7 +57,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment", "org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType", - "org.apache.flink.streaming.api.datastream.KeyedStream.transform", "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector", "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled",
