This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b16523a1b7 [GLUTEN-9034][VL] Fix the VeloxResizeBatch not add for
ReusedExchange (#11069)
b16523a1b7 is described below
commit b16523a1b7fcd3b1baf3c813998150ee01e36c0c
Author: Jin Chengcheng <[email protected]>
AuthorDate: Fri Nov 14 10:10:06 2025 +0000
[GLUTEN-9034][VL] Fix the VeloxResizeBatch not add for ReusedExchange
(#11069)
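Before this change, TPC-DS Q95 had 4 AQEShuffleRead nodes, but only one of them
got a VeloxResizeBatchesExec node added. In the other cases, the exchange was
reused, so the ShuffleQueryStageExec wrapped a ReusedExchangeExec instead of the
ColumnarShuffleExchangeExec itself, and the existing patterns did not match.
The rule now also matches ReusedExchangeExec.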
---
...AppendBatchResizeForShuffleInputAndOutput.scala | 31 +++++++++++++++++++++-
1 file changed, 30 insertions(+), 1 deletion(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
index 7d6309bf93..92519eecf7 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.execution.VeloxResizeBatchesExec
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec,
ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
/**
* Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to
make the batch sizes in
@@ -49,7 +50,16 @@ case class AppendBatchResizeForShuffleInputAndOutput()
extends Rule[SparkPlan] {
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(a, range.min, range.max)
- // Since it's transformed in a bottom to up order, so we may first
encountered
+ case a @ AQEShuffleReadExec(
+ ShuffleQueryStageExec(
+ _,
+ ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
+ _),
+ _)
+ if resizeBatchesShuffleOutputEnabled &&
+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
+ VeloxResizeBatchesExec(a, range.min, range.max)
+ // Since it's transformed in a bottom to up order, so we may first
encounter
// ShuffeQueryStageExec, which is transformed to
VeloxResizeBatchesExec(ShuffeQueryStageExec),
// then we see AQEShuffleReadExec
case a @ AQEShuffleReadExec(
@@ -61,10 +71,29 @@ case class AppendBatchResizeForShuffleInputAndOutput()
extends Rule[SparkPlan] {
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
+ case a @ AQEShuffleReadExec(
+ VeloxResizeBatchesExec(
+ s @ ShuffleQueryStageExec(
+ _,
+ ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
+ _),
+ _,
+ _),
+ _)
+ if resizeBatchesShuffleOutputEnabled &&
+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
+ VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec,
_)
if resizeBatchesShuffleOutputEnabled &&
shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
VeloxResizeBatchesExec(s, range.min, range.max)
+ case s @ ShuffleQueryStageExec(
+ _,
+ ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
+ _)
+ if resizeBatchesShuffleOutputEnabled &&
+ shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
+ VeloxResizeBatchesExec(s, range.min, range.max)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]