This is an automated email from the ASF dual-hosted git repository.
liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new eae60a5b4d [CORE] Preserve fallback tag for nodes without logicalLink
in RemoveFallbackTagRule (#12028)
eae60a5b4d is described below
commit eae60a5b4dd4308e8e8e05d8023bbb512f08cbca
Author: Joey <[email protected]>
AuthorDate: Sat May 9 15:46:18 2026 +0800
[CORE] Preserve fallback tag for nodes without logicalLink in
RemoveFallbackTagRule (#12028)
For SparkPlan nodes that lack a logicalLink (e.g., SortExec / BroadcastExchange
generated by EnsureRequirements), GlutenFallbackReporter cannot copy the fallback
reason onto a logical plan, so removing the FallbackTag on the physical node makes
GlutenQueryExecutionListener fall back to the generic 'Gluten does not touch it
or does not support it' reason at SQL execution end.
Fix: in RemoveFallbackTagRule, skip untagging for such nodes. Their tag stays on
the physical node and is read by GlutenExplainUtils.handleVanillaSparkPlan via
FallbackTags.getOption(p) in both the per-stage GlutenFallbackReporter event and
the end-of-SQL plan walk.
Note: an earlier attempt synthesized a dummy LeafNode and attached it via
SparkPlan.LOGICAL_PLAN_TAG. That poisoned
AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage, which relies on
EnsureRequirements-generated nodes having no logicalLink in order to walk down
to the real logical node, and broke AQE-driven join/empty-relation rewrites
(SPARK-34533, SPARK-34781, SPARK-39551).
Co-authored-by: Claude Opus 4.7 <[email protected]>
---
.../org/apache/gluten/execution/FallbackSuite.scala | 6 +++++-
.../apache/gluten/extension/columnar/FallbackTag.scala | 16 +++++++++++++++-
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 5250ab7df7..421e0959f1 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -324,6 +324,10 @@ class FallbackSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}
}
+ // Drain any pending events from previous tests before registering the
listener.
+ // Spark's LiveListenerBus is async, so events posted but not yet
dispatched will
+ // still be delivered to listeners added afterwards, contaminating
`events` here.
+ GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
spark.sparkContext.addSparkListener(listener)
withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
try {
@@ -345,7 +349,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPl
val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
assert(fallbackReasons.nonEmpty)
assert(
- fallbackReasons.exists(
+ fallbackReasons.forall(
_.contains("[FallbackByUserOptions] Validation failed on node
Sort")))
} finally {
spark.sparkContext.removeSparkListener(listener)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
index b18fa519e0..32020e68bc 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
@@ -115,7 +115,21 @@ object FallbackTags {
case class RemoveFallbackTagRule() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
- plan.foreach(FallbackTags.untag)
+ plan.foreach {
+ // Nodes without a logicalLink (e.g. SortExec/BroadcastExchange added by
+ // EnsureRequirements) have no place to forward the fallback reason via
+ // GlutenFallbackReporter. Keep the tag on the physical node so that
+ // GlutenExplainUtils.handleVanillaSparkPlan can still read it directly.
+ // We intentionally do NOT inject a synthetic logicalLink here, because
+ // AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage relies on these
+ // EnsureRequirements-generated nodes having no logicalLink in order to
+ // walk down to the real logical node; a synthetic link would poison
+ // AQE's stage-to-logical-plan mapping (breaking
AQEPropagateEmptyRelation,
+ // ValidateSparkPlan-driven re-plans, etc.).
+ case p if FallbackTags.nonEmpty(p) && p.logicalLink.isEmpty =>
+ case p =>
+ FallbackTags.untag(p)
+ }
plan
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]