This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 082de7fcdc40 [SPARK-54850][SQL] Improve `extractShuffleIds` to find
`AdaptiveSparkPlanExec` anywhere in plan tree
082de7fcdc40 is described below
commit 082de7fcdc4030bd96ec5f6cd534030e961418bf
Author: Chang chen <[email protected]>
AuthorDate: Mon Dec 29 21:11:24 2025 +0800
[SPARK-54850][SQL] Improve `extractShuffleIds` to find
`AdaptiveSparkPlanExec` anywhere in plan tree
### What changes were proposed in this pull request?
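This PR uses `collectFirst` to find the first `AdaptiveSparkPlanExec` node
anywhere in the plan tree, instead of assuming that the root of the plan is an
`AdaptiveSparkPlanExec`. If no such node is found, the method falls back to
collecting the shuffle IDs of all `ShuffleExchangeLike` nodes, as before.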
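To illustrate what "the first node anywhere in the plan tree" means, here is a minimal sketch on a toy tree rather than Spark's `SparkPlan`. The `Node`, `Wrapper`, `Adaptive`, and `Leaf` types and the standalone `collectFirst` function are hypothetical; the function is modeled on the pre-order semantics of Spark's `TreeNode.collectFirst`, which tests a node before its children and stops at the first match.

```scala
// Hypothetical toy plan tree; these are NOT Spark's SparkPlan classes.
sealed trait Node { def children: Seq[Node] }
final case class Wrapper(name: String, children: Seq[Node]) extends Node
final case class Adaptive(id: String) extends Node { val children: Seq[Node] = Nil }
final case class Leaf(name: String) extends Node { val children: Seq[Node] = Nil }

// Pre-order search: try the current node first, then each child from left to
// right; once a match is found, later siblings are not searched.
def collectFirst[B](node: Node)(pf: PartialFunction[Node, B]): Option[B] =
  pf.lift(node).orElse(
    node.children.foldLeft(Option.empty[B])((acc, child) => acc.orElse(collectFirst(child)(pf))))

val tree: Node = Wrapper("root", Seq(
  Wrapper("left", Seq(Leaf("scan"))),
  Wrapper("right", Seq(Adaptive("a1"))),
  Adaptive("a2")))

// Old-style check: only the root is inspected, so nothing is found here.
val rootOnly: Option[String] = tree match {
  case Adaptive(id) => Some(id)
  case _ => None
}

// collectFirst-style check: the whole tree is searched in pre-order.
val anywhere: Option[String] = collectFirst(tree) { case Adaptive(id) => id }
```

Because the search is depth-first and left to right, `Adaptive("a1")` is found before `Adaptive("a2")`, even though `a2` sits closer to the root.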
### Why are the changes needed?
https://github.com/apache/spark/pull/52157 introduced the
`extractShuffleIds` method in `SQLExecution` to find the shuffle IDs of a
`SparkPlan`. The method implicitly assumed that, when AQE is enabled, the
`AdaptiveSparkPlanExec` is the root of the input plan. Because Spark itself only
inserts `AdaptiveSparkPlanExec` under a Command, this assumption held.
However, the `AdaptiveSparkPlanExec` may not be the root node in Gluten. Gluten
needs to insert a special physical plan to do colum [...]
By using `collectFirst`, the method correctly locates the
`AdaptiveSparkPlanExec` regardless of its position in the plan tree, which
improves compatibility with extensions such as Gluten.
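The failure mode and the fix can be reproduced with a small model. This is a sketch under stated assumptions: `Plan`, `Adaptive`, `Exchange`, `Wrapper`, and `Scan` are hypothetical stand-ins for `SparkPlan`, `AdaptiveSparkPlanExec`, `ShuffleExchangeLike`, a node inserted above the adaptive node (as Gluten may do), and a scan. `extractOld` and `extractNew` only mirror the shape of the code before and after this change. The adaptive node is modeled as a leaf, so a plain `collect` over the tree does not look inside it.

```scala
// Hypothetical toy model; none of these types are Spark classes.
sealed trait Plan {
  def children: Seq[Plan]

  // Pre-order search for the first node where pf is defined (like TreeNode.collectFirst).
  def collectFirst[B](pf: PartialFunction[Plan, B]): Option[B] =
    pf.lift(this).orElse(
      children.foldLeft(Option.empty[B])((acc, c) => acc.orElse(c.collectFirst(pf))))

  // Pre-order collection of pf results from every node (like TreeNode.collect).
  def collect[B](pf: PartialFunction[Plan, B]): Seq[B] =
    pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
}
// Stand-in for AdaptiveSparkPlanExec: shuffle IDs come from its context; modeled as a leaf.
final case class Adaptive(shuffleIds: Seq[Int]) extends Plan { val children: Seq[Plan] = Nil }
// Stand-in for a ShuffleExchangeLike node.
final case class Exchange(shuffleId: Int, child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
// Stand-in for a node placed above the adaptive node by an extension.
final case class Wrapper(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
case object Scan extends Plan { val children: Seq[Plan] = Nil }

// Before: only the root is matched against the adaptive type.
def extractOld(plan: Plan): Seq[Int] = plan match {
  case ae: Adaptive => ae.shuffleIds
  case other => other.collect { case e: Exchange => e.shuffleId }
}

// After: the first adaptive node anywhere wins; otherwise fall back to exchanges.
def extractNew(plan: Plan): Seq[Int] =
  plan.collectFirst { case ae: Adaptive => ae.shuffleIds }
    .getOrElse(plan.collect { case e: Exchange => e.shuffleId })

val adaptive = Adaptive(Seq(0, 1))
val wrapped = Wrapper(adaptive)
val nonAdaptive = Exchange(2, Exchange(3, Scan))
```

With the adaptive node at the root, or with no adaptive node at all, both versions return the same IDs; they differ only when another node sits above the adaptive node, where `extractOld(wrapped)` returns an empty sequence and `extractNew(wrapped)` returns `Seq(0, 1)`.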
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Pass GHA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53620 from baibaichen/feature/extractShuffleIds.
Authored-by: Chang chen <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../main/scala/org/apache/spark/sql/execution/SQLExecution.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 1cab0f8d35af..19bafeb19612 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -71,11 +71,12 @@ object SQLExecution extends Logging {
}
private def extractShuffleIds(plan: SparkPlan): Seq[Int] = {
- plan match {
+ val shuffleIdsOption = plan.collectFirst {
case ae: AdaptiveSparkPlanExec =>
ae.context.shuffleIds.asScala.keys.toSeq
- case nonAdaptivePlan =>
- nonAdaptivePlan.collect {
+ }
+ shuffleIdsOption.getOrElse {
+ plan.collect {
case exec: ShuffleExchangeLike => exec.shuffleId
}
}