[
https://issues.apache.org/jira/browse/SPARK-37328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun updated SPARK-37328:
----------------------------------
Parent: SPARK-37063
Issue Type: Sub-task (was: Bug)
> SPARK-33832 brings the bug that OptimizeSkewedJoin may not work since it was
> applied on whole plan instead of new stage plan
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-37328
> URL: https://issues.apache.org/jira/browse/SPARK-37328
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Lietong Liu
> Assignee: Wenchen Fan
> Priority: Major
> Fix For: 3.3.0
>
>
> Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to
> queryStagePreparationRules, the point where OptimizeSkewedJoin is applied has
> moved from newQueryStage() to reOptimize(). As a result, the rule is now
> applied to the whole Spark plan rather than to the plan of the new stage that
> is about to be submitted.
> When the skewed join is not in the last stage, OptimizeSkewedJoin may not
> take effect, because more than two shuffle stages are collected from the
> whole plan.
> The following test demonstrates this:
>
>
> {code:java}
> test("OptimizeSkewJoin may not work") {
>   withSQLConf(
>       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>       // Disable broadcast joins so both joins are planned as sort-merge joins.
>       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
>       SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
>       SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
>       SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
>       SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
>     withTempView("skewData1", "skewData2", "skewData3") {
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 3 as key1", "id % 3 as value1")
>         .createOrReplaceTempView("skewData1")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key2", "id as value2")
>         .createOrReplaceTempView("skewData2")
>       spark
>         .range(0, 1000, 1, 10)
>         .selectExpr("id % 1 as key3", "id as value3")
>         .createOrReplaceTempView("skewData3")
>       // The query has two skewed joins in two consecutive stages.
>       val (_, adaptive1) =
>         runAdaptiveAndVerifyResult(
>           """
>             |SELECT key1 FROM skewData1 s1
>             |JOIN skewData2 s2
>             |ON s1.key1 = s2.key2
>             |JOIN skewData3
>             |ON s1.value1 = value3
>             |""".stripMargin)
>       val shuffles1 = collect(adaptive1) {
>         case s: ShuffleExchangeExec => s
>       }
>       // Expect four shuffles and two top-level sort-merge joins, both marked as skew joins.
>       assert(shuffles1.size == 4)
>       val smj1 = findTopLevelSortMergeJoin(adaptive1)
>       assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
>     }
>   }
> }
> {code}
> I'll open a PR shortly to fix this issue.
>
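The scope problem described in the issue can be illustrated with a toy model. The sketch below is a hypothetical, heavily simplified model and not Spark code: `Plan`, `StageLeaf`, `Exchange`, `SortMergeJoin`, `collectStages`, and `optimizeSkewedJoin` are invented names, and the rule is assumed to give up unless it collects exactly two shuffle stages. Applied to the plan of the stage about to be submitted, it sees two stages and marks the join as skewed. Applied to the whole plan after the first three shuffles have materialized, it sees three stages and leaves the plan unchanged.

```scala
// Hypothetical toy model of the rule-scope problem; these are NOT Spark classes.
object SkewRuleScopeSketch {
  sealed trait Plan { def children: Seq[Plan] }
  // A materialized shuffle stage: a leaf from the rule's point of view.
  final case class StageLeaf(id: Int) extends Plan { def children: Seq[Plan] = Nil }
  // A shuffle exchange that has not been turned into a stage yet.
  final case class Exchange(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  final case class SortMergeJoin(left: Plan, right: Plan, isSkewJoin: Boolean = false) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
  }

  // Collect stage leaves, stopping the descent at each stage.
  def collectStages(plan: Plan): Seq[StageLeaf] = plan match {
    case s: StageLeaf => Seq(s)
    case other        => other.children.flatMap(collectStages)
  }

  // Assumed simplification of the rule: it only fires when exactly two stages are found.
  def optimizeSkewedJoin(plan: Plan): Plan =
    if (collectStages(plan).length != 2) plan else markSkew(plan)

  // Mark the join that sits directly on top of two stages as a skew join.
  private def markSkew(plan: Plan): Plan = plan match {
    case SortMergeJoin(l: StageLeaf, r: StageLeaf, _) => SortMergeJoin(l, r, isSkewJoin = true)
    case Exchange(c)                                  => Exchange(markSkew(c))
    case j: SortMergeJoin                             => j.copy(left = markSkew(j.left), right = markSkew(j.right))
    case leaf                                         => leaf
  }

  def main(args: Array[String]): Unit = {
    val join1 = SortMergeJoin(StageLeaf(1), StageLeaf(2))
    // Plan of the new stage about to be submitted (old rule position).
    val newStagePlan = Exchange(join1)
    // Whole plan seen by reOptimize() once stages 1, 2 and 3 have materialized.
    val wholePlan = SortMergeJoin(newStagePlan, StageLeaf(3))
    println(optimizeSkewedJoin(newStagePlan)) // join1 is marked as a skew join
    println(optimizeSkewedJoin(wholePlan))    // three stages found, plan left unchanged
  }
}
```

In this model the only difference between the two calls is the subtree the rule is given, which mirrors the move from newQueryStage() to reOptimize() described in the issue.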