[
https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stefano Pettini updated SPARK-23778:
------------------------------------
Attachment: as_it_should_be.png
> SparkContext.emptyRDD confuses SparkContext.union
> -------------------------------------------------
>
> Key: SPARK-23778
> URL: https://issues.apache.org/jira/browse/SPARK-23778
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.0, 2.3.0
> Reporter: Stefano Pettini
> Priority: Minor
> Attachments: as_it_should_be.png,
> partitioner_lost_and_unneeded_extra_stage.png
>
>
> SparkContext.emptyRDD is an unpartitioned RDD. Since it is empty, whether
> it is partitioned or not should be a purely academic question. Unfortunately
> that is not the case, and the issue has side effects.
> Namely, it confuses the RDD union.
> When there are N classic RDDs partitioned the same way, the union is
> implemented with the optimized PartitionerAwareUnionRDD, which retains the
> common partitioner in the result. If one of the N RDDs happens to be an
> emptyRDD, which has no partitioner, the union is implemented by just
> appending all the partitions of the N RDDs, dropping the partitioner. But
> there's no need for this, as the emptyRDD contains no elements. This results
> in further unneeded shuffles once the result of the union is used.
> See for example:
> {{val p = new HashPartitioner(3)}}
> {{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)}}
> {{val b1 = a.mapValues(_ + 1)}}
> {{val b2 = a.mapValues(_ - 1)}}
> {{val e = context.emptyRDD[(Int, Int)]}}
> {{val x = context.union(a, b1, b2, e)}}
> {{val y = x.reduceByKey(_ + _)}}
> {{assert(x.partitioner.contains(p))}}
> {{y.collect()}}
> The assert fails. With the assert disabled, it's possible to see that
> reduceByKey introduces a shuffle, although all the input RDDs except the
> emptyRDD are already partitioned the same way.
> Forcing a partitioner on the emptyRDD:
> {{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}
> solves the problem with the assert and doesn't introduce the unneeded extra
> stage and shuffle.
> The union implementation should be changed to disregard emptyRDDs when
> comparing partitioners and consider them as _partitioned in a way compatible
> with any partitioner_.
> Present since 1.3 at least.
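
Until the union itself is changed, the suggestion above can be approximated on the caller's side. What follows is only a minimal sketch, not Spark's implementation: UnionWorkaround and unionIgnoringEmpty are hypothetical names that do not exist in Spark, and the sketch assumes that an RDD reporting zero partitions can safely be dropped from the union because it cannot hold any elements.

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical helper for illustration only; not part of Spark's API.
object UnionWorkaround {

  // Unions the given RDDs after dropping those that have no partitions.
  // Assumption: an RDD with zero partitions holds no elements, so leaving it
  // out does not change the contents of the union.
  def unionIgnoringEmpty[T: ClassTag](sc: SparkContext, rdds: RDD[T]*): RDD[T] = {
    val nonEmpty = rdds.filter(_.partitions.nonEmpty)
    // If nothing is left, fall back to an empty RDD of the same element type.
    if (nonEmpty.isEmpty) sc.emptyRDD[T] else sc.union(nonEmpty)
  }
}
```

With the values from the example in the description, UnionWorkaround.unionIgnoringEmpty(context, a, b1, b2, e) hands only a, b1 and b2 to SparkContext.union, so the common partitioner should be retained and the assert should pass. Note that an emptyRDD on which partitionBy(p) was forced has partitions and is therefore not dropped, which is harmless since it already carries the partitioner.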