This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 68dd66ac8b [GLUTEN-11635] Enable partial fallback if parent node
supports partial fallback (#11637)
68dd66ac8b is described below
commit 68dd66ac8b0f89b63ccd1b1fe138e4c80d10fd1a
Author: Wechar Yu <[email protected]>
AuthorDate: Tue Mar 10 16:30:57 2026 +0800
[GLUTEN-11635] Enable partial fallback if parent node supports partial
fallback (#11637)
Fix #11635
How was this patch tested?
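Added two unit tests: "test plus_one in nested project lists" (UDFPartialProjectSuite) and "nested partial fallback" (GlutenHiveUDFSuite, which combines a partial project with a partial generate). For example, consider the following query: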
select plus_one(col1) as col2, l_partkey from (
select plus_one(l_orderkey) as col1, l_partkey from lineitem
)
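Before this PR, only the scan was offloaded; both Project operators ran in vanilla Spark above a VeloxColumnarToRow transition: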
== Physical Plan ==
*(1) Project [if (isnull(col1#73L)) null else
plus_one(knownnotnull(col1#73L)) AS col2#74L, l_partkey#1L]
+- *(1) Project [if (isnull(l_orderkey#0L)) null else
plus_one(knownnotnull(l_orderkey#0L)) AS col1#73L, l_partkey#1L]
+- VeloxColumnarToRow
+- ^(1) BatchScanExecTransformer parquet
file:/root/workspace/gluten-community/backends-velox/target/scala-2.13/test-classes/tpch-data-parquet/lineitem[l_orderkey#0L,
l_partkey#1L] ParquetScan DataFilters: [], Format: parquet, Location:
InMemoryFileIndex(1
paths)[file:/root/workspace/gluten-community/backends-velox/target/scala-2.13...,
PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy:
[], ReadSchema: struct<l_orderkey:bigint,l_partkey:bigint> Runtim [...]
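After this PR, both Projects are offloaded. Each becomes a ProjectExecTransformer over a ColumnarPartialProject that evaluates only the `plus_one` expression, and a single VeloxColumnarToRow transition remains at the top of the plan: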
VeloxColumnarToRow
+- ^(3) ProjectExecTransformer [_SparkPartialProject0#56L AS col2#38L,
l_partkey#1L]
+- ^(3) InputIteratorTransformer[col1#37L, l_partkey#1L,
_SparkPartialProject0#56L]
+- ColumnarPartialProject [if (isnull(col1#37L)) null else
plus_one(knownnotnull(col1#37L)) AS col2#38L, l_partkey#1L] PartialProject
List(if (isnull(col1#37L)) null else plus_one(knownnotnull(col1#37L)) AS
_SparkPartialProject0#56L)
+- ^(2) ProjectExecTransformer [_SparkPartialProject0#55L AS
col1#37L, l_partkey#1L]
+- ^(2) InputIteratorTransformer[l_orderkey#0L, l_partkey#1L,
_SparkPartialProject0#55L]
+- ColumnarPartialProject [if (isnull(l_orderkey#0L)) null
else plus_one(knownnotnull(l_orderkey#0L)) AS col1#37L, l_partkey#1L]
PartialProject List(if (isnull(l_orderkey#0L)) null else
plus_one(knownnotnull(l_orderkey#0L)) AS _SparkPartialProject0#55L)
+- ^(1) BatchScanExecTransformer parquet
file:/root/workspace/gluten-community/backends-velox/target/scala-2.13/test-classes/tpch-data-parquet/lineitem[l_orderkey#0L,
l_partkey#1L] ParquetScan DataFilters: [], Format: parquet, Location:
InMemoryFileIndex(1
paths)[file:/root/workspace/gluten-community/backends-velox/target/scala-2.13...,
PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy:
[], ReadSchema: struct<l_orderkey:bigint,l_partkey:bigint> [...]
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 6 +--
.../apache/gluten/extension/PartialFallback.scala | 44 ++++++++++++++++++++++
.../gluten/extension/PartialGenerateRule.scala | 36 +++++++++---------
.../gluten/extension/PartialProjectRule.scala | 39 ++++++++++---------
.../gluten/expression/UDFPartialProjectSuite.scala | 15 ++++++++
.../spark/sql/execution/GlutenHiveUDFSuite.scala | 18 +++++++++
6 files changed, 116 insertions(+), 42 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 1d80536290..2895e63660 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -118,8 +118,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ =>
AppendBatchResizeForShuffleInputAndOutput())
injector.injectPostTransform(_ =>
GpuBufferBatchResizeForShuffleInputOutput())
injector.injectPostTransform(_ => UnionTransformerRule())
- injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
- injector.injectPostTransform(_ => PartialGenerateRule())
+ injector.injectPostTransform(_ => PartialFallbackRules())
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
@@ -223,8 +222,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ =>
GpuBufferBatchResizeForShuffleInputOutput())
injector.injectPostTransform(_ => RemoveTransitions)
injector.injectPostTransform(_ => UnionTransformerRule())
- injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
- injector.injectPostTransform(_ => PartialGenerateRule())
+ injector.injectPostTransform(_ => PartialFallbackRules())
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialFallback.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialFallback.scala
new file mode 100644
index 0000000000..8a7da72b3e
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialFallback.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution.{GenerateExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.internal.SQLConf
+
+case class PartialFallbackRules() extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = {
+ new PartialFallbackRuleExecutor().execute(plan)
+ }
+
+ private class PartialFallbackRuleExecutor extends RuleExecutor[SparkPlan] {
+ private def fixedPoint =
+ FixedPoint(
+ SQLConf.get.optimizerMaxIterations,
+ maxIterationsSetting = SQLConf.OPTIMIZER_MAX_ITERATIONS.key)
+
+ override protected def batches: Seq[Batch] = Seq(
+ Batch("PartialFallback", fixedPoint, PartialProjectRule(),
PartialGenerateRule()))
+ }
+}
+
+object PartialFallback {
+ def supportPartialFallback(plan: SparkPlan): Boolean = {
+ plan.isInstanceOf[ProjectExec] ||
+ plan.isInstanceOf[GenerateExec]
+ }
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
index 5e641862e4..b47bc23fb7 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialGenerateRule.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.extension
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{ColumnarPartialGenerateExec,
GenerateExecTransformer}
+import org.apache.gluten.execution.{ColumnarPartialGenerateExec,
GenerateExecTransformer, WholeStageTransformer}
import org.apache.gluten.utils.PlanUtil
import org.apache.spark.sql.catalyst.expressions.UserDefinedExpression
@@ -29,23 +29,23 @@ case class PartialGenerateRule() extends Rule[SparkPlan] {
if (!GlutenConfig.get.enableColumnarPartialGenerate) {
return plan
}
- val newPlan = plan match {
- // If the root node of the plan is a GenerateExec and its child is a
gluten columnar op,
- // we try to add a ColumnarPartialGenerateExec
- case plan: GenerateExec if PlanUtil.isGlutenColumnarOp(plan.child) =>
- tryAddColumnarPartialGenerateExec(plan)
- case _ => plan
- }
- newPlan.transformUp {
- case parent: SparkPlan
- if parent.children.exists(_.isInstanceOf[GenerateExec]) &&
- PlanUtil.isGlutenColumnarOp(parent) =>
- parent.mapChildren {
- case plan: GenerateExec if PlanUtil.isGlutenColumnarOp(plan.child) =>
- tryAddColumnarPartialGenerateExec(plan)
- case other => other
- }
- }
+ // Wrap a WholeStageTransformer to check if the top node supports partial
fallback.
+ // It will be removed afterward.
+ val wrapped = WholeStageTransformer(plan)(-1)
+ wrapped
+ .transformUp {
+ case parent: SparkPlan
+ if parent.children.exists(_.isInstanceOf[GenerateExec]) &&
+ (PlanUtil.isGlutenColumnarOp(parent) ||
PartialFallback.supportPartialFallback(
+ parent)) =>
+ parent.mapChildren {
+ case plan: GenerateExec if PlanUtil.isGlutenColumnarOp(plan.child)
=>
+ tryAddColumnarPartialGenerateExec(plan)
+ case other => other
+ }
+ }
+ .children
+ .head
}
private def tryAddColumnarPartialGenerateExec(plan: GenerateExec): SparkPlan
= {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
index f60bf11746..7a07ec0bc9 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/PartialProjectRule.scala
@@ -17,37 +17,36 @@
package org.apache.gluten.extension
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.ColumnarPartialProjectExec
+import org.apache.gluten.execution.{ColumnarPartialProjectExec,
WholeStageTransformer}
import org.apache.gluten.utils.PlanUtil
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
-case class PartialProjectRule(spark: SparkSession) extends Rule[SparkPlan] {
+case class PartialProjectRule() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
if (!GlutenConfig.get.enableColumnarPartialProject) {
return plan
}
- val newPlan = plan match {
- // If the root node of the plan is a ProjectExec and its child is a
gluten columnar op,
- // we try to add a ColumnarPartialProjectExec
- case p: ProjectExec if PlanUtil.isGlutenColumnarOp(p.child) =>
- tryAddColumnarPartialProjectExec(p)
- case _ => plan
- }
+ // Wrap a WholeStageTransformer to check if the top node supports partial
fallback.
+ // It will be removed afterward.
+ val wrapped = WholeStageTransformer(plan)(-1)
- newPlan.transformUp {
- case parent: SparkPlan
- if parent.children.exists(_.isInstanceOf[ProjectExec]) &&
- PlanUtil.isGlutenColumnarOp(parent) =>
- parent.mapChildren {
- case p: ProjectExec if PlanUtil.isGlutenColumnarOp(p.child) =>
- tryAddColumnarPartialProjectExec(p)
- case other => other
- }
- }
+ wrapped
+ .transformUp {
+ case parent: SparkPlan
+ if parent.children.exists(_.isInstanceOf[ProjectExec]) &&
+ (PlanUtil.isGlutenColumnarOp(parent) ||
PartialFallback.supportPartialFallback(
+ parent)) =>
+ parent.mapChildren {
+ case p: ProjectExec if PlanUtil.isGlutenColumnarOp(p.child) =>
+ tryAddColumnarPartialProjectExec(p)
+ case other => other
+ }
+ }
+ .children
+ .head
}
private def tryAddColumnarPartialProjectExec(plan: ProjectExec): SparkPlan =
{
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
index f5a1bd454b..404477deba 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala
@@ -118,6 +118,21 @@ abstract class UDFPartialProjectSuite extends
WholeStageTransformerSuite {
}
}
+ testWithMinSparkVersion("test plus_one in nested project lists", "3.4") {
+ val sql = """
+ |select plus_one(col1) as col2, l_partkey from (
+ | select plus_one(l_orderkey) as col1, l_partkey from lineitem
+ |)""".stripMargin
+ runQueryAndCompare(sql) {
+ checkGlutenPlan[ColumnarPartialProjectExec]
+ }
+
+ val df = spark.sql(sql)
+ assert(df.queryExecution.executedPlan.collect {
+ case p: ColumnarPartialProjectExec => p
+ }.size == 2)
+ }
+
test("test plus_one with many columns in project") {
runQueryAndCompare("SELECT plus_one(cast(l_orderkey as long)),
hash(l_partkey) from lineitem") {
checkGlutenPlan[ColumnarPartialProjectExec]
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
index 6919a30390..296de381cd 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
@@ -157,6 +157,24 @@ class GlutenHiveUDFSuite extends GlutenQueryComparisonTest
with SQLTestUtils {
}
}
+ test("nested partial fallback") {
+ withTempFunction("noInputUDTF") {
+ val plusOne = udf((x: Long) => x + 1)
+ spark.udf.register("plus_one", plusOne)
+ sql(s"CREATE TEMPORARY FUNCTION noInputUDTF AS
'${classOf[NoInputUDTF].getName}'")
+ runQueryAndCompare("""
+ |select plus_one(col1) as col2, l_partkey from (
+ | select col1, l_partkey from lineitem lateral view
noInputUDTF() as col1
+ |)""".stripMargin) {
+ df =>
+ {
+ checkOperatorMatch[ColumnarPartialProjectExec](df)
+ checkOperatorMatch[ColumnarPartialGenerateExec](df)
+ }
+ }
+ }
+ }
+
test("lateral view outer udtf") {
withTempFunction("conditionalOutputUDTF") {
sql(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]