[ https://issues.apache.org/jira/browse/SPARK-23544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
caoxuewen updated SPARK-23544:
------------------------------
Description:
Currently, we execute the SQL statement:
select MDTTemp.* from (select * from distinctAgg where a > 2 distribute by a, b, c) MDTTemp
left join (select * from testData5 where f > 1) ProjData
on MDTTemp.b = ProjData.g and
   MDTTemp.c = ProjData.h and
   MDTTemp.d < (ProjData.j - 3) and
   MDTTemp.d >= (ProjData.j + 3)
The physical plan shown by explain looks like:
== Physical Plan ==
*Project [a#203, b#204, c#205, d#206, e#207]
+- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
   :- *Sort [b#204 ASC, c#205 ASC], false, 0
   :  +- Exchange hashpartitioning(b#204, c#205, 5)
   :     +- Exchange hashpartitioning(a#203, b#204, c#205, 5)
   :        +- *Filter (a#203 > 2)
   :           +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
   +- *Sort [g#222 ASC, h#223 ASC], false, 0
      +- Exchange hashpartitioning(g#222, h#223, 5)
         +- *Project [g#222, h#223, j#224]
            +- *Filter (f#221 > 1)
               +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
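This plan contains a redundant ShuffleExchange: the output of
Exchange hashpartitioning(a#203, b#204, c#205, 5) is immediately reshuffled by
Exchange hashpartitioning(b#204, c#205, 5), so the first shuffle is
unnecessary. This PR adds a rule to the planner that removes the redundant
ShuffleExchange. With the rule applied, the explain output looks like: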
== Physical Plan ==
*Project [a#203, b#204, c#205, d#206, e#207]
+- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
   :- *Sort [b#204 ASC, c#205 ASC], false, 0
   :  +- Exchange hashpartitioning(b#204, c#205, 5)
   :     +- *Filter (a#203 > 2)
   :        +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
   +- *Sort [g#222 ASC, h#223 ASC], false, 0
      +- Exchange hashpartitioning(g#222, h#223, 5)
         +- *Project [g#222, h#223, j#224]
            +- *Filter (f#221 > 1)
               +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
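
To make the rewrite concrete, here is a minimal sketch of the idea on a toy plan model. The case classes Scan, Filter and Exchange and the object RemoveRedundantExchange are hypothetical stand-ins invented for illustration; they are not Spark's SparkPlan or ShuffleExchange classes, and the actual rule in the PR may differ. The sketch only assumes that an Exchange placed directly on top of another Exchange discards the partitioning produced by the inner one.

```scala
// Toy physical-plan model. These classes are hypothetical and only mimic
// the shape of the plans shown above; they are not Spark classes.
sealed trait Plan
case class Scan(columns: Seq[String]) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Exchange(keys: Seq[String], numPartitions: Int, child: Plan) extends Plan

object RemoveRedundantExchange {
  // Bottom-up rewrite: if an Exchange sits directly on top of another
  // Exchange, the inner shuffle's partitioning is overwritten by the outer
  // one, so the inner node is dropped and its child is attached directly.
  def apply(plan: Plan): Plan = plan match {
    case Exchange(keys, n, child) =>
      apply(child) match {
        case Exchange(_, _, grandChild) => Exchange(keys, n, grandChild)
        case other                      => Exchange(keys, n, other)
      }
    case Filter(condition, child) => Filter(condition, apply(child))
    case scan: Scan               => scan
  }
}

object Example {
  // Left side of the join from the first plan above, below the Sort node.
  val before: Plan =
    Exchange(Seq("b", "c"), 5,
      Exchange(Seq("a", "b", "c"), 5,
        Filter("a > 2", Scan(Seq("a", "b", "c", "d", "e")))))

  // Only the outer Exchange on (b, c) remains after the rewrite.
  val after: Plan = RemoveRedundantExchange(before)
}
```

Because the rewrite recurses into the child first, a chain of several stacked Exchange nodes also collapses to the outermost one.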
I have also added a benchmark test case:
val N = 2 << 20
runJoinBenchmark("sort merge join", N) {
  val df1 = sparkSession.range(N)
    .selectExpr(s"(id * 15485863) % ${N*10} as k1")
  val df2 = sparkSession.range(N)
    .selectExpr(s"(id * 15485867) % ${N*10} as k2")
  val df = df1.join(df2.repartition(20), col("k1") === col("k2"))
  assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
  df.count()
}
The benchmark results are as follows:
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
sort merge join:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort merge join Repartition off          3520 / 4364          0.6        1678.5       1.0X
sort merge join Repartition on           1946 / 2203          1.1         927.9       1.8X
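
As a sanity check on the table, the derived columns can be recomputed from the best times and N = 2 << 20 rows. The sketch below assumes the rate, per-row and relative columns are all derived from the best time (not the average); the reported figures are consistent with that assumption. The object name BenchmarkMath and its helper functions are hypothetical and used only for illustration.

```scala
object BenchmarkMath {
  // Number of rows in the benchmark above: 2 << 20 = 2,097,152.
  val n: Long = 2L << 20

  // Nanoseconds spent per row, given the best time in milliseconds.
  def perRowNs(bestMs: Double): Double = bestMs * 1e6 / n

  // Throughput in millions of rows per second.
  def rateMPerSec(bestMs: Double): Double = n / (bestMs * 1e3)

  // Speed-up relative to the baseline case.
  def relative(baselineMs: Double, bestMs: Double): Double = baselineMs / bestMs

  // Round to one decimal place, matching the precision of the table.
  def round1(x: Double): Double = math.round(x * 10) / 10.0
}
```

With these helpers, round1(perRowNs(3520)) gives 1678.5 and round1(perRowNs(1946)) gives 927.9, while round1(relative(3520, 1946)) gives 1.8, matching the reported rows.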
was:
Currently, when the children of a join are Repartition or
RepartitionByExpression, the Repartition operation is not necessary. I think
we can remove the Repartition operation in the Optimizer, and doing so is safe
for the join operation. The optimizer output now looks like:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseRepartition ===
Input LogicalPlan:
Join Inner
:- Repartition 10, false
:  +- LocalRelation <empty>, [a#0, b#1]
+- Repartition 10, false
   +- LocalRelation <empty>, [c#2, d#3]
Output LogicalPlan:
Join Inner
:- LocalRelation <empty>, [a#0, b#1]
+- LocalRelation <empty>, [c#2, d#3]
I have also added a benchmark test case:
val N = 2 << 20
runJoinBenchmark("sort merge join", N) {
  val df1 = sparkSession.range(N)
    .selectExpr(s"(id * 15485863) % ${N*10} as k1")
  val df2 = sparkSession.range(N)
    .selectExpr(s"(id * 15485867) % ${N*10} as k2")
  val df = df1.join(df2.repartition(20), col("k1") === col("k2"))
  assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
  df.count()
}
The benchmark results are as follows:
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
sort merge join:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
sort merge join Repartition off          3520 / 4364          0.6        1678.5       1.0X
sort merge join Repartition on           1946 / 2203          1.1         927.9       1.8X
> Remove redundancy ShuffleExchange in the planner
> ------------------------------------------------
>
> Key: SPARK-23544
> URL: https://issues.apache.org/jira/browse/SPARK-23544
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: caoxuewen
> Priority: Major