This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 90e8cf833 [KYUUBI #5489] Adjust shuffle partitions dynamically
90e8cf833 is described below

commit 90e8cf8337fa1ba357caf17376bb0b8045d26f89
Author: wforget <[email protected]>
AuthorDate: Tue Oct 31 16:29:08 2023 +0800

    [KYUUBI #5489] Adjust shuffle partitions dynamically
    
    ### _Why are the changes needed?_
    
    Usually, we can use `spark.sql.shuffle.partitions` to configure the number of shuffle partitions (or `spark.sql.adaptive.coalescePartitions.initialPartitionNum` for AQE). However, it is difficult to find a single value that works well for all SQL jobs.
    
    Although Spark AQE can dynamically merge and split partitions based on partition size, an inappropriate number of shuffle partitions may still cause problems:
    
    + When there are too few shuffle partitions, the skew join optimization threshold becomes large and skewed partitions will not be split.
    + When using RemoteShuffleService, an inappropriate number of shuffle partitions may result in partitions that are too large or too numerous, which leads to high pressure on the shuffle server.
    
    So I want to provide an optimization rule to dynamically adjust the number of partitions based on the size of the input data.
    
    Calculate the number of partitions based on input data size:
    
    ```
    targetShufflePartitions = sum(scanSize|shuffleReadSize) / advisoryPartitionSizeInBytes
    ```
    
    then replace the number of partitions for all `ShuffleExchangeExec` nodes.
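    
    For a concrete feel of the bounding, here is a minimal sketch of the computation. All sizes and the variable names (`scanSizes`, `advisorySize`, `defaultParallelism`, `maxNum`) are made-up stand-ins for the values the rule reads from the plan and the Spark conf; only the sum/divide and min/max bounding mirror the rule.
    
    ```
    val scanSizes: Seq[Long] = Seq(30L << 30, 10L << 30) // bytes read by scans / shuffle reads
    val advisorySize = 64L << 20      // spark.sql.adaptive.advisoryPartitionSizeInBytes
    val defaultParallelism = 200      // spark.sql.shuffle.partitions
    val maxNum = 2000                 // spark.sql.optimizer.dynamicShufflePartitions.maxNum
    
    // never below the configured parallelism, never above the configured cap
    val target = math.min(math.max(scanSizes.sum / advisorySize + 1, defaultParallelism).toInt, maxNum)
    // 40 GiB / 64 MiB + 1 = 641, bounded to [200, 2000] => 641
    ```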
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No
    
    Closes #5489 from wForget/dynamic_shuffle_partitions.
    
    Closes #5489
    
    5a2bb6c25 [wforget] only takes effect when aqe is enabled
    038b7bb45 [wforget] moved behind InsertShuffleNodeBeforeJoin
    7ca87d8e8 [wforget] comment
    d65047fda [wforget] sum scanSizes
    e4d8f33af [wforget] comments
    4f0f25d8e [wforget] configurable
    f77d1d648 [wforget] code style
    0bf572f27 [wforget] use partition stats
    8d251c3fd [wforget] Adjust shuffle partitions dynamically
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: ulyssesyou <[email protected]>
---
 .../kyuubi/sql/DynamicShufflePartitions.scala      |  98 +++++++++++++++
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |  16 +++
 .../kyuubi/sql/KyuubiSparkSQLCommonExtension.scala |   1 +
 .../spark/sql/hive/HiveSparkPlanHelper.scala       |  21 ++++
 .../spark/sql/DynamicShufflePartitionsSuite.scala  | 140 +++++++++++++++++++++
 5 files changed, 276 insertions(+)

diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/DynamicShufflePartitions.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/DynamicShufflePartitions.scala
new file mode 100644
index 000000000..03d93d076
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/DynamicShufflePartitions.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.sql
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, RangePartitioning, RoundRobinPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
+import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_NUM, ShuffleExchangeExec, ValidateRequirements}
+import org.apache.spark.sql.hive.HiveSparkPlanHelper.HiveTableScanExec
+import org.apache.spark.sql.internal.SQLConf._
+
+import org.apache.kyuubi.sql.KyuubiSQLConf.{DYNAMIC_SHUFFLE_PARTITIONS, DYNAMIC_SHUFFLE_PARTITIONS_MAX_NUM}
+
+/**
+ * Dynamically adjust the number of shuffle partitions according to the input data size
+ */
+case class DynamicShufflePartitions(spark: SparkSession) extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.getConf(DYNAMIC_SHUFFLE_PARTITIONS) || !conf.getConf(ADAPTIVE_EXECUTION_ENABLED)) {
+      plan
+    } else {
+      val maxDynamicShufflePartitions = conf.getConf(DYNAMIC_SHUFFLE_PARTITIONS_MAX_NUM)
+
+      def collectScanSizes(plan: SparkPlan): Seq[Long] = plan match {
+        case FileSourceScanExec(relation, _, _, _, _, _, _, _, _) =>
+          Seq(relation.location.sizeInBytes)
+        case t: HiveTableScanExec =>
+          t.relation.prunedPartitions match {
+            case Some(partitions) => Seq(partitions.flatMap(_.stats).map(_.sizeInBytes.toLong).sum)
+            case None => Seq(t.relation.computeStats().sizeInBytes.toLong)
+                .filter(_ != conf.defaultSizeInBytes)
+          }
+        case stage: ShuffleQueryStageExec if stage.isMaterialized && stage.mapStats.isDefined =>
+          Seq(stage.mapStats.get.bytesByPartitionId.sum)
+        case p =>
+          p.children.flatMap(collectScanSizes)
+      }
+
+      val scanSizes = collectScanSizes(plan)
+      if (scanSizes.isEmpty) {
+        return plan
+      }
+
+      val targetSize = conf.getConf(ADVISORY_PARTITION_SIZE_IN_BYTES)
+      val targetShufflePartitions = Math.min(
+        Math.max(scanSizes.sum / targetSize + 1, conf.numShufflePartitions).toInt,
+        maxDynamicShufflePartitions)
+
+      val newPlan = plan transformUp {
+        case exchange @ ShuffleExchangeExec(outputPartitioning, _, shuffleOrigin, _)
+            if shuffleOrigin != REPARTITION_BY_NUM =>
+          val newOutPartitioning = outputPartitioning match {
+            case RoundRobinPartitioning(numPartitions)
+                if targetShufflePartitions != numPartitions =>
+              Some(RoundRobinPartitioning(targetShufflePartitions))
+            case HashPartitioning(expressions, numPartitions)
+                if targetShufflePartitions != numPartitions =>
+              Some(HashPartitioning(expressions, targetShufflePartitions))
+            case RangePartitioning(ordering, numPartitions)
+                if targetShufflePartitions != numPartitions =>
+              Some(RangePartitioning(ordering, targetShufflePartitions))
+            case _ => None
+          }
+          if (newOutPartitioning.isDefined) {
+            exchange.copy(outputPartitioning = newOutPartitioning.get)
+          } else {
+            exchange
+          }
+      }
+
+      if (ValidateRequirements.validate(newPlan)) {
+        newPlan
+      } else {
+        logInfo("DynamicShufflePartitions rule generated an invalid plan. " +
+          "Falling back to the original plan.")
+        plan
+      }
+    }
+  }
+
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 6f45dae12..597cb250b 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -273,4 +273,20 @@ object KyuubiSQLConf {
       .version("1.8.0")
       .stringConf
       .createOptional
+
+  val DYNAMIC_SHUFFLE_PARTITIONS =
+    buildConf("spark.sql.optimizer.dynamicShufflePartitions")
+      .doc("If true, adjust the number of shuffle partitions dynamically based on the job" +
+        " input size. The new number of partitions is the maximum input size" +
+        " divided by `spark.sql.adaptive.advisoryPartitionSizeInBytes`.")
+      .version("1.9.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val DYNAMIC_SHUFFLE_PARTITIONS_MAX_NUM =
+    buildConf("spark.sql.optimizer.dynamicShufflePartitions.maxNum")
+      .doc("The maximum partition number of DynamicShufflePartitions.")
+      .version("1.9.0")
+      .intConf
+      .createWithDefault(2000)
 }
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index f39ad3cc3..ad95ac429 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -43,6 +43,7 @@ object KyuubiSparkSQLCommonExtension {
     extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
 
     extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)
+    extensions.injectQueryStagePrepRule(DynamicShufflePartitions)
 
     extensions.injectQueryStagePrepRule(FinalStageConfigIsolation(_))
   }
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/spark/sql/hive/HiveSparkPlanHelper.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/spark/sql/hive/HiveSparkPlanHelper.scala
new file mode 100644
index 000000000..aa9a04596
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/spark/sql/hive/HiveSparkPlanHelper.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+object HiveSparkPlanHelper {
+  type HiveTableScanExec = org.apache.spark.sql.hive.execution.HiveTableScanExec
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DynamicShufflePartitionsSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DynamicShufflePartitionsSuite.scala
new file mode 100644
index 000000000..6668675a5
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DynamicShufflePartitionsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.{CommandResultExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
+import org.apache.spark.sql.hive.HiveUtils.CONVERT_METASTORE_PARQUET
+import org.apache.spark.sql.internal.SQLConf._
+
+import org.apache.kyuubi.sql.KyuubiSQLConf.{DYNAMIC_SHUFFLE_PARTITIONS, DYNAMIC_SHUFFLE_PARTITIONS_MAX_NUM}
+
+class DynamicShufflePartitionsSuite extends KyuubiSparkSQLExtensionTest {
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setupData()
+  }
+
+  test("test dynamic shuffle partitions") {
+    def collectExchanges(plan: SparkPlan): Seq[ShuffleExchangeExec] = {
+      plan match {
+        case p: CommandResultExec => collectExchanges(p.commandPhysicalPlan)
+        case p: AdaptiveSparkPlanExec => collectExchanges(p.finalPhysicalPlan)
+        case p: ShuffleQueryStageExec => collectExchanges(p.plan)
+        case p: ShuffleExchangeExec => p +: collectExchanges(p.child)
+        case p => p.children.flatMap(collectExchanges)
+      }
+    }
+
+    // datasource scan
+    withTable("table1", "table2", "table3") {
+      sql("create table table1 stored as parquet as select c1, c2 from t1")
+      sql("create table table2 stored as parquet as select c1, c2 from t2")
+      sql("create table table3 (c1 int, c2 string) stored as parquet")
+      sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
+      sql("ANALYZE TABLE table2 COMPUTE STATISTICS")
+
+      val initialPartitionNum: Int = 2
+      Seq(false, true).foreach { dynamicShufflePartitions =>
+        val maxDynamicShufflePartitions = if (dynamicShufflePartitions) {
+          Seq(8, 2000)
+        } else {
+          Seq(2000)
+        }
+        maxDynamicShufflePartitions.foreach { maxDynamicShufflePartitionNum =>
+          withSQLConf(
+            DYNAMIC_SHUFFLE_PARTITIONS.key -> dynamicShufflePartitions.toString,
+            DYNAMIC_SHUFFLE_PARTITIONS_MAX_NUM.key -> maxDynamicShufflePartitionNum.toString,
+            AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+            COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> initialPartitionNum.toString,
+            ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "500") {
+            val df = sql("insert overwrite table3 " +
+              " select a.c1 as c1, b.c2 as c2 from table1 a join table2 b on 
a.c1 = b.c1")
+
+            val exchanges = collectExchanges(df.queryExecution.executedPlan)
+            val (joinExchanges, rebalanceExchanges) = exchanges
+              .partition(_.shuffleOrigin == ENSURE_REQUIREMENTS)
+            // table scan size: 7369 3287
+            assert(joinExchanges.size == 2)
+            if (dynamicShufflePartitions) {
+              joinExchanges.foreach(e =>
+                assert(e.outputPartitioning.numPartitions
+                  == Math.min(22, maxDynamicShufflePartitionNum)))
+            } else {
+              joinExchanges.foreach(e =>
+                assert(e.outputPartitioning.numPartitions == initialPartitionNum))
+            }
+
+            assert(rebalanceExchanges.size == 1)
+            if (dynamicShufflePartitions) {
+              if (maxDynamicShufflePartitionNum == 8) {
+                // shuffle query size: 1424 451
+                assert(rebalanceExchanges.head.outputPartitioning.numPartitions ==
+                  Math.min(4, maxDynamicShufflePartitionNum))
+              } else {
+                // shuffle query size: 2057 664
+                assert(rebalanceExchanges.head.outputPartitioning.numPartitions ==
+                  Math.min(6, maxDynamicShufflePartitionNum))
+              }
+            } else {
+              assert(
+                rebalanceExchanges.head.outputPartitioning.numPartitions == initialPartitionNum)
+            }
+          }
+
+          // hive table scan
+          withSQLConf(
+            DYNAMIC_SHUFFLE_PARTITIONS.key -> dynamicShufflePartitions.toString,
+            DYNAMIC_SHUFFLE_PARTITIONS_MAX_NUM.key -> maxDynamicShufflePartitionNum.toString,
+            AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+            COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> initialPartitionNum.toString,
+            ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "500",
+            CONVERT_METASTORE_PARQUET.key -> "false") {
+            val df = sql("insert overwrite table3 " +
+              " select a.c1 as c1, b.c2 as c2 from table1 a join table2 b on 
a.c1 = b.c1")
+
+            val exchanges = collectExchanges(df.queryExecution.executedPlan)
+            val (joinExchanges, rebalanceExchanges) = exchanges
+              .partition(_.shuffleOrigin == ENSURE_REQUIREMENTS)
+            // table scan size: 7369 3287
+            assert(joinExchanges.size == 2)
+            if (dynamicShufflePartitions) {
+              joinExchanges.foreach(e =>
+                assert(e.outputPartitioning.numPartitions ==
+                  Math.min(22, maxDynamicShufflePartitionNum)))
+            } else {
+              joinExchanges.foreach(e =>
+                assert(e.outputPartitioning.numPartitions == initialPartitionNum))
+            }
+            // shuffle query size: 5154 720
+            assert(rebalanceExchanges.size == 1)
+            if (dynamicShufflePartitions) {
+              assert(rebalanceExchanges.head.outputPartitioning.numPartitions
+                == Math.min(12, maxDynamicShufflePartitionNum))
+            } else {
+              assert(rebalanceExchanges.head.outputPartitioning.numPartitions ==
+                initialPartitionNum)
+            }
+          }
+        }
+      }
+    }
+  }
+
+}
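
For reference, here is a hypothetical way to exercise the new rule once the Kyuubi Spark extension jar is on the classpath. The session setup and the extension class name are illustrative assumptions (check the Kyuubi docs for the exact value); the two `dynamicShufflePartitions` keys are the configs introduced above, and the rule is a no-op unless AQE is enabled.

```
// Illustrative session setup only -- the extension class name is an assumed entry point.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .config("spark.sql.adaptive.enabled", "true")                        // the rule requires AQE
  .config("spark.sql.optimizer.dynamicShufflePartitions", "true")      // disabled by default
  .config("spark.sql.optimizer.dynamicShufflePartitions.maxNum", "2000") // upper bound on partitions
  .getOrCreate()
```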
