Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1704#discussion_r53947144
--- Diff:
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/package.scala
---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.extensions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.{JoinedStreams,
CoGroupedStreams, KeyedStream, DataStream}
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+import scala.reflect.ClassTag
+
+/**
+ * acceptPartialFunctions extends the original DataStream with methods
with unique names
+ * that delegate to core higher-order functions (e.g. `map`) so that we
can work around
+ * the fact that overloaded methods taking functions as parameters can't
accept partial
+ * functions as well. This enables the possibility to directly apply
pattern matching
+ * to decompose inputs such as tuples, case classes and collections.
+ *
+ * e.g.
+ * {{{
+ * object Main {
+ * import org.apache.flink.api.scala.extensions._
+ * case class Point(x: Double, y: Double)
+ * def main(args: Array[String]): Unit = {
+ * val env = StreamExecutionEnvironment.getExecutionEnvironment
+ * val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+ * ds.filterWith {
+ * case Point(x, _) => x > 1
+ * }.reduceWith {
+ * case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 +
y2)
+ * }.mapWith {
+ * case Point(x, y) => (x, y)
+ * }.flatMapWith {
+ * case (x, y) => Seq('x' -> x, 'y' -> y)
+ * }.groupingBy {
+ * case (id, value) => id
+ * }
+ * }
+ * }
+ * }}}
+ *
+ */
+package object acceptPartialFunctions {
+
+ implicit class OnDataStream[T: TypeInformation](stream: DataStream[T]) {
+
+ /**
+ * Applies a function `fun` to each item of the stream
+ *
+ * @param fun The function to be applied to each item
+ * @tparam R The type of the items in the returned stream
+ * @return A dataset of R
+ */
+ def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] =
+ stream.map(fun)
+
+ /**
+ * Applies a function `fun` to each item of the stream, producing a
collection of items
+ * that will be flattened in the resulting stream
+ *
+ * @param fun The function to be applied to each item
+ * @tparam R The type of the items in the returned stream
+ * @return A dataset of R
+ */
+ def flatMapWith[R: TypeInformation: ClassTag](fun: T =>
TraversableOnce[R]): DataStream[R] =
+ stream.flatMap(fun)
+
+ /**
+ * Applies a predicate `fun` to each item of the stream, keeping only
those for which
+ * the predicate holds
+ *
+ * @param fun The predicate to be tested on each item
+ * @return A dataset of R
+ */
+ def filterWith(fun: T => Boolean): DataStream[T] =
+ stream.filter(fun)
+
+ /**
+ * Keys the items according to a keying function `fun`
+ *
+ * @param fun The keying function
+ * @tparam K The type of the key, for which type information must be
known
+ * @return A stream of Ts keyed by Ks
+ */
+ def keyingBy[K: TypeInformation: ClassTag](fun: T => K):
KeyedStream[T, K] =
+ stream.keyBy(fun)
+
+ }
+
+ implicit class OnKeyedStream[T: TypeInformation, K](stream:
KeyedStream[T, K]) {
--- End diff --
What is with the `fold` operation?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---