[ https://issues.apache.org/jira/browse/SPARK-23544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

caoxuewen updated SPARK-23544:
------------------------------
    Description: 
Currently, when we execute the following SQL statement:

select MDTTemp.* from (select * from distinctAgg where a > 2 distribute by a, b, c) MDTTemp
left join (select * from testData5 where f > 1) ProjData
on MDTTemp.b = ProjData.g and
   MDTTemp.c = ProjData.h and
   MDTTemp.d < (ProjData.j - 3) and
   MDTTemp.d >= (ProjData.j + 3)

the explain output of the physical plan looks like this:
== Physical Plan ==
*Project [a#203, b#204, c#205, d#206, e#207]
+- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
   :- *Sort [b#204 ASC, c#205 ASC], false, 0
   :  +- Exchange hashpartitioning(b#204, c#205, 5)
   :     +- Exchange hashpartitioning(a#203, b#204, c#205, 5)
   :        +- *Filter (a#203 > 2)
   :           +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
   +- *Sort [g#222 ASC, h#223 ASC], false, 0
      +- Exchange hashpartitioning(g#222, h#223, 5)
         +- *Project [g#222, h#223, j#224]
            +- *Filter (f#221 > 1)
               +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]

There is a redundant ShuffleExchange here that is not necessary: the output of Exchange hashpartitioning(a#203, b#204, c#205, 5) is immediately re-shuffled by the Exchange hashpartitioning(b#204, c#205, 5) above it. This PR provides a rule to remove the redundant ShuffleExchange in the planner. Now the explain output looks like this:

== Physical Plan ==
*Project [a#203, b#204, c#205, d#206, e#207]
+- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
   :- *Sort [b#204 ASC, c#205 ASC], false, 0
   :  +- Exchange hashpartitioning(b#204, c#205, 5)
   :     +- *Filter (a#203 > 2)
   :        +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
   +- *Sort [g#222 ASC, h#223 ASC], false, 0
      +- Exchange hashpartitioning(g#222, h#223, 5)
         +- *Project [g#222, h#223, j#224]
            +- *Filter (f#221 > 1)
               +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
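
For illustration, a minimal sketch of what such a planner rule could look like (my sketch, not the actual PR code; the rule name is hypothetical, and it assumes the Spark 2.x ShuffleExchangeExec physical node):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Sketch only: drop a ShuffleExchange whose output is immediately
// re-shuffled by a parent ShuffleExchange; the parent's partitioning
// overwrites whatever distribution the child established.
object RemoveRedundantShuffleExchange extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case outer: ShuffleExchangeExec
        if outer.child.isInstanceOf[ShuffleExchangeExec] =>
      // Re-parent the outer exchange directly onto the inner exchange's child.
      outer.withNewChildren(outer.child.children)
  }
}

Such a rule would presumably run among the physical preparation rules, after EnsureRequirements has inserted the join's own exchange.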

 

I have also added a test case:

val N = 2 << 20
runJoinBenchmark("sort merge join", N) {
  val df1 = sparkSession.range(N)
    .selectExpr(s"(id * 15485863) % ${N * 10} as k1")
  val df2 = sparkSession.range(N)
    .selectExpr(s"(id * 15485867) % ${N * 10} as k2")
  // repartition(20) adds an extra shuffle on df2 before the join.
  val df = df1.join(df2.repartition(20), col("k1") === col("k2"))
  assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
  df.count()
}

 

The performance results are as follows:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
sort merge join:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort merge join Repartition off          3520 / 4364          0.6        1678.5       1.0X
sort merge join Repartition on           1946 / 2203          1.1         927.9       1.8X

  was:
Currently, when the children of a join are Repartition or RepartitionByExpression, the Repartition operation is not necessary. I think we can remove the Repartition operation in the Optimizer, and it is safe for the join operation. Now the explain looks like this:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseRepartition ===

Input LogicalPlan:
Join Inner
:- Repartition 10, false
:  +- LocalRelation <empty>, [a#0, b#1]
+- Repartition 10, false
   +- LocalRelation <empty>, [c#2, d#3]

Output LogicalPlan:
Join Inner
:- LocalRelation <empty>, [a#0, b#1]
+- LocalRelation <empty>, [c#2, d#3]
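
For illustration, a minimal sketch of an optimizer rule with this effect (my sketch with a hypothetical name, not the actual CollapseRepartition implementation):

import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: strip Repartition nodes sitting directly under a Join,
// because the join enforces its own output distribution and the extra
// shuffle would be wasted work.
object RemoveRepartitionBelowJoin extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case join: Join if join.children.exists(_.isInstanceOf[Repartition]) =>
      join.withNewChildren(join.children.map {
        case Repartition(_, _, grandChild) => grandChild
        case other => other
      })
  }
}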

 
I have also added a test case:


val N = 2 << 20
runJoinBenchmark("sort merge join", N) {
  val df1 = sparkSession.range(N)
    .selectExpr(s"(id * 15485863) % ${N * 10} as k1")
  val df2 = sparkSession.range(N)
    .selectExpr(s"(id * 15485867) % ${N * 10} as k2")
  val df = df1.join(df2.repartition(20), col("k1") === col("k2"))
  assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
  df.count()
}

 

The performance results are as follows:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
sort merge join:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort merge join Repartition off          3520 / 4364          0.6        1678.5       1.0X
sort merge join Repartition on           1946 / 2203          1.1         927.9       1.8X


> Remove redundant ShuffleExchange in the planner
> -----------------------------------------------
>
>                 Key: SPARK-23544
>                 URL: https://issues.apache.org/jira/browse/SPARK-23544
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: caoxuewen
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to