This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 45f2e72369 [GLUTEN-10988][VL] Do not resize batches for sort-based/rss-sort shuffle (#10991)
45f2e72369 is described below

commit 45f2e72369ffae47e24a699258a912211bb76031
Author: Zhen Wang <[email protected]>
AuthorDate: Wed Nov 5 22:17:15 2025 +0800

    [GLUTEN-10988][VL] Do not resize batches for sort-based/rss-sort shuffle (#10991)
---
 ...AppendBatchResizeForShuffleInputAndOutput.scala | 29 +++++++++++++++-------
 .../org/apache/gluten/config/GlutenConfig.scala    | 10 ++++++++
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
index ebe55f573c..7d6309bf93 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.extension
 
-import org.apache.gluten.config.HashShuffleWriterType
 import org.apache.gluten.config.VeloxConfig
 import org.apache.gluten.execution.VeloxResizeBatchesExec
 
@@ -30,29 +29,41 @@ import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQuery
  */
 case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] 
{
   override def apply(plan: SparkPlan): SparkPlan = {
+    val resizeBatchesShuffleInputEnabled = 
VeloxConfig.get.veloxResizeBatchesShuffleInput
+    val resizeBatchesShuffleOutputEnabled = 
VeloxConfig.get.veloxResizeBatchesShuffleOutput
+    if (!resizeBatchesShuffleInputEnabled && 
!resizeBatchesShuffleOutputEnabled) {
+      return plan
+    }
+
     val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
     plan.transformUp {
       case shuffle: ColumnarShuffleExchangeExec
-          if shuffle.shuffleWriterType == HashShuffleWriterType &&
-            VeloxConfig.get.veloxResizeBatchesShuffleInput =>
+          if resizeBatchesShuffleInputEnabled &&
+            shuffle.shuffleWriterType.requiresResizingShuffleInput =>
         val appendBatches =
           VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
         shuffle.withNewChildren(Seq(appendBatches))
-      case a @ AQEShuffleReadExec(ShuffleQueryStageExec(_, _: 
ColumnarShuffleExchangeExec, _), _)
-          if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+      case a @ AQEShuffleReadExec(
+            ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _),
+            _)
+          if resizeBatchesShuffleOutputEnabled &&
+            shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
         VeloxResizeBatchesExec(a, range.min, range.max)
       // Since the plan is transformed in bottom-up order, we may first encounter
       // ShuffleQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffleQueryStageExec),
       // and only then see AQEShuffleReadExec
       case a @ AQEShuffleReadExec(
             VeloxResizeBatchesExec(
-              s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _),
+              s @ ShuffleQueryStageExec(_, shuffle: 
ColumnarShuffleExchangeExec, _),
               _,
               _),
-            _) if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+            _)
+          if resizeBatchesShuffleOutputEnabled &&
+            shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
         VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
-      case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _)
-          if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+      case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, 
_)
+          if resizeBatchesShuffleOutputEnabled &&
+            shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
         VeloxResizeBatchesExec(s, range.min, range.max)
     }
   }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index b57bd59123..c1b378c333 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -31,22 +31,32 @@ import scala.collection.JavaConverters._
 
 trait ShuffleWriterType {
   val name: String
+  val requiresResizingShuffleInput: Boolean
+  val requiresResizingShuffleOutput: Boolean
 }
 
 case object HashShuffleWriterType extends ShuffleWriterType {
   override val name: String = ReservedKeys.GLUTEN_HASH_SHUFFLE_WRITER
+  override val requiresResizingShuffleInput: Boolean = true
+  override val requiresResizingShuffleOutput: Boolean = true
 }
 
 case object SortShuffleWriterType extends ShuffleWriterType {
   override val name: String = ReservedKeys.GLUTEN_SORT_SHUFFLE_WRITER
+  override val requiresResizingShuffleInput: Boolean = false
+  override val requiresResizingShuffleOutput: Boolean = false
 }
 
 case object RssSortShuffleWriterType extends ShuffleWriterType {
   override val name: String = ReservedKeys.GLUTEN_RSS_SORT_SHUFFLE_WRITER
+  override val requiresResizingShuffleInput: Boolean = false
+  override val requiresResizingShuffleOutput: Boolean = false
 }
 
 case object GpuHashShuffleWriterType extends ShuffleWriterType {
   override val name: String = ReservedKeys.GLUTEN_GPU_HASH_SHUFFLE_WRITER
+  override val requiresResizingShuffleInput: Boolean = true
+  override val requiresResizingShuffleOutput: Boolean = true
 }
 
 /*


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to