Josh Rosen created SPARK-9563:
---------------------------------

             Summary: Collapse repartition and exchange
                 Key: SPARK-9563
                 URL: https://issues.apache.org/jira/browse/SPARK-9563
             Project: Spark
          Issue Type: Improvement
          Components: SQL
            Reporter: Josh Rosen


Consider the following query:

{code}
val df1 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
val df2 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
df1.repartition(1000).join(df2, "_1").explain(true)
{code}

Here's the plan for this query as of Spark 1.4.1:

{code}
== Parsed Logical Plan ==
Project [_1#68991,_2#68992,_2#68994]
 Join Inner, Some((_1#68991 = _1#68993))
  Repartition 1000, true
   LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
  LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30

== Analyzed Logical Plan ==
_1: int, _2: int, _2: int
Project [_1#68991,_2#68992,_2#68994]
 Join Inner, Some((_1#68991 = _1#68993))
  Repartition 1000, true
   LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
  LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30

== Optimized Logical Plan ==
Project [_1#68991,_2#68992,_2#68994]
 Join Inner, Some((_1#68991 = _1#68993))
  Repartition 1000, true
   LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
  LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30

== Physical Plan ==
Project [_1#68991,_2#68992,_2#68994]
 ShuffledHashJoin [_1#68991], [_1#68993], BuildRight
  Exchange (HashPartitioning 200)
   Repartition 1000, true
    PhysicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
  Exchange (HashPartitioning 200)
   PhysicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
{code}

In this plan, we end up repartitioning {{df1}} to have 1000 partitions, which 
involves a shuffle, only to turn around and shuffle again as part of the 
exchange.

To avoid this extra shuffle, I think that we should remove the Repartition when 
the following condition holds:

- The Exchange's child is a Repartition operator with shuffle=true.

We should not collapse the two operators when shuffle=false, since there might 
be a legitimate reason to coalesce before shuffling (reducing the number of map 
outputs that need to be tracked, for instance).
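
As a rough illustration of the proposed rule, here is a minimal sketch over a toy plan model. The case classes and the {{CollapseRepartition}} object below are hypothetical stand-ins invented for this example; they are not Spark's actual operator classes or rule API. The sketch drops a Repartition only when it sits directly under an Exchange and has shuffle=true, and leaves a shuffle=false coalesce in place.

```scala
// Toy plan model for illustration only; these are NOT Spark's real operators.
sealed trait Plan
case class Scan(name: String) extends Plan
case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
case class Exchange(numPartitions: Int, child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

// Hypothetical rule: remove a shuffling Repartition directly beneath an Exchange.
object CollapseRepartition {
  def apply(plan: Plan): Plan = plan match {
    // The Exchange reshuffles anyway, so the shuffling Repartition is redundant.
    // Re-apply to the new Exchange in case several Repartitions are stacked.
    case Exchange(n, Repartition(_, true, grandchild)) =>
      apply(Exchange(n, grandchild))
    // Any other child (including a shuffle=false coalesce) is kept as-is.
    case Exchange(n, child) => Exchange(n, apply(child))
    case Repartition(n, s, child) => Repartition(n, s, apply(child))
    case Join(l, r) => Join(apply(l), apply(r))
    case s: Scan => s
  }
}

// Shape of the physical plan from the example query above.
val before = Join(
  Exchange(200, Repartition(1000, shuffle = true, Scan("df1"))),
  Exchange(200, Scan("df2")))
val after = CollapseRepartition(before)

// A coalesce (shuffle = false) under an Exchange is left untouched.
val coalesced = Exchange(200, Repartition(10, shuffle = false, Scan("df1")))
val coalescedAfter = CollapseRepartition(coalesced)
```

In this toy model, the join's left side ends up as an Exchange directly over the scan of df1, while the coalesce case comes back unchanged.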



