[ https://issues.apache.org/jira/browse/FLINK-1159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15163115#comment-15163115 ]

ASF GitHub Bot commented on FLINK-1159:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r53948137
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/extensions/acceptPartialFunctions/package.scala ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.scala.extensions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * acceptPartialFunctions extends the original DataSet with methods with unique names
    +  * that delegate to core higher-order functions (e.g. `map`) so that we can work around
    +  * the fact that overloaded methods taking functions as parameters can't accept partial
    +  * functions as well. This makes it possible to apply pattern matching directly to
    +  * decompose inputs such as tuples, case classes and collections.
    +  *
    +  * e.g.
    +  * {{{
    +  *   object Main {
    +  *     import org.apache.flink.api.scala.extensions._
    +  *     case class Point(x: Double, y: Double)
    +  *     def main(args: Array[String]): Unit = {
    +  *       val env = ExecutionEnvironment.getExecutionEnvironment
    +  *       val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
    +  *       ds.filterWith {
    +  *         case Point(x, _) => x > 1
    +  *       }.reduceWith {
    +  *         case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
    +  *       }.mapWith {
    +  *         case Point(x, y) => (x, y)
    +  *       }.flatMapWith {
    +  *         case (x, y) => Seq('x' -> x, 'y' -> y)
    +  *       }.groupingBy {
    +  *         case (id, value) => id
    +  *       }
    +  *     }
    +  *   }
    +  * }}}
    +  *
    +  */
    +package object acceptPartialFunctions {
    +
    +  implicit class OnDataSet[T: TypeInformation](ds: DataSet[T]) {
    +
    +    /**
    +      * Applies a function `fun` to each item of the data set
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned data set
    +      * @return A data set of Rs
    +      */
    +    def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R] =
    +      ds.map(fun)
    +
    +    /**
    +      * Applies a function `fun` to a partition as a whole
    +      *
    +      * @param fun The function to be applied on the whole partition
    +      * @tparam R The type of the items in the returned data set
    +      * @return A data set of Rs
    +      */
    +    def mapPartitionWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +      ds.mapPartition {
    +        (it, out) =>
    +          out.collect(fun(it.to[Seq]))
    +      }
    +
    +    /**
    +      * Applies a function `fun` to each item of the data set, producing a collection of items
    +      * that will be flattened in the resulting data set
    +      *
    +      * @param fun The function to be applied to each item
    +      * @tparam R The type of the items in the returned data set
    +      * @return A data set of Rs
    +      */
    +    def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R] =
    +      ds.flatMap(fun)
    +
    +    /**
    +      * Applies a predicate `fun` to each item of the data set, keeping only those for which
    +      * the predicate holds
    +      *
    +      * @param fun The predicate to be tested on each item
    +      * @return A data set of Ts
    +      */
    +    def filterWith(fun: T => Boolean): DataSet[T] =
    +      ds.filter(fun)
    +
    +    /**
    +      * Applies a reducer `fun` to the data set
    +      *
    +      * @param fun The reducing function to be applied on the whole data set
    +      * @return A data set of Ts
    +      */
    +    def reduceWith(fun: (T, T) => T): DataSet[T] =
    +      ds.reduce(fun)
    +
    +    /**
    +      * Applies a reducer `fun` to a grouped data set
    +      *
    +      * @param fun The function to be applied to the whole grouping
    +      * @tparam R The type of the items in the returned data set
    +      * @return A data set of Rs
    +      */
    +    def reduceGroupWith[R: TypeInformation: ClassTag](fun: Seq[T] => R): DataSet[R] =
    +      ds.reduceGroup {
    +        (it, out) =>
    +          out.collect(fun(it.to[Seq]))
    +      }
    +
    +    /**
    +      * Groups the items according to a grouping function `fun`
    +      *
    +      * @param fun The grouping function
    +      * @tparam K The return type of the grouping function, for which type information must be known
    +      * @return A grouped data set of Ts
    +      */
    +    def groupingBy[K: TypeInformation: ClassTag](fun: T => K): GroupedDataSet[T] =
    +      ds.groupBy(fun)
    +
    +  }
    +
    +  implicit class OnJoinDataSet[L: TypeInformation, R: TypeInformation](
    +      dataset: JoinDataSet[L, R]) {
    +
    +    /**
    +      * Joins the data sets using the function `fun` to project elements from both in the
    +      * resulting data set
    +      *
    +      * @param fun The function that defines the projection of the join
    +      * @tparam O The return type of the projection, for which type information must be known
    +      * @return A fully joined data set of Os
    +      */
    +    def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] =
    +      dataset(fun)
    +
    +  }
    +
    +  implicit class OnCoGroupDataSet[L: TypeInformation, R: TypeInformation](
    +      dataset: CoGroupDataSet[L, R]) {
    +
    +    /**
    +      * Co-groups the data sets using the function `fun` to project elements from both in
    +      * the resulting data set
    +      *
    +      * @param fun The function that defines the projection of the co-group operation
    +      * @tparam O The return type of the projection, for which type information must be known
    +      * @return A fully co-grouped data set of Os
    +      */
    +    def projecting[O: TypeInformation: ClassTag](fun: (Seq[L], Seq[R]) => O): DataSet[O] =
    +      dataset {
    +        (left, right) =>
    +          fun(left.to[Seq], right.to[Seq])
    +      }
    +
    +  }
    --- End diff ---
    
    `CrossDataSet` is missing
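    
    For reference, a hypothetical sketch of the missing piece, mirroring the `OnJoinDataSet` pattern above (not part of the diff under review; it assumes `CrossDataSet` exposes the same `apply(fun)` projection as `JoinDataSet`):
    
        implicit class OnCrossDataSet[L: TypeInformation, R: TypeInformation](
            dataset: CrossDataSet[L, R]) {
        
          /**
            * Crosses the data sets using the function `fun` to project elements from both in
            * the resulting data set
            *
            * @param fun The function that defines the projection of the cross operation
            * @tparam O The return type of the projection, for which type information must be known
            * @return A fully crossed data set of Os
            */
          def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O] =
            dataset(fun)
        
        }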


> Case style anonymous functions not supported by Scala API
> ---------------------------------------------------------
>
>                 Key: FLINK-1159
>                 URL: https://issues.apache.org/jira/browse/FLINK-1159
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala API
>            Reporter: Till Rohrmann
>            Assignee: Stefano Baghino
>
> In Scala it is very common to define anonymous functions of the following form
> {code}
> {
>   case foo: Bar => foobar(foo)
>   case _ => throw new RuntimeException()
> }
> {code}
> These case-style anonymous functions are not yet supported by the Scala API.
> Thus, one has to write redundant code to name the function parameter.
> What works is the following pattern, but it is not intuitive for someone
> coming from Scala:
> {code}
> dataset.map {
>   _ match {
>     case foo: Bar => ...
>   }
> }
> {code}
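> With the extension methods from the pull request above, the partial function can be
> passed directly under a distinct name (a sketch using mapWith from the diff; Bar and
> foobar are placeholders from the example above):
> {code}
> import org.apache.flink.api.scala.extensions._
> dataset.mapWith {
>   case foo: Bar => foobar(foo)
> }
> {code}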



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
