http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStream.scala new file mode 100644 index 0000000..8d98c46 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStream.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream} + +/** + * Wraps a data stream, allowing to use anonymous partial functions to + * perform extraction of items in a tuple, case class instance or collection + * + * @param stream The wrapped data stream + * @tparam T The type of the data stream items + */ +class OnDataStream[T](stream: DataStream[T]) { + + /** + * Applies a function `fun` to each item of the stream + * + * @param fun The function to be applied to each item + * @tparam R The type of the items in the returned stream + * @return A data stream of Rs + */ + @PublicEvolving + def mapWith[R: TypeInformation](fun: T => R): DataStream[R] = + stream.map(fun) + + /** + * Applies a function `fun` to each item of the stream, producing a collection of items + * that will be flattened in the resulting stream + * + * @param fun The function to be applied to each item + * @tparam R The type of the items in the returned stream + * @return A data stream of Rs + */ + @PublicEvolving + def flatMapWith[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = + stream.flatMap(fun) + + /** + * Applies a predicate `fun` to each item of the stream, keeping only those for which + * the predicate holds + * + * @param fun The predicate to be tested on each item + * @return A data stream of Ts + */ + @PublicEvolving + def filterWith(fun: T => Boolean): DataStream[T] = + stream.filter(fun) + + /** + * Keys the items according to a keying function `fun` + * + * @param fun The keying function + * @tparam K The type of the key, for which type information must be known + * @return A stream of Ts keyed by Ks + */ + @PublicEvolving + def keyingBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = + stream.keyBy(fun) + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala new file mode 100644 index 0000000..226eb8b --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.scala.{DataStream, JoinedStreams} +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Wraps a joined data stream, allowing to use anonymous partial functions to + * perform extraction of items in a tuple, case class instance or collection + * + * @param stream The wrapped data stream + * @tparam L The type of the data stream items from the left side of the join + * @tparam R The type of the data stream items from the right input of the join + * @tparam K The type of key + * @tparam W The type of the window + */ +class OnJoinedStream[L, R, K, W <: Window]( + stream: JoinedStreams[L, R]#Where[K]#EqualTo#WithWindow[W]) { + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. + * + * @param fun The function that defines the projection of the join + * @tparam O The return type of the projection, for which type information must be known + * @return A fully joined data set of Os + */ + @PublicEvolving + def projecting[O: TypeInformation](fun: (L, R) => O): DataStream[O] = + stream.apply(fun) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala new file mode 100644 index 0000000..218bcbf --- /dev/null +++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream} + +/** + * Wraps a keyed data stream, allowing to use anonymous partial functions to + * perform extraction of items in a tuple, case class instance or collection + * + * @param stream The wrapped data stream + * @tparam T The type of the data stream items + * @tparam K The type of key + */ +class OnKeyedStream[T, K](stream: KeyedStream[T, K]) { + + /** + * Applies a reducer `fun` to the stream + * + * @param fun The reducing function to be applied on the keyed stream + * @return A data set of Ts + */ + @PublicEvolving + def reduceWith(fun: (T, T) => T): DataStream[T] = + stream.reduce(fun) + + /** + * Folds the stream over a zero element with a reducer `fun` + * + * @param initialValue The zero element + * @param fun The reducing function to be 
applied on the keyed stream + * @return A data set of Rs + */ + @PublicEvolving + def foldWith[R: TypeInformation](initialValue: R)(fun: (R, T) => R): DataStream[R] = + stream.fold(initialValue)(fun) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala new file mode 100644 index 0000000..f7a5923 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream} +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Wraps a windowed data stream, allowing to use anonymous partial functions to + * perform extraction of items in a tuple, case class instance or collection + * + * @param stream The wrapped data stream + * @tparam T The type of the data stream items + * @tparam K The type of key + * @tparam W The type of the window + */ +class OnWindowedStream[T, K, W <: Window](stream: WindowedStream[T, K, W]) { + + /** + * Applies a reduce function to the window. The window function is called for each evaluation + * of the window for each key individually. The output of the reduce function is interpreted + * as a regular non-windowed stream. + * + * This window will try and pre-aggregate data as much as the window policies permit. + * For example, tumbling time windows can perfectly pre-aggregate the data, meaning that only one + * element per key is stored. Sliding time windows will pre-aggregate on the granularity of the + * slide interval, so a few elements are stored per key (one per slide interval). + * Custom windows may not be able to pre-aggregate, or may need to store extra values in an + * aggregation tree. + * + * @param function The reduce function. + * @return The data stream that is the result of applying the reduce function to the window. + */ + @PublicEvolving + def reduceWith(function: (T, T) => T) = + stream.reduce(function) + + /** + * Applies the given fold function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the fold function is + * interpreted as a regular non-windowed stream.
+ * + * @param function The fold function. + * @return The data stream that is the result of applying the fold function to the window. + */ + @PublicEvolving + def foldWith[R: TypeInformation](initialValue: R)(function: (R, T) => R) = + stream.fold(initialValue)(function) + + /** + * Applies the given window function to each window. The window function is called for each + * evaluation of the window for each key individually. The output of the window function is + * interpreted as a regular non-windowed stream. + * + * Arriving data is incrementally aggregated using the given fold function. + * + * @param initialValue The initial value of the fold + * @param foldFunction The fold function that is used for incremental aggregation + * @param windowFunction The window function. + * @return The data stream that is the result of applying the window function to the window. + */ + @PublicEvolving + def applyWith[R: TypeInformation]( + initialValue: R)( + foldFunction: (R, T) => R, + windowFunction: (K, W, Stream[R]) => TraversableOnce[R]) + : DataStream[R] = + stream.apply(initialValue, foldFunction, { + (key, window, items, out) => + windowFunction(key, window, items.toStream).foreach(out.collect) + }) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala new file mode 100644 index 0000000..f82bb74 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/package.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions._ +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * acceptPartialFunctions extends the original DataStream with methods with unique names + * that delegate to core higher-order functions (e.g. `map`) so that we can work around + * the fact that overloaded methods taking functions as parameters can't accept partial + * functions as well. This enables the possibility to directly apply pattern matching + * to decompose inputs such as tuples, case classes and collections. 
+ * + * The following is a small example that showcases how this extension would work on + * a Flink data stream: + * + * {{{ + * object Main { + * import org.apache.flink.streaming.api.scala.extensions._ + * case class Point(x: Double, y: Double) + * def main(args: Array[String]): Unit = { + * val env = StreamExecutionEnvironment.getExecutionEnvironment + * val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6)) + * ds.filterWith { + * case Point(x, _) => x > 1 + * }.reduceWith { + * case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + x2, y1 + y2) + * }.mapWith { + * case Point(x, y) => (x, y) + * }.flatMapWith { + * case (x, y) => Seq('x' -> x, 'y' -> y) + * }.keyingBy { + * case (id, value) => id + * } + * } + * } + * }}} + * + * The extension consists of several implicit conversions over all the data stream representations + * that could gain from this feature. To use this set of extension methods the user has to + * explicitly opt-in by importing + * `org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions`. + * + * For more information and usage examples please consult the Apache Flink official documentation.
+ * + */ +package object extensions { + + @PublicEvolving + implicit def acceptPartialFunctions[T](ds: DataStream[T]): OnDataStream[T] = + new OnDataStream[T](ds) + + @PublicEvolving + implicit def acceptPartialFunctions[T, K](ds: KeyedStream[T, K]): OnKeyedStream[T, K] = + new OnKeyedStream[T, K](ds) + + @PublicEvolving + implicit def acceptPartialFunctions[L, R, K, W <: Window]( + ds: JoinedStreams[L, R]#Where[K]#EqualTo#WithWindow[W]): OnJoinedStream[L, R, K, W] = + new OnJoinedStream[L, R, K, W](ds) + + @PublicEvolving + implicit def acceptPartialFunctions[IN1, IN2]( + ds: ConnectedStreams[IN1, IN2]): OnConnectedStream[IN1, IN2] = + new OnConnectedStream[IN1, IN2](ds) + + @PublicEvolving + implicit def acceptPartialFunctions[T, K, W <: Window]( + ds: WindowedStream[T, K, W]): OnWindowedStream[T, K, W] = + new OnWindowedStream[T, K, W](ds) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/base/AcceptPFTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/base/AcceptPFTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/base/AcceptPFTestBase.scala new file mode 100644 index 0000000..86176e6 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/base/AcceptPFTestBase.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.extensions.base + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair +import org.apache.flink.util.TestLogger +import org.scalatest.junit.JUnitSuiteLike + +/** + * Common facilities to test the `acceptPartialFunctions` extension + */ +private[extensions] abstract class AcceptPFTestBase extends TestLogger with JUnitSuiteLike { + + private val env = StreamExecutionEnvironment.getExecutionEnvironment + + protected val tuples = env.fromElements( + 1 -> "hello", + 2 -> "world", + 3 -> "foo", + 4 -> "bar", + 5 -> "baz", + 6 -> "quux") + protected val caseObjects = env.fromElements( + KeyValuePair(1, "hello"), + KeyValuePair(2, "world"), + KeyValuePair(3, "foo"), + KeyValuePair(4, "bar"), + KeyValuePair(5, "baz"), + KeyValuePair(6, "quux")) + + protected val keyedTuples = tuples.keyBy(_._1) + protected val keyedCaseObjects = caseObjects.keyBy(_.id) + + protected val windowedTuples = keyedTuples.countWindow(2) + protected val windowedCaseObjects = keyedCaseObjects.countWindow(2) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/data/KeyValuePair.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/data/KeyValuePair.scala 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/data/KeyValuePair.scala new file mode 100644 index 0000000..fad8189 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/data/KeyValuePair.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.scala.extensions.data + +/** + * Simple case class to test the `acceptPartialFunctions` extension + * + * @param id A numerical identifier + * @param value A textual value + */ +private [extensions] case class KeyValuePair(id: Int, value: String) http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedDataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedDataStreamTest.scala new file mode 100644 index 0000000..f5bb89a --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnConnectedDataStreamTest.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator +import org.apache.flink.streaming.api.scala.ConnectedStreams +import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions +import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase +import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair +import org.junit.Test + +class OnConnectedDataStreamTest extends AcceptPFTestBase { + + @Test + def testMapWithOnTuple(): Unit = { + val test = + tuples.connect(tuples).mapWith({ + case (id, value) => s"$id $value" + }, { + case (id, value) => s"$id $value" + }) + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "mapWith should produce a SingleOutputStreamOperator") + } + + @Test + def testMapWithOnCaseClass(): Unit = { + val test = + caseObjects.connect(caseObjects).mapWith({ + case KeyValuePair(id, value) => s"$id $value" + }, { + case KeyValuePair(id, value) => s"$id $value" + }) + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "mapWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFlatMapWithOnTuple(): Unit = { + val test = + tuples.connect(tuples).flatMapWith({ + case (id, value) => List(id.toString, value) + }, { + case (id, value) => List(id.toString, value) + }) + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "flatMapWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFlatMapWithOnCaseClass(): Unit = { + val test = + caseObjects.connect(caseObjects).flatMapWith({ + case KeyValuePair(id, value) => List(id.toString, value) + }, { + case KeyValuePair(id, value) => List(id.toString, value) + }) + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "flatMapWith should produce a SingleOutputStreamOperator") + } + + @Test + def 
testKeyingByOnTuple(): Unit = { + val test = + tuples.connect(tuples).keyingBy({ + case (id, _) => id + }, { + case (id, _) => id + }) + assert(test.isInstanceOf[ConnectedStreams[_, _]], + "keyingBy should produce a ConnectedStreams") + } + + @Test + def testKeyingByOnCaseClass(): Unit = { + val test = + caseObjects.connect(caseObjects).keyingBy({ + case KeyValuePair(id, _) => id + }, { + case KeyValuePair(id, _) => id + }) + assert(test.isInstanceOf[ConnectedStreams[_, _]], + "keyingBy should produce a ConnectedStreams") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStreamTest.scala new file mode 100644 index 0000000..fb6d865 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnDataStreamTest.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.datastream.{KeyedStream, SingleOutputStreamOperator} +import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions +import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase +import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair +import org.junit.Test + +class OnDataStreamTest extends AcceptPFTestBase { + + @Test + def testMapWithOnTuple(): Unit = { + val test = + tuples.mapWith { + case (id, value) => s"$id $value" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "mapWith should produce a SingleOutputStreamOperator") + } + + @Test + def testMapWithOnCaseClass(): Unit = { + val test = + caseObjects.mapWith { + case KeyValuePair(id, value) => s"$id $value" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "mapWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFlatMapWithOnTuple(): Unit = { + val test = + tuples.flatMapWith { + case (id, value) => List(id.toString, value) + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "flatMapWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFlatMapWithOnCaseClass(): Unit = { + val test = + caseObjects.flatMapWith { + case KeyValuePair(id, value) => List(id.toString, value) + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "flatMapWith should 
produce a SingleOutputStreamOperator") + } + + @Test + def testFilterWithOnTuple(): Unit = { + val test = + tuples.filterWith { + case (id, value) => id == 1 + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "filterWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFilterWithOnCaseClass(): Unit = { + val test = + caseObjects.filterWith { + case KeyValuePair(id, value) => id == 1 + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "filterWith should produce a SingleOutputStreamOperator") + } + + @Test + def testKeyingByOnTuple(): Unit = { + val test = + tuples.keyingBy { + case (id, _) => id + } + assert(test.javaStream.isInstanceOf[KeyedStream[_, _]], + "keyingBy should produce a KeyedStream") + } + + @Test + def testKeyingByOnCaseClass(): Unit = { + val test = + caseObjects.keyingBy { + case KeyValuePair(id, _) => id + } + assert(test.javaStream.isInstanceOf[KeyedStream[_, _]], + "keyingBy should produce a KeyedStream") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala new file mode 100644 index 0000000..34c55d7 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnJoinedDataStreamTest.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import java.util.concurrent.TimeUnit + +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions +import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase +import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows +import org.apache.flink.streaming.api.windowing.time.Time +import org.junit.Test + +class OnJoinedDataStreamTest extends AcceptPFTestBase { + + @Test + def testProjectingOnTuple(): Unit = { + val test = + tuples.join(tuples). + where { + case (id, _) => id + }.equalTo { + case (id, _) => id + }.window { + TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)) + }.projecting { + case ((_, v1), (_, v2)) => s"$v1 $v2" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "projecting should produce a SingleOutputStreamOperator") + } + + @Test + def testProjectingOnCaseClass(): Unit = { + val test = + caseObjects.join(caseObjects). 
+ where { + case KeyValuePair(id, _) => id + }.equalTo { + case KeyValuePair(id, _) => id + }.window { + TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)) + }.projecting { + case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => s"$v1 $v2" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "projecting should produce a SingleOutputStreamOperator") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala new file mode 100644 index 0000000..f6f153a --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnKeyedDataStreamTest.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator +import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions +import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase +import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair +import org.junit.Test + +class OnKeyedDataStreamTest extends AcceptPFTestBase { + + @Test + def testReduceWithOnTuple(): Unit = { + val test = + keyedTuples.reduceWith { + case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "reduceWith should produce a SingleOutputStreamOperator") + } + + @Test + def testReduceWithOnCaseClass(): Unit = { + val test = + keyedCaseObjects.reduceWith { + case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2") + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "reduceWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFoldWithOnTuple(): Unit = { + val test = + keyedTuples.foldWith("") { + case (folding, (_, value)) => s"$folding $value" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "foldWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFoldWithOnCaseClass(): Unit = { + val test = + keyedCaseObjects.foldWith("") { + case (folding, KeyValuePair(_, value)) => s"$folding $value" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "foldWith should produce a SingleOutputStreamOperator") + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5cb84f18/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala ---------------------------------------------------------------------- diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala new file mode 100644 index 0000000..4fa9f5a --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedDataStreamTest.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.scala.extensions.impl.acceptPartialFunctions + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator +import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions +import org.apache.flink.streaming.api.scala.extensions.base.AcceptPFTestBase +import org.apache.flink.streaming.api.scala.extensions.data.KeyValuePair +import org.junit.Test + +class OnWindowedDataStreamTest extends AcceptPFTestBase { + + @Test + def testReduceWithOnTuple(): Unit = { + val test = + windowedTuples.reduceWith { + case ((_, v1), (_, v2)) => 0 -> s"$v1 $v2" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "reduceWith should produce a SingleOutputStreamOperator") + } + + @Test + def testReduceWithOnCaseClass(): Unit = { + val test = + windowedCaseObjects.reduceWith { + case (KeyValuePair(_, v1), KeyValuePair(_, v2)) => KeyValuePair(0, s"$v1 $v2") + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "reduceWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFoldWithOnTuple(): Unit = { + val test = + windowedTuples.foldWith("") { + case (folding, (_, value)) => s"$folding $value" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "foldWith should produce a SingleOutputStreamOperator") + } + + @Test + def testFoldWithOnCaseClass(): Unit = { + val test = + windowedCaseObjects.foldWith("") { + case (folding, KeyValuePair(_, value)) => s"$folding $value" + } + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "foldWith should produce a SingleOutputStreamOperator") + } + + @Test + def testApplyWithOnTuple(): Unit = { + val test = + windowedTuples.applyWith("")( + foldFunction = { + case (folding, (_, value)) => s"$folding $value" + }, + windowFunction = { + case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck) + }) + 
assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "applyWith should produce a SingleOutputStreamOperator") + } + + @Test + def testApplyWithOnCaseClass(): Unit = { + val test = + windowedCaseObjects.applyWith("")( + foldFunction = { + case (folding, KeyValuePair(_, value)) => s"$folding $value" + }, + windowFunction = { + case (n, w, head #:: neck #:: _) => Seq(n.toString, w.maxTimestamp().toString, head, neck) + }) + assert(test.javaStream.isInstanceOf[SingleOutputStreamOperator[_]], + "applyWith should produce a SingleOutputStreamOperator") + } + +}
