This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 15885f2 [SPARK-37652][SQL] Add test for optimize skewed join through union
15885f2 is described below
commit 15885f2f8aea7905a3ecdf08906fa72355186030
Author: mcdull-zhang <[email protected]>
AuthorDate: Wed Feb 9 21:54:33 2022 +0800
[SPARK-37652][SQL] Add test for optimize skewed join through union
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/34974 solved most data-skew scenarios
involving union; this PR adds tests for them.
### Why are the changes needed?
Added tests for the following scenarios:
<b>Scenario 1</b>
```
Union
  SMJ
    ShuffleQueryStage
    ShuffleQueryStage
  SMJ
    ShuffleQueryStage
    ShuffleQueryStage
```
<b>Scenario 2</b>
```
Union
  SMJ
    ShuffleQueryStage
    ShuffleQueryStage
  HashAggregate
```
<b>Scenario 3 (not yet supported): SMJ-3 would introduce a new shuffle, so SMJ-1 cannot be optimized</b>
```
Union
  SMJ-1
    ShuffleQueryStage
    ShuffleQueryStage
  SMJ-2
    SMJ-3
      ShuffleQueryStage
      ShuffleQueryStage
    HashAggregate
```
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Pass the added test
Closes #34908 from mcdull-zhang/skewed_union.
Authored-by: mcdull-zhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../adaptive/AdaptiveQueryExecSuite.scala | 43 ++++++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1bd8ad9..d1c7064 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2441,6 +2441,49 @@ class AdaptiveQueryExecSuite
}
}
}
+
+  test("SPARK-37652: optimize skewed join through union") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100") {
+      withTempView("skewData1", "skewData2") {
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 3 as key1", "id as value1")
+          .createOrReplaceTempView("skewData1")
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 1 as key2", "id as value2") // every key2 is 0
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(query: String, joinNums: Int, optimizeSkewJoinNums: Int): Unit = {
+          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(query)
+          val joins = findTopLevelSortMergeJoin(innerAdaptivePlan)
+          val optimizeSkewJoins = joins.filter(_.isSkewJoin)
+          assert(joins.size == joinNums && optimizeSkewJoins.size == optimizeSkewJoinNums)
+        }
+
+        // skewJoin union skewJoin
+        checkSkewJoin(
+          "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " +
+            "UNION ALL SELECT key2 FROM skewData1 JOIN skewData2 ON key1 = key2", 2, 2)
+
+        // skewJoin union aggregate
+        checkSkewJoin(
+          "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " +
+            "UNION ALL SELECT key2 FROM skewData2 GROUP BY key2", 1, 1)
+
+        // skewJoin1 union (skewJoin2 join aggregate)
+        // optimizing skewJoin2 needs an extra shuffle, so neither it nor skewJoin1 is optimized
+        checkSkewJoin(
+          "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 UNION ALL " +
+            "SELECT key1 FROM (SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2) tmp1 " +
+            "JOIN (SELECT key2 FROM skewData2 GROUP BY key2) tmp2 ON key1 = key2", 3, 0)
+      }
+    }
+  }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]