Josh Rosen created SPARK-9563:
---------------------------------
Summary: Collapse repartition and exchange
Key: SPARK-9563
URL: https://issues.apache.org/jira/browse/SPARK-9563
Project: Spark
Issue Type: Improvement
Components: SQL
Reporter: Josh Rosen
Consider the following query:
{code}
val df1 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
val df2 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
df1.repartition(1000).join(df2, "_1").explain(true)
{code}
Here's the plan for this query as of Spark 1.4.1:
{code}
== Parsed Logical Plan ==
Project [_1#68991,_2#68992,_2#68994]
 Join Inner, Some((_1#68991 = _1#68993))
  Repartition 1000, true
   LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
  LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30

== Analyzed Logical Plan ==
_1: int, _2: int, _2: int
Project [_1#68991,_2#68992,_2#68994]
 Join Inner, Some((_1#68991 = _1#68993))
  Repartition 1000, true
   LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
  LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30

== Optimized Logical Plan ==
Project [_1#68991,_2#68992,_2#68994]
 Join Inner, Some((_1#68991 = _1#68993))
  Repartition 1000, true
   LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
  LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30

== Physical Plan ==
Project [_1#68991,_2#68992,_2#68994]
 ShuffledHashJoin [_1#68991], [_1#68993], BuildRight
  Exchange (HashPartitioning 200)
   Repartition 1000, true
    PhysicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
  Exchange (HashPartitioning 200)
   PhysicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
{code}
In this plan, we end up repartitioning {{df1}} to have 1000 partitions, which
involves a shuffle, only to turn around and shuffle again as part of the
exchange.
To avoid this extra shuffle, I think that we should remove the Repartition when
the following condition holds:
- Exchange's child is a Repartition operator with shuffle = true.
We should not perform this collapsing when shuffle = false, since there might be
a legitimate reason to coalesce before shuffling (reducing the number of map
outputs that need to be tracked, for instance).
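
The proposed condition can be sketched as a small tree rewrite. This is only an
illustration under stated assumptions, not Spark's actual rule or classes: the
node types (Plan, Scan, Repartition, Exchange, Join) and the
CollapseRepartition object are hypothetical stand-ins defined here, and only the
Exchange-over-Repartition check comes from the description above.

```scala
// Hypothetical toy plan nodes; these are NOT Spark's SparkPlan classes.
sealed trait Plan
case class Scan(name: String) extends Plan
case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
case class Exchange(numPartitions: Int, child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

object CollapseRepartition {
  // Drop a shuffling Repartition that sits directly beneath an Exchange.
  // A Repartition with shuffle = false (a coalesce) is left untouched.
  def collapse(plan: Plan): Plan = plan match {
    case Exchange(n, Repartition(_, true, grandchild)) =>
      // Re-check the new Exchange so stacked shuffling Repartitions are removed too.
      collapse(Exchange(n, grandchild))
    case Exchange(n, child) => Exchange(n, collapse(child))
    case Repartition(n, shuffle, child) => Repartition(n, shuffle, collapse(child))
    case Join(left, right) => Join(collapse(left), collapse(right))
    case scan: Scan => scan
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the shape of the physical plan above: df1 is repartitioned, then exchanged.
    val before = Join(
      Exchange(200, Repartition(1000, shuffle = true, Scan("df1"))),
      Exchange(200, Scan("df2")))
    println(collapse(before))
  }
}
```

With this sketch, the Repartition under the first Exchange is removed, while a
plan such as Exchange(200, Repartition(10, shuffle = false, Scan("df1"))) is
returned unchanged, matching the shuffle = false exception described above.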