This is an automated email from the ASF dual-hosted git repository.

liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new cf62a41bf2 [CORE] Get correct fallback reason on nodes without logicalLink (#11295)
cf62a41bf2 is described below

commit cf62a41bf254b038217685822da6f5801d374fe3
Author: Joey <[email protected]>
AuthorDate: Tue Dec 16 09:27:11 2025 +0800

    [CORE] Get correct fallback reason on nodes without logicalLink (#11295)
---
 .../apache/gluten/execution/FallbackSuite.scala    | 49 +++++++++++++++++++++-
 .../org/apache/spark/utils/GlutenSuiteUtils.scala  | 23 ++++++++++
 .../spark/sql/execution/GlutenExplainUtils.scala   | 20 ++++++---
 3 files changed, 86 insertions(+), 6 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 70ef9e19ec..9eccf75ad3 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -17,12 +17,17 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+import org.apache.gluten.events.GlutenPlanFallbackEvent
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarShuffleExchangeExec, SparkPlan}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarShuffleExchangeExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuffleReadExec}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.utils.GlutenSuiteUtils
+
+import scala.collection.mutable.ArrayBuffer
 
 class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
   protected val rootPath: String = getClass.getResource("/").getPath
@@ -35,6 +40,9 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.memory.offHeap.size", "2g")
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      // The Gluten UI event test below expects the Spark UI to be enabled.
+      .set(GlutenConfig.GLUTEN_UI_ENABLED.key, "true")
+      .set("spark.ui.enabled", "true")
   }
 
   override def beforeAll(): Unit = {
@@ -312,4 +320,43 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
         assert(columnarToRow == 1)
     }
   }
+
+  test("get correct fallback reason on nodes without logicalLink") {
+    val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+    val listener = new SparkListener {
+      override def onOtherEvent(event: SparkListenerEvent): Unit = {
+        event match {
+          case e: GlutenPlanFallbackEvent => events.append(e)
+          case _ =>
+        }
+      }
+    }
+    spark.sparkContext.addSparkListener(listener)
+    withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
+      try {
+        val df = spark.sql("""
+                             |SELECT
+                             |  c1,
+                             |  c2,
+                             |  ROW_NUMBER() OVER (PARTITION BY c1 ORDER BY c2) as row_num,
+                             |  RANK() OVER (PARTITION BY c1 ORDER BY c2) as rank_num
+                             |FROM tmp1
+                             |
+                             |""".stripMargin)
+        df.collect()
+        GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+        val sort = find(df.queryExecution.executedPlan) {
+          _.isInstanceOf[SortExec]
+        }
+        assert(sort.isDefined)
+        val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
+        assert(fallbackReasons.nonEmpty)
+        assert(
+          fallbackReasons.forall(
+            _.contains("[FallbackByUserOptions] Validation failed on node Sort")))
+      } finally {
+        spark.sparkContext.removeSparkListener(listener)
+      }
+    }
+  }
 }
diff --git a/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala b/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
new file mode 100644
index 0000000000..1453a56072
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.utils
+
+import org.apache.spark.SparkContext
+
+object GlutenSuiteUtils {
+  def waitUntilEmpty(ctx: SparkContext): Unit = ctx.listenerBus.waitUntilEmpty()
+}
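
A note on this new helper: SparkContext.listenerBus is private[spark], which is
why GlutenSuiteUtils is placed under the org.apache.spark package rather than
next to the suite. The intended usage is the pattern in the test above (a
sketch only; `df`, `events`, and `spark` are the test's local DataFrame, event
buffer, and SparkSession):

    // Run the query, then drain the listener bus so every posted
    // GlutenPlanFallbackEvent has been delivered before asserting.
    df.collect()
    GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
    assert(events.nonEmpty)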
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 7a33350524..1e5beeb7c1 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -65,12 +65,22 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
      case Some(tag) => addFallbackNodeWithReason(p, tag.reason(), fallbackNodeToReason)
       case _ =>
        // If the SparkPlan does not have a fallback reason, then there are two options:
-        // 1. Gluten ignore that plan and it's a kind of fallback
+        // 1. Gluten ignores that plan, and it's a kind of fallback
         // 2. Gluten does not support it without the fallback reason
-        addFallbackNodeWithReason(
-          p,
-          "Gluten does not touch it or does not support it",
-          fallbackNodeToReason)
+
+        // Some SparkPlan nodes may lack a logicalLink (e.g., SortExec generated
+        // by EnsureRequirements). Additionally, some Gluten rules might omit
+        // `copyTagsFrom` during plan conversion, leading to the same issue.
+        // When such a node falls back, the fallback reason might be stored in
+        // its tags. We attempt to retrieve it here as a fallback mechanism.
+        // However, this is not always reliable and merely increases our chances
+        // of finding the correct reason, as AQE might create new SparkPlan
+        // instances that do not preserve the tags.
+        val reason = FallbackTags
+          .getOption(p)
+          .map(_.reason())
+          .getOrElse("Gluten does not touch it or does not support it")
+        addFallbackNodeWithReason(p, reason, fallbackNodeToReason)
     }
   }
 


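For readers skimming the last hunk: the fix changes how a fallback reason is
resolved for a physical node whose logicalLink is missing. Instead of always
emitting the generic message, GlutenExplainUtils now consults the node's
FallbackTags first. A minimal, self-contained sketch of that lookup order (toy
Scala; `Node` and the "fallbackReason" tag key stand in for SparkPlan and
FallbackTags and are illustrative only, not Gluten's actual types):

    // Toy model of the resolution order added above: prefer a reason
    // recorded in the node's own tags (which survives a missing
    // logicalLink), otherwise fall back to the generic message.
    object FallbackReasonSketch {
      final case class Node(name: String, tags: Map[String, String])

      def resolveReason(node: Node): String =
        node.tags
          .get("fallbackReason") // conceptually, FallbackTags.getOption(p).map(_.reason())
          .getOrElse("Gluten does not touch it or does not support it")

      def main(args: Array[String]): Unit = {
        val tagged = Node(
          "SortExec",
          Map("fallbackReason" -> "[FallbackByUserOptions] Validation failed on node Sort"))
        val untagged = Node("ProjectExec", Map.empty)
        println(resolveReason(tagged))   // prints the tagged reason
        println(resolveReason(untagged)) // prints the generic message
      }
    }

As the diff's comment notes, this is best-effort: AQE can re-create plan nodes
without preserving tags, so the generic message remains the final fallback.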
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
