This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a7d5fc6b1 [GLUTEN-9034][VL] Add VeloxResizeBatchesExec for Shuffle 
(#9035)
9a7d5fc6b1 is described below

commit 9a7d5fc6b13aa41b7a171b71512a037474b9125d
Author: WangGuangxin <[email protected]>
AuthorDate: Tue Apr 22 15:24:00 2025 +0800

    [GLUTEN-9034][VL] Add VeloxResizeBatchesExec for Shuffle (#9035)
---
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  2 +
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 15 +-----
 .../org/apache/gluten/config/VeloxConfig.scala     | 30 ++++++++++-
 .../gluten/execution/VeloxResizeBatchesExec.scala  |  1 +
 ...AppendBatchResizeForShuffleInputAndOutput.scala | 58 ++++++++++++++++++++++
 .../gluten/execution/MiscOperatorSuite.scala       | 44 +++++++++++++++-
 6 files changed, 132 insertions(+), 18 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index a4e1eb24b8..fe785c6fea 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -89,6 +89,7 @@ object VeloxRuleApi {
       c => HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), 
rewrites, offloads))
 
     // Legacy: Post-transform rules.
+    injector.injectPostTransform(_ => 
AppendBatchResizeForShuffleInputAndOutput())
     injector.injectPostTransform(_ => UnionTransformerRule())
     injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
@@ -176,6 +177,7 @@ object VeloxRuleApi {
           c => RasOffload.Rule(offload, validatorBuilder(c.glutenConf), 
rewrites)))
 
     // Gluten RAS: Post rules.
+    injector.injectPostTransform(_ => 
AppendBatchResizeForShuffleInputAndOutput())
     injector.injectPostTransform(_ => RemoveTransitions)
     injector.injectPostTransform(_ => UnionTransformerRule())
     injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 44aa8e836f..f6c66585da 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -362,19 +362,6 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       }
     }
 
-    def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = {
-      plan match {
-        case shuffle: ColumnarShuffleExchangeExec
-            if !shuffle.useSortBasedShuffle &&
-              VeloxConfig.get.veloxResizeBatchesShuffleInput =>
-          val range = VeloxConfig.get.veloxResizeBatchesShuffleInputRange
-          val appendBatches =
-            VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
-          shuffle.withNewChildren(Seq(appendBatches))
-        case _ => plan
-      }
-    }
-
     val child = shuffle.child
 
     val newShuffle = shuffle.outputPartitioning match {
@@ -437,7 +424,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       case _ =>
         ColumnarShuffleExchangeExec(shuffle, child, null)
     }
-    maybeAddAppendBatchesExec(newShuffle)
+    newShuffle
   }
 
   /** Generate ShuffledHashJoinExecTransformer. */
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala 
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index b94da4eafd..389db080ff 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -34,16 +34,19 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) 
{
   def veloxResizeBatchesShuffleInput: Boolean =
     getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT)
 
+  def veloxResizeBatchesShuffleOutput: Boolean =
+    getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT)
+
   case class ResizeRange(min: Int, max: Int) {
     assert(max >= min)
     assert(min > 0, "Min batch size should be larger than 0")
     assert(max > 0, "Max batch size should be larger than 0")
   }
 
-  def veloxResizeBatchesShuffleInputRange: ResizeRange = {
+  def veloxResizeBatchesShuffleInputOutputRange: ResizeRange = {
     val standardSize = getConf(COLUMNAR_MAX_BATCH_SIZE)
     val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
-    val minSize = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+    val minSize = 
getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_OUTPUT_MIN_SIZE)
       .getOrElse(defaultMinSize)
     ResizeRange(minSize, Int.MaxValue)
   }
@@ -294,6 +297,14 @@ object VeloxConfig {
       .booleanConf
       .createWithDefault(true)
 
+  val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput")
+      .internal()
+      .doc(s"If true, combine small columnar batches together right after 
shuffle read. " +
+        s"The default minimum output batch size is equal to 0.25 * 
${COLUMNAR_MAX_BATCH_SIZE.key}")
+      .booleanConf
+      .createWithDefault(false)
+
   val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
     
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
       .internal()
@@ -306,6 +317,21 @@ object VeloxConfig {
       .intConf
       .createOptional
 
+  val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_OUTPUT_MIN_SIZE =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize")
+      .internal()
+      .doc(
+        s"The minimum batch size for shuffle input and output. " +
+          s"If size of an input batch is " +
+          s"smaller than the value, it will be combined with other " +
+          s"batches before sending to shuffle. " +
+          s"The same applies for batches output by shuffle read. " +
+          s"Only functions when " +
+          s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT.key} or " +
+          s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT.key} is set to 
true. " +
+          s"Default value: 0.25 * <max batch size>")
+      .fallbackConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+
   val COLUMNAR_VELOX_ENABLE_USER_EXCEPTION_STACKTRACE =
     
buildConf("spark.gluten.sql.columnar.backend.velox.enableUserExceptionStacktrace")
       .internal()
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index 9ed687d337..a8e0863639 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -89,6 +89,7 @@ case class VeloxResizeBatchesExec(
 
         val out = Iterators
           .wrap(appender.asScala)
+          .protectInvocationFlow()
           .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
           .recyclePayload(_.close())
           .recycleIterator {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
new file mode 100644
index 0000000000..e6ae9f1e54
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.config.VeloxConfig
+import org.apache.gluten.execution.VeloxResizeBatchesExec
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, 
ShuffleQueryStageExec}
+
+/**
+ * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to 
make the batch sizes in
+ * good shape.
+ */
+case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] 
{
+  override def apply(plan: SparkPlan): SparkPlan = {
+    val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
+    plan.transformUp {
+      case shuffle: ColumnarShuffleExchangeExec
+          if !shuffle.useSortBasedShuffle &&
+            VeloxConfig.get.veloxResizeBatchesShuffleInput =>
+        val appendBatches =
+          VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
+        shuffle.withNewChildren(Seq(appendBatches))
+      case a @ AQEShuffleReadExec(ShuffleQueryStageExec(_, _: 
ColumnarShuffleExchangeExec, _), _)
+          if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+        VeloxResizeBatchesExec(a, range.min, range.max)
+      // Since the plan is transformed in a bottom-up order, we may first 
encounter
+      // ShuffleQueryStageExec, which is transformed to 
VeloxResizeBatchesExec(ShuffleQueryStageExec),
+      // then we see AQEShuffleReadExec
+      case a @ AQEShuffleReadExec(
+            VeloxResizeBatchesExec(
+              s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _),
+              _,
+              _),
+            _) if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+        VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
+      case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _)
+          if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+        VeloxResizeBatchesExec(s, range.min, range.max)
+    }
+  }
+}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 3e52ac8655..30ff88f343 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -16,14 +16,14 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
 import org.apache.gluten.expression.VeloxDummyExpression
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
AQEShuffleReadExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
@@ -2052,4 +2052,44 @@ class MiscOperatorSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSpa
       }
     }
   }
+
+  test("Check VeloxResizeBatches is added in ShuffleRead") {
+    Seq(true, false).foreach(
+      coalesceEnabled => {
+        withSQLConf(
+          VeloxConfig.COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT.key -> 
"true",
+          SQLConf.SHUFFLE_PARTITIONS.key -> "10",
+          SQLConf.COALESCE_PARTITIONS_ENABLED.key -> coalesceEnabled.toString
+        ) {
+          runQueryAndCompare(
+            "SELECT l_orderkey, count(1) from lineitem group by 
l_orderkey".stripMargin) {
+            df =>
+              val executedPlan = getExecutedPlan(df)
+              if (coalesceEnabled) {
+                // 
VeloxResizeBatches(AQEShuffleRead(ShuffleQueryStage(ColumnarShuffleExchange)))
+                assert(executedPlan.sliding(4).exists {
+                  case Seq(
+                        _: ColumnarShuffleExchangeExec,
+                        _: ShuffleQueryStageExec,
+                        _: AQEShuffleReadExec,
+                        _: VeloxResizeBatchesExec
+                      ) =>
+                    true
+                  case _ => false
+                })
+              } else {
+                // 
VeloxResizeBatches(ShuffleQueryStage(ColumnarShuffleExchange))
+                assert(executedPlan.sliding(3).exists {
+                  case Seq(
+                        _: ColumnarShuffleExchangeExec,
+                        _: ShuffleQueryStageExec,
+                        _: VeloxResizeBatchesExec) =>
+                    true
+                  case _ => false
+                })
+              }
+          }
+        }
+      })
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to