This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 68dd66ac8b [GLUTEN-11635] Enable partial fallback if parent node supports partial fallback (#11637)
68dd66ac8b is described below

commit 68dd66ac8b0f89b63ccd1b1fe138e4c80d10fd1a
Author: Wechar Yu <[email protected]>
AuthorDate: Tue Mar 10 16:30:57 2026 +0800

    [GLUTEN-11635] Enable partial fallback if parent node supports partial fallback (#11637)
    
    Fix #11635
    
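    How was this patch tested?
    Added new unit tests in UDFPartialProjectSuite and GlutenHiveUDFSuite.
    
    Example query (two nested projections, each calling the plus_one UDF):
    
    select plus_one(col1) as col2, l_partkey from (
      select plus_one(l_orderkey) as col1, l_partkey from lineitem
    )
    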
    Before this PR:
    
    == Physical Plan ==
    *(1) Project [if (isnull(col1#73L)) null else 
plus_one(knownnotnull(col1#73L)) AS col2#74L, l_partkey#1L]
    +- *(1) Project [if (isnull(l_orderkey#0L)) null else 
plus_one(knownnotnull(l_orderkey#0L)) AS col1#73L, l_partkey#1L]
       +- VeloxColumnarToRow
          +- ^(1) BatchScanExecTransformer parquet 
file:/root/workspace/gluten-community/backends-velox/target/scala-2.13/test-classes/tpch-data-parquet/lineitem[l_orderkey#0L,
 l_partkey#1L] ParquetScan DataFilters: [], Format: parquet, Location: 
InMemoryFileIndex(1 
paths)[file:/root/workspace/gluten-community/backends-velox/target/scala-2.13...,
 PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: 
[], ReadSchema: struct<l_orderkey:bigint,l_partkey:bigint> Runtim [...]
    After this PR:
    
    VeloxColumnarToRow
    +- ^(3) ProjectExecTransformer [_SparkPartialProject0#56L AS col2#38L, 
l_partkey#1L]
       +- ^(3) InputIteratorTransformer[col1#37L, l_partkey#1L, 
_SparkPartialProject0#56L]
          +- ColumnarPartialProject [if (isnull(col1#37L)) null else 
plus_one(knownnotnull(col1#37L)) AS col2#38L, l_partkey#1L] PartialProject 
List(if (isnull(col1#37L)) null else plus_one(knownnotnull(col1#37L)) AS 
_SparkPartialProject0#56L)
             +- ^(2) ProjectExecTransformer [_SparkPartialProject0#55L AS 
col1#37L, l_partkey#1L]
                +- ^(2) InputIteratorTransformer[l_orderkey#0L, l_partkey#1L, 
_SparkPartialProject0#55L]
                   +- ColumnarPartialProject [if (isnull(l_orderkey#0L)) null 
else plus_one(knownnotnull(l_orderkey#0L)) AS col1#37L, l_partkey#1L] 
PartialProject List(if (isnull(l_orderkey#0L)) null else 
plus_one(knownnotnull(l_orderkey#0L)) AS _SparkPartialProject0#55L)
                      +- ^(1) BatchScanExecTransformer parquet 
file:/root/workspace/gluten-community/backends-velox/target/scala-2.13/test-classes/tpch-data-parquet/lineitem[l_orderkey#0L,
 l_partkey#1L] ParquetScan DataFilters: [], Format: parquet, Location: 
InMemoryFileIndex(1 
paths)[file:/root/workspace/gluten-community/backends-velox/target/scala-2.13...,
 PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: 
[], ReadSchema: struct<l_orderkey:bigint,l_partkey:bi [...]
---
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  6 +--
 .../apache/gluten/extension/PartialFallback.scala  | 44 ++++++++++++++++++++++
 .../gluten/extension/PartialGenerateRule.scala     | 36 +++++++++---------
 .../gluten/extension/PartialProjectRule.scala      | 39 ++++++++++---------
 .../gluten/expression/UDFPartialProjectSuite.scala | 15 ++++++++
 .../spark/sql/execution/GlutenHiveUDFSuite.scala   | 18 +++++++++
 6 files changed, 116 insertions(+), 42 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 1d80536290..2895e63660 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -118,8 +118,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => 
AppendBatchResizeForShuffleInputAndOutput())
     injector.injectPostTransform(_ => 
GpuBufferBatchResizeForShuffleInputOutput())
     injector.injectPostTransform(_ => UnionTransformerRule())
-    injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
-    injector.injectPostTransform(_ => PartialGenerateRule())
+    injector.injectPostTransform(_ => PartialFallbackRules())
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
@@ -223,8 +222,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => 
GpuBufferBatchResizeForShuffleInputOutput())
     injector.injectPostTransform(_ => RemoveTransitions)
     injector.injectPostTransform(_ => UnionTransformerRule())
-    injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
-    injector.injectPostTransform(_ => PartialGenerateRule())
+    injector.injectPostTransform(_ => PartialFallbackRules())
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialFallback.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialFallback.scala
new file mode 100644
index 0000000000..8a7da72b3e
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialFallback.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution.{GenerateExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.internal.SQLConf
+
+case class PartialFallbackRules() extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    new PartialFallbackRuleExecutor().execute(plan)
+  }
+
+  private class PartialFallbackRuleExecutor extends RuleExecutor[SparkPlan] {
+    private def fixedPoint =
+      FixedPoint(
+        SQLConf.get.optimizerMaxIterations,
+        maxIterationsSetting = SQLConf.OPTIMIZER_MAX_ITERATIONS.key)
+
+    override protected def batches: Seq[Batch] = Seq(
+      Batch("PartialFallback", fixedPoint, PartialProjectRule(), 
PartialGenerateRule()))
+  }
+}
+
+object PartialFallback {
+  def supportPartialFallback(plan: SparkPlan): Boolean = {
+    plan.isInstanceOf[ProjectExec] ||
+    plan.isInstanceOf[GenerateExec]
+  }
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
index 5e641862e4..b47bc23fb7 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{ColumnarPartialGenerateExec, 
GenerateExecTransformer}
+import org.apache.gluten.execution.{ColumnarPartialGenerateExec, 
GenerateExecTransformer, WholeStageTransformer}
 import org.apache.gluten.utils.PlanUtil
 
 import org.apache.spark.sql.catalyst.expressions.UserDefinedExpression
@@ -29,23 +29,23 @@ case class PartialGenerateRule() extends Rule[SparkPlan] {
     if (!GlutenConfig.get.enableColumnarPartialGenerate) {
       return plan
     }
-    val newPlan = plan match {
-      // If the root node of the plan is a GenerateExec and its child is a 
gluten columnar op,
-      // we try to add a ColumnarPartialGenerateExec
-      case plan: GenerateExec if PlanUtil.isGlutenColumnarOp(plan.child) =>
-        tryAddColumnarPartialGenerateExec(plan)
-      case _ => plan
-    }
-    newPlan.transformUp {
-      case parent: SparkPlan
-          if parent.children.exists(_.isInstanceOf[GenerateExec]) &&
-            PlanUtil.isGlutenColumnarOp(parent) =>
-        parent.mapChildren {
-          case plan: GenerateExec if PlanUtil.isGlutenColumnarOp(plan.child) =>
-            tryAddColumnarPartialGenerateExec(plan)
-          case other => other
-        }
-    }
+    // Wrap a WholeStageTransformer to check if the top node supports partial 
fallback.
+    // It will be removed afterward.
+    val wrapped = WholeStageTransformer(plan)(-1)
+    wrapped
+      .transformUp {
+        case parent: SparkPlan
+            if parent.children.exists(_.isInstanceOf[GenerateExec]) &&
+              (PlanUtil.isGlutenColumnarOp(parent) || 
PartialFallback.supportPartialFallback(
+                parent)) =>
+          parent.mapChildren {
+            case plan: GenerateExec if PlanUtil.isGlutenColumnarOp(plan.child) 
=>
+              tryAddColumnarPartialGenerateExec(plan)
+            case other => other
+          }
+      }
+      .children
+      .head
   }
 
   private def tryAddColumnarPartialGenerateExec(plan: GenerateExec): SparkPlan 
= {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
index f60bf11746..7a07ec0bc9 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
@@ -17,37 +17,36 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.ColumnarPartialProjectExec
+import org.apache.gluten.execution.{ColumnarPartialProjectExec, 
WholeStageTransformer}
 import org.apache.gluten.utils.PlanUtil
 
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
 
-case class PartialProjectRule(spark: SparkSession) extends Rule[SparkPlan] {
+case class PartialProjectRule() extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = {
     if (!GlutenConfig.get.enableColumnarPartialProject) {
       return plan
     }
 
-    val newPlan = plan match {
-      // If the root node of the plan is a ProjectExec and its child is a 
gluten columnar op,
-      // we try to add a ColumnarPartialProjectExec
-      case p: ProjectExec if PlanUtil.isGlutenColumnarOp(p.child) =>
-        tryAddColumnarPartialProjectExec(p)
-      case _ => plan
-    }
+    // Wrap a WholeStageTransformer to check if the top node supports partial 
fallback.
+    // It will be removed afterward.
+    val wrapped = WholeStageTransformer(plan)(-1)
 
-    newPlan.transformUp {
-      case parent: SparkPlan
-          if parent.children.exists(_.isInstanceOf[ProjectExec]) &&
-            PlanUtil.isGlutenColumnarOp(parent) =>
-        parent.mapChildren {
-          case p: ProjectExec if PlanUtil.isGlutenColumnarOp(p.child) =>
-            tryAddColumnarPartialProjectExec(p)
-          case other => other
-        }
-    }
+    wrapped
+      .transformUp {
+        case parent: SparkPlan
+            if parent.children.exists(_.isInstanceOf[ProjectExec]) &&
+              (PlanUtil.isGlutenColumnarOp(parent) || 
PartialFallback.supportPartialFallback(
+                parent)) =>
+          parent.mapChildren {
+            case p: ProjectExec if PlanUtil.isGlutenColumnarOp(p.child) =>
+              tryAddColumnarPartialProjectExec(p)
+            case other => other
+          }
+      }
+      .children
+      .head
   }
 
   private def tryAddColumnarPartialProjectExec(plan: ProjectExec): SparkPlan = 
{
diff --git a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
index f5a1bd454b..404477deba 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
@@ -118,6 +118,21 @@ abstract class UDFPartialProjectSuite extends 
WholeStageTransformerSuite {
     }
   }
 
+  testWithMinSparkVersion("test plus_one in nested project lists", "3.4") {
+    val sql = """
+                |select plus_one(col1) as col2, l_partkey from (
+                | select plus_one(l_orderkey) as col1, l_partkey from lineitem
+                |)""".stripMargin
+    runQueryAndCompare(sql) {
+      checkGlutenPlan[ColumnarPartialProjectExec]
+    }
+
+    val df = spark.sql(sql)
+    assert(df.queryExecution.executedPlan.collect {
+      case p: ColumnarPartialProjectExec => p
+    }.size == 2)
+  }
+
   test("test plus_one with many columns in project") {
     runQueryAndCompare("SELECT plus_one(cast(l_orderkey as long)), 
hash(l_partkey) from lineitem") {
       checkGlutenPlan[ColumnarPartialProjectExec]
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
index 6919a30390..296de381cd 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
@@ -157,6 +157,24 @@ class GlutenHiveUDFSuite extends GlutenQueryComparisonTest 
with SQLTestUtils {
     }
   }
 
+  test("nested partial fallback") {
+    withTempFunction("noInputUDTF") {
+      val plusOne = udf((x: Long) => x + 1)
+      spark.udf.register("plus_one", plusOne)
+      sql(s"CREATE TEMPORARY FUNCTION noInputUDTF AS 
'${classOf[NoInputUDTF].getName}'")
+      runQueryAndCompare("""
+                           |select plus_one(col1) as col2, l_partkey from (
+                           | select col1, l_partkey from lineitem lateral view 
noInputUDTF() as col1
+                           |)""".stripMargin) {
+        df =>
+          {
+            checkOperatorMatch[ColumnarPartialProjectExec](df)
+            checkOperatorMatch[ColumnarPartialGenerateExec](df)
+          }
+      }
+    }
+  }
+
   test("lateral view outer udtf") {
     withTempFunction("conditionalOutputUDTF") {
       sql(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to