[ 
https://issues.apache.org/jira/browse/SPARK-10476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Simeon Simeonov updated SPARK-10476:
------------------------------------
    Summary: Add common RDD operations on standard Scala collections  (was: Add common RDD API methods to standard Scala collections)

> Add common RDD operations on standard Scala collections
> -------------------------------------------------------
>
>                 Key: SPARK-10476
>                 URL: https://issues.apache.org/jira/browse/SPARK-10476
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Simeon Simeonov
>            Priority: Minor
>              Labels: core, mapPartitions, rdd
>
> A common pattern in Spark development is to look for opportunities to 
> leverage data locality using mechanisms such as {{mapPartitions}}. Often this 
> happens when an existing set of RDD transformations is refactored to improve 
> performance. At that point, significant code refactoring may be required 
> because the input is {{Iterator\[T]}} as opposed to an RDD. The most common 
> examples we've encountered so far involve the {{*ByKey}} methods, {{sample}}, 
> and {{takeSample}}. We have also observed cases where, due to changes in the 
> structure of the data, use of {{mapPartitions}} is no longer possible and the 
> code has to be converted back to the RDD API.
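>
> For illustration, the friction looks like this (assume {{sc}} is a 
> {{SparkContext}} as in the shell; the names are hypothetical):
> {code}
> // The RDD API provides reduceByKey directly:
> val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
> val counts = pairs.reduceByKey(_ + _)
>
> // Inside mapPartitions the input is an Iterator[(String, Int)], which has
> // no reduceByKey, so the same logic must be rewritten by hand:
> val countsPerPartition = pairs.mapPartitions { it =>
>   val acc = scala.collection.mutable.Map.empty[String, Int]
>   it.foreach { case (k, v) => acc(k) = acc.getOrElse(k, 0) + v }
>   acc.iterator
> }
> {code}
>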
> If data manipulation through the RDD API could be applied to the standard 
> Scala data structures, then refactoring Spark data pipelines would become 
> faster and less error-prone. Also, and this is no small benefit, the 
> thoughtfulness and experience of the Spark community could spread to the 
> broader Scala community.
>
> There are multiple approaches to solving this problem, including but not 
> limited to creating a set of {{Local*RDD}} classes and/or adding implicit 
> conversions.
>
> Here is a simple example, meant to be short rather than complete or 
> performance-optimized:
> {code}
> // Minimal wrappers exposing RDD-style operations on local collections.
> // Note: each wrapper is single-pass, since iterator always returns the
> // same underlying Iterator.
> implicit class LocalRDD[T](it: Iterator[T]) extends Iterable[T] {
>   def this(collection: Iterable[T]) = this(collection.toIterator)
>   def iterator = it
> }
>
> implicit class LocalPairRDD[K, V](it: Iterator[(K, V)]) extends Iterable[(K, V)] {
>   def this(collection: Iterable[(K, V)]) = this(collection.toIterator)
>   def iterator = it
>   // Mirrors RDD.groupByKey: group the pairs by key, keeping only the values.
>   def groupByKey() = new LocalPairRDD[K, Iterable[V]](
>     groupBy(_._1).map { case (k, valuePairs) => (k, valuePairs.map(_._2)) }
>   )
> }
>
> sc.
>   parallelize(Array((1, 10), (2, 10), (1, 20))).
>   repartition(1).
>   mapPartitions(data => data.groupByKey().toIterator).
>   take(2)
> // Array[(Int, Iterable[Int])] = Array((2,List(10)), (1,List(10, 20)))
> {code}
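>
> In the same spirit, {{reduceByKey}} could be sketched as follows (again 
> short rather than complete or performance-optimized; {{LocalPairRDDOps}} is 
> just an illustrative name):
> {code}
> implicit class LocalPairRDDOps[K, V](it: Iterator[(K, V)]) {
>   // Mirrors RDD.reduceByKey: merge the values of each key with f.
>   def reduceByKey(f: (V, V) => V): Iterator[(K, V)] = {
>     val acc = scala.collection.mutable.Map.empty[K, V]
>     it.foreach { case (k, v) =>
>       acc(k) = acc.get(k).map(f(_, v)).getOrElse(v)
>     }
>     acc.iterator
>   }
> }
>
> // Used the same way inside mapPartitions:
> // rdd.mapPartitions(data => data.reduceByKey(_ + _))
> {code}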



