This is an automated email from the ASF dual-hosted git repository.

liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new eae60a5b4d [CORE] Preserve fallback tag for nodes without logicalLink in RemoveFallbackTagRule (#12028)
eae60a5b4d is described below

commit eae60a5b4dd4308e8e8e05d8023bbb512f08cbca
Author: Joey <[email protected]>
AuthorDate: Sat May 9 15:46:18 2026 +0800

    [CORE] Preserve fallback tag for nodes without logicalLink in RemoveFallbackTagRule (#12028)
    
    For SparkPlan nodes that lack a logicalLink (e.g., SortExec / BroadcastExchange
    generated by EnsureRequirements), GlutenFallbackReporter cannot copy the fallback
    reason onto a logical plan, so removing the FallbackTag on the physical node makes
    GlutenQueryExecutionListener fall back to the generic 'Gluten does not touch it
    or does not support it' reason at SQL execution end.
    
    Fix: in RemoveFallbackTagRule, skip untagging for such nodes. Their tag stays on
    the physical node and is read by GlutenExplainUtils.handleVanillaSparkPlan via
    FallbackTags.getOption(p) in both the per-stage GlutenFallbackReporter event and
    the end-of-SQL plan walk.
    
    Note: an earlier attempt synthesized a dummy LeafNode and attached it via
    SparkPlan.LOGICAL_PLAN_TAG. That poisoned
    AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage, which relies on
    EnsureRequirements-generated nodes having no logicalLink in order to walk down to
    the real logical node, and broke AQE-driven join/empty-relation rewrites
    (SPARK-34533, SPARK-34781, SPARK-39551).
    
    Co-authored-by: Claude Opus 4.7 <[email protected]>
---
 .../org/apache/gluten/execution/FallbackSuite.scala      |  6 +++++-
 .../apache/gluten/extension/columnar/FallbackTag.scala   | 16 +++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 5250ab7df7..421e0959f1 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -324,6 +324,10 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
         }
       }
     }
+    // Drain any pending events from previous tests before registering the listener.
+    // Spark's LiveListenerBus is async, so events posted but not yet dispatched will
+    // still be delivered to listeners added afterwards, contaminating `events` here.
+    GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
     spark.sparkContext.addSparkListener(listener)
     withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
       try {
@@ -345,7 +349,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
         val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
         assert(fallbackReasons.nonEmpty)
         assert(
-          fallbackReasons.exists(
+          fallbackReasons.forall(
             _.contains("[FallbackByUserOptions] Validation failed on node Sort")))
       } finally {
         spark.sparkContext.removeSparkListener(listener)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
index b18fa519e0..32020e68bc 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
@@ -115,7 +115,21 @@ object FallbackTags {
 
 case class RemoveFallbackTagRule() extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = {
-    plan.foreach(FallbackTags.untag)
+    plan.foreach {
+      // Nodes without a logicalLink (e.g. SortExec/BroadcastExchange added by
+      // EnsureRequirements) have no place to forward the fallback reason via
+      // GlutenFallbackReporter. Keep the tag on the physical node so that
+      // GlutenExplainUtils.handleVanillaSparkPlan can still read it directly.
+      // We intentionally do NOT inject a synthetic logicalLink here, because
+      // AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage relies on these
+      // EnsureRequirements-generated nodes having no logicalLink in order to
+      // walk down to the real logical node; a synthetic link would poison
+      // AQE's stage-to-logical-plan mapping (breaking AQEPropagateEmptyRelation,
+      // ValidateSparkPlan-driven re-plans, etc.).
+      case p if FallbackTags.nonEmpty(p) && p.logicalLink.isEmpty =>
+      case p =>
+        FallbackTags.untag(p)
+    }
     plan
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to