[ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yucai updated SPARK-24556:
--------------------------
Description:
Currently, ReusedExchange rewrites its output partitioning when the child's partitioning is HashPartitioning, but it does not do the same when the child's partitioning is RangePartitioning. This can introduce an extra shuffle, for example:
{code:java}
import spark.implicits._ // needed for toDF and $"..." outside spark-shell
val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
val df1 = df.as("t1")
val df2 = df.as("t2")
val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
t.cache.orderBy($"t2.j").explain
{code}
Before fix:
{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
   +- InMemoryTableScan [i#5, j#6, i#13, j#14]
         +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
               +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                  :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as...
                  :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                  :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                  :        +- LocalTableScan [i#5, j#6]
                  +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                     +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}
Here ReusedExchange [i#13, j#14] still reports the partitioning of the exchange it reuses, rangepartitioning(j#6 ASC NULLS FIRST, 200), which refers to j#6 rather than j#14. Because that partitioning is not rewritten, the planner cannot tell that the cached data is already range-partitioned on t2.j (j#14), so it adds "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)".
A better plan would avoid that extra exchange, for example:
{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- InMemoryTableScan [i#5, j#6, i#13, j#14]
      +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
            +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
               :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
               :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
               :        +- LocalTableScan [i#5, j#6]
               +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                  +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}

> ReusedExchange should rewrite output partitioning also when child's
> partitioning is RangePartitioning
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24556
>                 URL: https://issues.apache.org/jira/browse/SPARK-24556
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2
>            Reporter: yucai
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
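The attribute remapping this issue asks for can be illustrated with a small, self-contained Scala model, written as a script. It is a hypothetical sketch, not Spark's code: `Attr`, `HashPart`, `RangePart`, `UnknownPart`, `remap`, `rewriteHashOnly`, `rewriteHashAndRange` and `satisfiesOrderedOn` are invented names, and the satisfaction check is simplified to an exact match on the ordering and partition count. It shows why a reused exchange whose output is [i#13, j#14] has to translate a range partitioning on j#6 into one on j#14 before a requirement on j#14 can be met.

```scala
// Hypothetical model of remapping an inherited partitioning in a reused exchange.
// These types are NOT Spark's classes; they only mirror the idea from the issue.
final case class Attr(name: String, id: Int) {
  override def toString: String = s"$name#$id" // e.g. j#14
}

sealed trait Partitioning
final case class HashPart(keys: Seq[Attr], numPartitions: Int) extends Partitioning
final case class RangePart(ordering: Seq[Attr], numPartitions: Int) extends Partitioning
case object UnknownPart extends Partitioning

// Maps each attribute of the original exchange's output to the attribute at the
// same position in the reused node's output (e.g. j#6 -> j#14), matching by id.
def remap(childOutput: Seq[Attr], reusedOutput: Seq[Attr]): Attr => Attr = {
  val byId: Map[Int, Attr] = childOutput.map(_.id).zip(reusedOutput).toMap
  a => byId.getOrElse(a.id, a)
}

// Behaviour described in the issue: only hash partitioning is rewritten.
def rewriteHashOnly(p: Partitioning, f: Attr => Attr): Partitioning = p match {
  case HashPart(keys, n) => HashPart(keys.map(f), n)
  case other             => other
}

// Behaviour the issue asks for: range partitioning is rewritten as well.
def rewriteHashAndRange(p: Partitioning, f: Attr => Attr): Partitioning = p match {
  case HashPart(keys, n)      => HashPart(keys.map(f), n)
  case RangePart(ordering, n) => RangePart(ordering.map(f), n)
  case other                  => other
}

// Simplified check: is the data already range-partitioned on `required`
// into n partitions? If not, a planner would have to add another exchange.
def satisfiesOrderedOn(p: Partitioning, required: Seq[Attr], n: Int): Boolean = p match {
  case RangePart(ordering, m) => m == n && ordering == required
  case _                      => false
}

// Attribute ids taken from the physical plans in this issue.
val childOutput  = Seq(Attr("i", 5), Attr("j", 6))   // output of Exchange rangepartitioning(j#6 ...)
val reusedOutput = Seq(Attr("i", 13), Attr("j", 14)) // output of ReusedExchange [i#13, j#14]
val f = remap(childOutput, reusedOutput)
val childPart: Partitioning = RangePart(Seq(Attr("j", 6)), 200)
val required = Seq(Attr("j", 14)) // orderBy($"t2.j") needs range partitioning on j#14

val before = rewriteHashOnly(childPart, f)     // still ordered on j#6
val after  = rewriteHashAndRange(childPart, f) // now ordered on j#14

println(satisfiesOrderedOn(before, required, 200)) // false: another exchange is needed
println(satisfiesOrderedOn(after, required, 200))  // true: the exchange can be skipped
```

With only the hash case handled, the inherited partitioning still names j#6, so the check fails; handling the range case as well makes it name j#14, which is what lets the extra "Exchange rangepartitioning(j#14 ...)" be dropped in this model.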