[
https://issues.apache.org/jira/browse/FLINK-1159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15163114#comment-15163114
]
ASF GitHub Bot commented on FLINK-1159:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1704#discussion_r53948074
--- Diff:
flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala
---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.extensions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+
+import scala.reflect.ClassTag
+
+/**
+ * acceptPartialFunctions extends the original DataSet with methods with
unique names
+ * that delegate to core higher-order functions (e.g. `map`) so that we
can work around
+ * the fact that overloaded methods taking functions as parameters can't
accept partial
+ * functions as well. This enables the possibility to directly apply
pattern matching
+ * to decompose inputs such as tuples, case classes and collections.
+ *
+ * e.g.
+ * {{{
+ * object Main {
+ * import org.apache.flink.api.scala.extensions._
+ * case class Point(x: Double, y: Double)
+ * def main(args: Array[String]): Unit = {
+ * val env = ExecutionEnvironment.getExecutionEnvironment
+ * val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
+ * ds.filterWith {
+ * case Point(x, _) => x > 1
+ * }.reduceWith {
+ case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
+ * }.mapWith {
+ * case Point(x, y) => (x, y)
+ * }.flatMapWith {
+ * case (x, y) => Seq('x' -> x, 'y' -> y)
+ * }.groupingBy {
+ * case (id, value) => id
+ * }
+ * }
+ * }
+ * }}}
+ *
+ */
+package object acceptPartialFunctions {
+
+ implicit class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
+
+ /**
+ * Applies a function `fun` to each item of the data set
+ *
+ * @param fun The function to be applied to each item
+ * @tparam R The type of the items in the returned data set
+ * @return A dataset of R
+ */
+ def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
+ ds.map(fun)
+
+ /**
+ * Applies a function `fun` to a partition as a whole
+ *
+ * @param fun The function to be applied on the whole partition
+ * @tparam R The type of the items in the returned data set
+ * @return A dataset of R
+ */
+ def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R):
DataSet[R] =
+ ds.mapPartition {
+ (it, out) =>
+ out.collect(fun(it.to[Seq]))
+ }
+
+ /**
+ * Applies a function `fun` to each item of the dataset, producing a
collection of items
+ * that will be flattened in the resulting data set
+ *
+ * @param fun The function to be applied to each item
+ * @tparam R The type of the items in the returned data set
+ * @return A dataset of R
+ */
+ def flatMapWith[R: TypeInformation: ClassTag](fun: T =>
TraversableOnce[R]): DataSet[R] =
+ ds.flatMap(fun)
+
+ /**
+ * Applies a predicate `fun` to each item of the data set, keeping
only those for which
+ * the predicate holds
+ *
+ * @param fun The predicate to be tested on each item
+ * @return A dataset of R
+ */
+ def filterWith(fun: T => Boolean): DataSet[T] =
+ ds.filter(fun)
+
+ /**
+ * Applies a reducer `fun` to the data set
+ *
+ * @param fun The reducing function to be applied on the whole data
set
+ * @tparam R The type of the items in the returned collection
+ * @return A data set of Rs
+ */
+ def reduceWith[R: TypeInformation: ClassTag](fun: (T, T) => T):
DataSet[T] =
+ ds.reduce(fun)
+
+ /**
+ * Applies a reducer `fun` to a grouped data set
+ *
+ * @param fun The function to be applied to the whole grouping
+ * @tparam R The type of the items in the returned data set
+ * @return A dataset of Rs
+ */
+ def reduceGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R):
DataSet[R] =
+ ds.reduceGroup {
+ (it, out) =>
+ out.collect(fun(it.to[Seq]))
+ }
+
+ /**
+ * Groups the items according to a grouping function `fun`
+ *
+ * @param fun The grouping function
+ * @tparam K The return type of the grouping function, for which type
information must be known
+ * @return A grouped data set of Ts
+ */
+ def groupingBy[K: TypeInformation: ClassTag](fun: T => K):
GroupedDataSet[T] =
+ ds.groupBy(fun)
+
+ }
+
+ implicit class OnJoinDataSet[L: TypeInformation, R: TypeInformation](
+ dataset: JoinDataSet[L, R]) {
+
+ /**
+ * Joins the data sets using the function `fun` to project elements
from both in the
+ * resulting data set
+ *
+ * @param fun The function that defines the projection of the join
+ * @tparam O The return type of the projection, for which type
information must be known
+ * @return A fully joined data set of Os
+ */
+ def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O):
DataSet[O] =
+ dataset(fun)
+
+ }
+
+ implicit class OnCoGroupDataSet[L: TypeInformation, R: TypeInformation](
+ dataset: CoGroupDataSet[L, R]) {
+
+ /**
+ * Co-groups the data sets using the function `fun` to project
elements from both in
+ * the resulting data set
+ *
+ * @param fun The function that defines the projection of the
co-group operation
+ * @tparam O The return type of the projection, for which type
information must be known
+ * @return A fully co-grouped data set of Os
+ */
+ def projecting[O: TypeInformation: ClassTag](fun: (Seq[L], Seq[R]) =>
O): DataSet[O] =
+ dataset {
+ (left, right) =>
+ fun(left.to[Seq], right.to[Seq])
+ }
+
+ }
--- End diff --
An extension for `GroupedDataSet` is missing from this set of implicit classes.
> Case style anonymous functions not supported by Scala API
> ---------------------------------------------------------
>
> Key: FLINK-1159
> URL: https://issues.apache.org/jira/browse/FLINK-1159
> Project: Flink
> Issue Type: Bug
> Components: Scala API
> Reporter: Till Rohrmann
> Assignee: Stefano Baghino
>
> In Scala it is very common to define anonymous functions of the following form
> {code}
> {
> case foo: Bar => foobar(foo)
> case _ => throw new RuntimeException()
> }
> {code}
> These case style anonymous functions are not supported yet by the Scala API.
> Thus, one has to write redundant code to name the function parameter.
> What works is the following pattern, but it is not intuitive for someone
> coming from Scala:
> {code}
> dataset.map{
> _ match{
> case foo:Bar => ...
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)