This is an automated email from the ASF dual-hosted git repository.
liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cf62a41bf2 [CORE] Get correct fallback reason on nodes without
logicalLink (#11295)
cf62a41bf2 is described below
commit cf62a41bf254b038217685822da6f5801d374fe3
Author: Joey <[email protected]>
AuthorDate: Tue Dec 16 09:27:11 2025 +0800
[CORE] Get correct fallback reason on nodes without logicalLink (#11295)
---
.../apache/gluten/execution/FallbackSuite.scala | 49 +++++++++++++++++++++-
.../org/apache/spark/utils/GlutenSuiteUtils.scala | 23 ++++++++++
.../spark/sql/execution/GlutenExplainUtils.scala | 20 ++++++---
3 files changed, 86 insertions(+), 6 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 70ef9e19ec..9eccf75ad3 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -17,12 +17,17 @@
package org.apache.gluten.execution
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
ColumnarShuffleExchangeExec, SparkPlan}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
ColumnarShuffleExchangeExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
AQEShuffleReadExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
SortMergeJoinExec}
+import org.apache.spark.utils.GlutenSuiteUtils
+
+import scala.collection.mutable.ArrayBuffer
class FallbackSuite extends VeloxWholeStageTransformerSuite with
AdaptiveSparkPlanHelper {
protected val rootPath: String = getClass.getResource("/").getPath
@@ -35,6 +40,9 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPl
.set("spark.sql.shuffle.partitions", "5")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
+ // The Gluten UI event test suite expects the Spark UI to be enabled.
+ .set(GlutenConfig.GLUTEN_UI_ENABLED.key, "true")
+ .set("spark.ui.enabled", "true")
}
override def beforeAll(): Unit = {
@@ -312,4 +320,43 @@ class FallbackSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSparkPl
assert(columnarToRow == 1)
}
}
+
+ test("get correct fallback reason on nodes without logicalLink") {
+ val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+ val listener = new SparkListener {
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case e: GlutenPlanFallbackEvent => events.append(e)
+ case _ =>
+ }
+ }
+ }
+ spark.sparkContext.addSparkListener(listener)
+ withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
+ try {
+ val df = spark.sql("""
+ |SELECT
+ | c1,
+ | c2,
+ | ROW_NUMBER() OVER (PARTITION BY c1 ORDER BY
c2) as row_num,
+ | RANK() OVER (PARTITION BY c1 ORDER BY c2) as
rank_num
+ |FROM tmp1
+ |
+ |""".stripMargin)
+ df.collect()
+ GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+ val sort = find(df.queryExecution.executedPlan) {
+ _.isInstanceOf[SortExec]
+ }
+ assert(sort.isDefined)
+ val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
+ assert(fallbackReasons.nonEmpty)
+ assert(
+ fallbackReasons.forall(
+ _.contains("[FallbackByUserOptions] Validation failed on node
Sort")))
+ } finally {
+ spark.sparkContext.removeSparkListener(listener)
+ }
+ }
+ }
}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
b/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
new file mode 100644
index 0000000000..1453a56072
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.utils
+
+import org.apache.spark.SparkContext
+
+object GlutenSuiteUtils {
+ def waitUntilEmpty(ctx: SparkContext): Unit =
ctx.listenerBus.waitUntilEmpty()
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 7a33350524..1e5beeb7c1 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -65,12 +65,22 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
case Some(tag) => addFallbackNodeWithReason(p, tag.reason(),
fallbackNodeToReason)
case _ =>
// If the SparkPlan does not have fallback reason, then there are two
options:
- // 1. Gluten ignore that plan and it's a kind of fallback
+ // 1. Gluten ignores that plan, and it's a kind of fallback
// 2. Gluten does not support it without the fallback reason
- addFallbackNodeWithReason(
- p,
- "Gluten does not touch it or does not support it",
- fallbackNodeToReason)
+
+ // Some SparkPlan nodes may lack a logicalLink (e.g., SortExec
generated
+ // by EnsureRequirements). Additionally, some Gluten rules might omit
+ // `copyTagsFrom` during plan conversion, leading to the same issue.
+ // When such a node falls back, the fallback reason might be stored in
+ // its tags. We attempt to retrieve it here as a fallback mechanism.
+ // However, this is not always reliable and merely increases our
chances
+ // of finding the correct reason, as AQE might create new SparkPlan
+ // instances that do not preserve the tags.
+ val reason = FallbackTags
+ .getOption(p)
+ .map(_.reason())
+ .getOrElse("Gluten does not touch it or does not support it")
+ addFallbackNodeWithReason(p, reason, fallbackNodeToReason)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]