This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 45f2e72369 [GLUTEN-10988][VL] Do not resize batches for sort-based/rss-sort shuffle (#10991)
45f2e72369 is described below
commit 45f2e72369ffae47e24a699258a912211bb76031
Author: Zhen Wang <[email protected]>
AuthorDate: Wed Nov 5 22:17:15 2025 +0800
[GLUTEN-10988][VL] Do not resize batches for sort-based/rss-sort shuffle (#10991)
---
...AppendBatchResizeForShuffleInputAndOutput.scala | 29 +++++++++++++++-------
.../org/apache/gluten/config/GlutenConfig.scala | 10 ++++++++
2 files changed, 30 insertions(+), 9 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
index ebe55f573c..7d6309bf93 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.extension
-import org.apache.gluten.config.HashShuffleWriterType
import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.execution.VeloxResizeBatchesExec
@@ -30,29 +29,41 @@ import
org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQuery
*/
case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan]
{
override def apply(plan: SparkPlan): SparkPlan = {
+ val resizeBatchesShuffleInputEnabled =
VeloxConfig.get.veloxResizeBatchesShuffleInput
+ val resizeBatchesShuffleOutputEnabled =
VeloxConfig.get.veloxResizeBatchesShuffleOutput
+ if (!resizeBatchesShuffleInputEnabled &&
!resizeBatchesShuffleOutputEnabled) {
+ return plan
+ }
+
val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
plan.transformUp {
case shuffle: ColumnarShuffleExchangeExec
- if shuffle.shuffleWriterType == HashShuffleWriterType &&
- VeloxConfig.get.veloxResizeBatchesShuffleInput =>
+ if resizeBatchesShuffleInputEnabled &&
+ shuffle.shuffleWriterType.requiresResizingShuffleInput =>
val appendBatches =
VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
shuffle.withNewChildren(Seq(appendBatches))
- case a @ AQEShuffleReadExec(ShuffleQueryStageExec(_, _:
ColumnarShuffleExchangeExec, _), _)
- if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+ case a @ AQEShuffleReadExec(
+ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _),
+ _)
+ if resizeBatchesShuffleOutputEnabled &&
+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(a, range.min, range.max)
// Because the plan is transformed bottom-up, we may first encounter
// ShuffleQueryStageExec, which is transformed to
// VeloxResizeBatchesExec(ShuffleQueryStageExec), and only then see
// AQEShuffleReadExec.
case a @ AQEShuffleReadExec(
VeloxResizeBatchesExec(
- s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _),
+ s @ ShuffleQueryStageExec(_, shuffle:
ColumnarShuffleExchangeExec, _),
_,
_),
- _) if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+ _)
+ if resizeBatchesShuffleOutputEnabled &&
+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
- case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _)
- if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+ case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec,
_)
+ if resizeBatchesShuffleOutputEnabled &&
+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(s, range.min, range.max)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index b57bd59123..c1b378c333 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -31,22 +31,32 @@ import scala.collection.JavaConverters._
trait ShuffleWriterType {
val name: String
+ val requiresResizingShuffleInput: Boolean
+ val requiresResizingShuffleOutput: Boolean
}
case object HashShuffleWriterType extends ShuffleWriterType {
override val name: String = ReservedKeys.GLUTEN_HASH_SHUFFLE_WRITER
+ override val requiresResizingShuffleInput: Boolean = true
+ override val requiresResizingShuffleOutput: Boolean = true
}
case object SortShuffleWriterType extends ShuffleWriterType {
override val name: String = ReservedKeys.GLUTEN_SORT_SHUFFLE_WRITER
+ override val requiresResizingShuffleInput: Boolean = false
+ override val requiresResizingShuffleOutput: Boolean = false
}
case object RssSortShuffleWriterType extends ShuffleWriterType {
override val name: String = ReservedKeys.GLUTEN_RSS_SORT_SHUFFLE_WRITER
+ override val requiresResizingShuffleInput: Boolean = false
+ override val requiresResizingShuffleOutput: Boolean = false
}
case object GpuHashShuffleWriterType extends ShuffleWriterType {
override val name: String = ReservedKeys.GLUTEN_GPU_HASH_SHUFFLE_WRITER
+ override val requiresResizingShuffleInput: Boolean = true
+ override val requiresResizingShuffleOutput: Boolean = true
}
/*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]