Lietong Liu created SPARK-37328:
-----------------------------------

             Summary: SPARK-33832 introduces a bug: OptimizeSkewedJoin may not work because it is applied to the whole plan instead of the new stage plan
                 Key: SPARK-37328
                 URL: https://issues.apache.org/jira/browse/SPARK-37328
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.0
            Reporter: Lietong Liu


Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to queryStagePreparationRules, the place where OptimizeSkewedJoin is applied has moved from newQueryStage() to reOptimize(). As a result, the plan that OptimizeSkewedJoin operates on has changed from the plan of the new stage that is about to be submitted to the whole Spark plan.

When the skewed join is not in the last stage, OptimizeSkewedJoin may not take effect, because the number of shuffle stages it collects from the whole plan is greater than 2.
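A simplified model of why the count check fails follows. It assumes, as described above, that the rule only handles a plan in which it collects exactly two shuffle stages, and that collection stops at each stage but descends through exchanges that have not yet become stages. `Plan`, `ShuffleStage`, `Exchange`, `SortMergeJoin` and `SkewJoinSketch` are hypothetical stand-ins, not Spark's `SparkPlan`, `ShuffleQueryStageExec`, `ShuffleExchangeExec` or `OptimizeSkewedJoin`.

```scala
// Hypothetical, heavily simplified plan tree; NOT Spark's classes.
sealed trait Plan { def children: Seq[Plan] }
final case class ShuffleStage(id: Int) extends Plan { def children: Seq[Plan] = Nil }
final case class Exchange(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
final case class SortMergeJoin(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object SkewJoinSketch {
  // Stop at each materialized shuffle stage; otherwise keep descending
  // (including through exchanges that are not yet stages).
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case s: ShuffleStage => Seq(s)
    case other           => other.children.flatMap(collectShuffleStages)
  }

  // Assumed condition: only a single two-way join (two input stages) is handled.
  def canOptimize(plan: Plan): Boolean = collectShuffleStages(plan).length == 2

  def main(args: Array[String]): Unit = {
    val s1 = ShuffleStage(0)
    val s2 = ShuffleStage(1)
    val s3 = ShuffleStage(2)
    // Plan of the new stage about to be submitted: the first (skewed) join.
    val newStagePlan = SortMergeJoin(s1, s2)
    // Whole plan: the first join's output is shuffled again for the second join.
    val wholePlan = SortMergeJoin(Exchange(newStagePlan), s3)
    // Once the intermediate shuffle is itself a stage, only 2 stages remain.
    val lastStagePlan = SortMergeJoin(ShuffleStage(3), s3)

    println(canOptimize(newStagePlan))  // true: 2 stages
    println(canOptimize(wholePlan))     // false: 3 stages, rule bails out
    println(canOptimize(lastStagePlan)) // true: only the last join is handled
  }
}
```

In this model, running the check on the new stage's plan sees only the two inputs of the first join, while running it on the whole plan also picks up the third input, which matches the "more than 2" behaviour reported here.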

The following test reproduces the problem:

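{code:scala}
// Intended to run inside AdaptiveQueryExecSuite, which provides the
// runAdaptiveAndVerifyResult and findTopLevelSortMergeJoin helpers.
test("OptimizeSkewJoin may not work") {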
  withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
    SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
    SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
    SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
    SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    withTempView("skewData1", "skewData2", "skewData3") {
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 3 as key1", "id % 3 as value1")
        .createOrReplaceTempView("skewData1")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key2", "id as value2")
        .createOrReplaceTempView("skewData2")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key3", "id as value3")
        .createOrReplaceTempView("skewData3")

      // The query has two skewed joins in two consecutive stages.
      val (_, adaptive1) =
        runAdaptiveAndVerifyResult(
          """
            |SELECT key1 FROM skewData1 s1
            |JOIN skewData2 s2
            |ON s1.key1 = s2.key2
            |JOIN skewData3
            |ON s1.value1 = value3
            |""".stripMargin)
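      // Each join shuffles both of its inputs: 2 joins x 2 sides = 4 shuffles.
      val shuffles1 = collect(adaptive1) {
        case s: ShuffleExchangeExec => s
      }
      assert(shuffles1.size == 4)
      // Both sort-merge joins are expected to be handled as skew joins.
      val smj1 = findTopLevelSortMergeJoin(adaptive1)
      assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))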
    }
  }
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
