[ 
https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefano Pettini updated SPARK-23778:
------------------------------------
    Attachment: as_it_should_be.png

> SparkContext.emptyRDD confuses SparkContext.union
> -------------------------------------------------
>
>                 Key: SPARK-23778
>                 URL: https://issues.apache.org/jira/browse/SPARK-23778
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.0, 2.3.0
>            Reporter: Stefano Pettini
>            Priority: Minor
>         Attachments: as_it_should_be.png, 
> partitioner_lost_and_unneeded_extra_stage.png
>
>
> SparkContext.emptyRDD is an unpartitioned RDD. Since it is empty, whether 
> it is partitioned or not should be a purely academic question. Unfortunately 
> that is not the case, and the issue has side effects.
> Namely, it confuses the RDD union.
> When there are N classic RDDs partitioned the same way, the union is 
> implemented with the optimized PartitionerAwareUnionRDD, which retains the 
> common partitioner in the result. If one of the N RDDs happens to be an 
> emptyRDD, which has no partitioner, the union is instead implemented by just 
> appending all the partitions of the N RDDs, dropping the partitioner. There 
> is no need for this, as the emptyRDD contains no elements. The result is 
> further unneeded shuffles once the result of the union is used.
> See for example:
> {{val p = new HashPartitioner(3)}}
> {{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)}}
> {{val b1 = a.mapValues(_ + 1)}}
> {{val b2 = a.mapValues(_ - 1)}}
> {{val e = context.emptyRDD[(Int, Int)]}}
> {{val x = context.union(a, b1, b2, e)}}
> {{val y = x.reduceByKey(_ + _)}}
> {{assert(x.partitioner.contains(p))}}
> {{y.collect()}}
> The assert fails. With the assert disabled, it's possible to see that 
> reduceByKey introduces a shuffle, although all the input RDDs except the 
> emptyRDD are already partitioned the same way.
> Forcing a partitioner on the emptyRDD:
> {{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}
> solves the problem with the assert and doesn't introduce the unneeded extra 
> stage and shuffle.
> The union implementation should be changed to ignore emptyRDDs when 
> choosing the partitioner, and to consider them as _partitioned in a way 
> compatible with any partitioner_.
> Present since 1.3 at least.
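
Until the union itself changes, the proposal above can be approximated on the caller side. The following is only a sketch under stated assumptions: the object name UnionIgnoringEmpty and the helper unionIgnoringEmpty are hypothetical and not part of the Spark API, and the helper assumes that an RDD with zero partitions (such as the one returned by emptyRDD) can be dropped from the union without changing its contents.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnionIgnoringEmpty {

  // Hypothetical helper (not part of Spark): drops RDDs that have no
  // partitions before delegating to SparkContext.union, so an emptyRDD
  // cannot prevent the common partitioner from being retained.
  // Note: calling `partitions` resolves the partition list eagerly.
  def unionIgnoringEmpty[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]): RDD[T] = {
    val nonEmpty = rdds.filter(_.partitions.nonEmpty)
    if (nonEmpty.isEmpty) sc.emptyRDD[T] else sc.union(nonEmpty)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are illustrative assumptions.
    val conf = new SparkConf().setMaster("local[2]").setAppName("union-ignoring-empty")
    val context = new SparkContext(conf)
    try {
      val p = new HashPartitioner(3)
      val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)
      val b1 = a.mapValues(_ + 1)
      val b2 = a.mapValues(_ - 1)
      val e = context.emptyRDD[(Int, Int)]

      // Same inputs as in the description, but the emptyRDD is filtered out.
      val x = unionIgnoringEmpty(context, Seq(a, b1, b2, e))
      assert(x.partitioner.contains(p))

      // The lineage can be inspected to check whether a shuffle was added.
      val y = x.reduceByKey(_ + _)
      println(y.toDebugString)
      y.collect()
    } finally {
      context.stop()
    }
  }
}
```

Unlike forcing a partitioner on the emptyRDD, this variant needs no knowledge of which partitioner the other inputs use; it relies only on the empty input having no partitions.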



