[ https://issues.apache.org/jira/browse/SPARK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-9563:
-----------------------------------
Assignee: Apache Spark
> Remove repartition operators when they are the child of Exchange and
> shuffle=True
> ---------------------------------------------------------------------------------
>
> Key: SPARK-9563
> URL: https://issues.apache.org/jira/browse/SPARK-9563
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Josh Rosen
> Assignee: Apache Spark
>
> Consider the following query:
> {code}
> val df1 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
> val df2 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
> df1.repartition(1000).join(df2, "_1").explain(true)
> {code}
> Here's the plan for this query as of Spark 1.4.1:
> {code}
> == Parsed Logical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Analyzed Logical Plan ==
> _1: int, _2: int, _2: int
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Optimized Logical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Physical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  ShuffledHashJoin [_1#68991], [_1#68993], BuildRight
>   Exchange (HashPartitioning 200)
>    Repartition 1000, true
>     PhysicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   Exchange (HashPartitioning 200)
>    PhysicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> {code}
> In this plan, we end up repartitioning {{df1}} to have 1000 partitions, which
> involves a shuffle, only to turn around and shuffle again as part of the
> exchange.
> To avoid this extra shuffle, I think that we should remove the Repartition
> when the following condition holds:
> - Exchange's child is a repartition operator where shuffle=True.
> We should not perform this collapsing when shuffle=False, since there might
> be a legitimate reason to coalesce before shuffling (reducing the number of
> map outputs that need to be tracked, for instance).
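
The proposed collapsing can be sketched on a toy plan tree. This is only an illustration of the condition described above, not Spark's implementation: the case classes (Scan, Repartition, Exchange, Join) and the CollapseRepartition object are hypothetical stand-ins for the real physical operators and planner rules.

```scala
// Toy model of a physical plan. These classes are illustrative assumptions
// and are NOT Spark's actual operator classes.
sealed trait Plan
case class Scan(name: String) extends Plan
case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
case class Exchange(numPartitions: Int, child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

// Hypothetical rule: drop a Repartition with shuffle = true when it is the
// direct child of an Exchange, because the Exchange shuffles the data anyway.
object CollapseRepartition {
  def apply(plan: Plan): Plan = plan match {
    case Exchange(n, Repartition(_, true, grandchild)) =>
      // Re-apply to the new Exchange so stacked shuffling Repartitions are all removed.
      apply(Exchange(n, grandchild))
    case Exchange(n, child)       => Exchange(n, apply(child))
    // A Repartition with shuffle = false (a coalesce) is kept as-is.
    case Repartition(n, s, child) => Repartition(n, s, apply(child))
    case Join(l, r)               => Join(apply(l), apply(r))
    case s: Scan                  => s
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Mirrors the shape of the physical plan in the issue description.
    val before = Join(
      Exchange(200, Repartition(1000, shuffle = true, child = Scan("df1"))),
      Exchange(200, Scan("df2")))
    println(CollapseRepartition(before))

    // The shuffle = false case is left untouched.
    val coalesced = Exchange(200, Repartition(10, shuffle = false, child = Scan("df1")))
    println(CollapseRepartition(coalesced))
  }
}
```

In the first case only the two Exchange operators remain under the join, so df1 is shuffled once. In the second case the plan is unchanged, matching the point that a coalesce before the shuffle may be intentional.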