Lietong Liu created SPARK-37328:
-----------------------------------
Summary: SPARK-33832 introduced a bug: OptimizeSkewedJoin may
not work because it is applied to the whole plan instead of the new stage plan
Key: SPARK-37328
URL: https://issues.apache.org/jira/browse/SPARK-37328
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.2.0
Reporter: Lietong Liu
Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to
queryStagePreparationRules, it is applied in reOptimize() instead of
newQueryStage(). As a result, the plan it operates on changed from the plan
of the new stage that is about to be submitted to the whole Spark plan.
When the skewed join is not in the last stage, OptimizeSkewedJoin may not
take effect, because the number of shuffle stages it collects from the whole
plan is more than 2.
The following test reproduces the problem:
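To make the failure mode concrete, here is a minimal sketch on a toy plan model. The classes Plan, ShuffleStage, Exchange and SortMergeJoin and the object SkewJoinSketch are hypothetical stand-ins, not Spark's SparkPlan API; the guard is assumed to be a simplified version of the rule's "two shuffle stages" check, and Sort nodes and the actual partition splitting are omitted. It shows the same rule skipping the whole plan of a two-join query, where three shuffle stages are visible, while rewriting the sub-plan of the new stage.

{code:scala}
// Hypothetical toy plan model for illustration only; these are NOT Spark's classes.
sealed trait Plan { def children: Seq[Plan] }

// A materialized shuffle query stage (leaf).
final case class ShuffleStage(name: String) extends Plan {
  def children: Seq[Plan] = Nil
}

// A shuffle exchange that has not been turned into a query stage yet.
final case class Exchange(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

final case class SortMergeJoin(left: Plan, right: Plan, isSkewJoin: Boolean = false)
    extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object SkewJoinSketch {
  // Collect shuffle stages without descending into them.
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case s: ShuffleStage => Seq(s)
    case other           => other.children.flatMap(collectShuffleStages)
  }

  // Stand-in for the real rewrite: mark every sort-merge join as a skew join.
  private def markSkewJoins(plan: Plan): Plan = plan match {
    case SortMergeJoin(l, r, _) =>
      SortMergeJoin(markSkewJoins(l), markSkewJoins(r), isSkewJoin = true)
    case Exchange(c)     => Exchange(markSkewJoins(c))
    case s: ShuffleStage => s
  }

  // Simplified guard (assumption): only handle the two-shuffle-stage case,
  // otherwise return the plan unchanged.
  def optimizeSkewedJoin(plan: Plan): Plan =
    if (collectShuffleStages(plan).length == 2) markSkewJoins(plan) else plan

  def main(args: Array[String]): Unit = {
    // Whole plan seen by reOptimize() once the three leaf shuffles are stages;
    // the shuffle above the first join is still a pending Exchange.
    val wholePlan = SortMergeJoin(
      Exchange(SortMergeJoin(ShuffleStage("s1"), ShuffleStage("s2"))),
      ShuffleStage("s3"))
    // Sub-plan of the new stage, as newQueryStage() would see it.
    val stagePlan = SortMergeJoin(ShuffleStage("s1"), ShuffleStage("s2"))

    println(collectShuffleStages(wholePlan).length)     // 3
    println(optimizeSkewedJoin(wholePlan) == wholePlan) // true: rule skipped
    println(optimizeSkewedJoin(stagePlan))
    // SortMergeJoin(ShuffleStage(s1),ShuffleStage(s2),true)
  }
}
{code}

In this toy model, the whole-plan view exposes s1, s2 and s3 at once, so the guard fails, whereas the per-stage view exposes only the two inputs of the first join.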
{code:scala}
// Uses helpers from AdaptiveQueryExecSuite (runAdaptiveAndVerifyResult,
// findTopLevelSortMergeJoin) and AdaptiveSparkPlanHelper.collect.
test("OptimizeSkewJoin may not work") {
  withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    // Disable broadcast joins so both joins are planned as sort-merge joins.
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
    // Tiny thresholds so the skewed partitions are detected.
    SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
    SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
    SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
    SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    withTempView("skewData1", "skewData2", "skewData3") {
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 3 as key1", "id % 3 as value1")
        .createOrReplaceTempView("skewData1")
      // key2 is always 0, so the first join is skewed; its output all has
      // value1 = 0, so the second join is skewed as well.
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key2", "id as value2")
        .createOrReplaceTempView("skewData2")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key3", "id as value3")
        .createOrReplaceTempView("skewData3")
      // The query has two skewed joins in two consecutive stages.
      val (_, adaptive1) =
        runAdaptiveAndVerifyResult(
          """
            |SELECT key1 FROM skewData1 s1
            |JOIN skewData2 s2
            |ON s1.key1 = s2.key2
            |JOIN skewData3
            |ON s1.value1 = value3
            |""".stripMargin)
      val shuffles1 = collect(adaptive1) {
        case s: ShuffleExchangeExec => s
      }
      assert(shuffles1.size == 4)
      val smj1 = findTopLevelSortMergeJoin(adaptive1)
      // Expected: both sort-merge joins are optimized as skew joins.
      // This assertion fails on the affected version.
      assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
    }
  }
}
{code}