Stefano Pettini created SPARK-23778:
---------------------------------------

             Summary: SparkContext.emptyRDD confuses SparkContext.union
                 Key: SPARK-23778
                 URL: https://issues.apache.org/jira/browse/SPARK-23778
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.0, 1.3.0
            Reporter: Stefano Pettini
         Attachments: partitioner_lost_and_unneeded_extra_stage.png

SparkContext.emptyRDD is an unpartitioned RDD. Clearly it's empty, so whether 
it's partitioned or not should be just an academic debate. Unfortunately that 
is not the case, and the issue has side effects.

Namely, it confuses the RDD union.

When there are N classic RDDs partitioned the same way, the union is 
implemented with the optimized PartitionerAwareUnionRDD, which retains the 
common partitioner in the result. If one of the N RDDs happens to be an 
emptyRDD, which has no partitioner, the union is instead implemented by just 
appending all the partitions of the N RDDs, dropping the partitioner. There's 
no need for this, as the emptyRDD contains no elements. It results in 
further unneeded shuffles once the result of the union is used.
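
The selection rule described above can be modelled roughly as follows. This is 
a simplified sketch in plain Scala, not Spark's source code: FakeRdd and 
chooseUnion are hypothetical names, and the partitioner is reduced to a label.

```scala
object UnionChoiceModel {
  // Hypothetical stand-in for an RDD: only the properties the rule looks at.
  final case class FakeRdd(partitioner: Option[String], numPartitions: Int)

  // Returns the name of the union implementation the rule would pick.
  def chooseUnion(rdds: Seq[FakeRdd]): String = {
    val partitioners = rdds.flatMap(_.partitioner).toSet
    // The optimized union is chosen only if every input has a partitioner
    // and all inputs share the same one.
    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1)
      "PartitionerAwareUnionRDD"
    else
      "UnionRDD"
  }
}
```

Under this model, adding a single input with no partitioner and zero 
partitions is enough to switch the result from PartitionerAwareUnionRDD to 
UnionRDD.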

See for example:

{{val p = new HashPartitioner(3)}}
{{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)}}
{{val b1 = a.mapValues(_ + 1)}}
{{val b2 = a.mapValues(_ - 1)}}
{{val e = context.emptyRDD[(Int, Int)]}}
{{val x = context.union(a, b1, b2, e)}}
{{val y = x.reduceByKey(_ + _)}}
{{assert(x.partitioner.contains(p))}}
{{y.collect()}}

The assert fails. With the assert disabled, it's possible to see that 
reduceByKey introduces a shuffle, although all the input RDDs except the 
emptyRDD are already partitioned the same way.

Forcing a partitioner on the emptyRDD:

{{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}

solves the problem with the assert and doesn't introduce the unneeded extra 
stage and shuffle.

The union implementation should be changed to treat emptyRDDs as 
_partitioned in a way compatible with any partitioner_, basically ignoring 
them when choosing how to implement the union.
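
Until the union itself behaves this way, a caller-side approximation might 
look like the sketch below. It assumes that "empty" can be detected as "has 
zero partitions", which holds for SparkContext.emptyRDD but not for an RDD 
that merely happens to contain no elements. UnionSketch and 
unionIgnoringEmpty are hypothetical names, not part of Spark's API.

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object UnionSketch {
  // Hypothetical helper: drops zero-partition RDDs before delegating to
  // SparkContext.union, so they cannot hide a partitioner that all the
  // remaining RDDs share.
  def unionIgnoringEmpty[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]): RDD[T] = {
    // An RDD with zero partitions cannot hold any element.
    val nonEmpty = rdds.filter(_.partitions.nonEmpty)
    if (nonEmpty.isEmpty) sc.emptyRDD[T] else sc.union(nonEmpty)
  }
}
```

With this helper, {{UnionSketch.unionIgnoringEmpty(context, Seq(a, b1, b2, e))}} 
would be expected to keep the common partitioner in the example above, since 
only a, b1 and b2 reach the union.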

Present since 1.3 at least.



