This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new db5a2f740 [VL] Collapse trivial projects generated by rule PushDownInputFileExpression
db5a2f740 is described below

commit db5a2f7400e0f4eacb1a5538fe843394b7bcf556
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Sep 13 14:57:44 2024 +0800

    [VL] Collapse trivial projects generated by rule PushDownInputFileExpression
---
 .../execution/ScalarFunctionsValidateSuite.scala   | 17 ++++++++++++
 .../columnar/PushDownInputFileExpression.scala     | 31 +++++++++++++++++++---
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
index 57f194f4b..ecad62984 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.GlutenConfig
+
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.catalyst.optimizer.NullPropagation
 import org.apache.spark.sql.execution.ProjectExec
@@ -1366,6 +1368,21 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
             |""".stripMargin
         compareResultsAgainstVanillaSpark(sql, true, { _ => })
     }
+
+    // Collapse project if scan is fallback and the outer project is cheap or fallback.
+    Seq("true", "false").foreach {
+      flag =>
+        withSQLConf(
+          GlutenConfig.COLUMNAR_PROJECT_ENABLED.key -> flag,
+          GlutenConfig.COLUMNAR_BATCHSCAN_ENABLED.key -> "false") {
+          runQueryAndCompare("SELECT l_orderkey, input_file_name() as name FROM lineitem") {
+            df =>
+              val plan = df.queryExecution.executedPlan
+              assert(collect(plan) { case f: ProjectExecTransformer => f }.size == 0)
+              assert(collect(plan) { case f: ProjectExec => f }.size == 1)
+          }
+        }
+    }
   }
 
   testWithSpecifiedSparkVersion("array insert", Some("3.4")) {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
index e1219fead..e92ffb943 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
@@ -16,9 +16,10 @@
  */
 package org.apache.gluten.extension.columnar
 
-import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer, ProjectExecTransformer}
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
+import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{DeserializeToObjectExec, LeafExecNode, ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}
 
@@ -34,8 +35,8 @@ import scala.collection.mutable
  * Two rules are involved:
  *   - Before offload, add new project before leaf node and push down input 
file expression to the
  *     new project
- *   - After offload, if scan be offloaded, push down input file expression into scan and remove
- *     project
+ *   - After offload, push down input file expression into scan and remove project if scan be
+ *     offloaded, collapse project if scan is fallback and the outer project is cheap or fallback
  */
 object PushDownInputFileExpression {
   def containsInputFileRelatedExpr(expr: Expression): Boolean = {
@@ -113,6 +114,28 @@ object PushDownInputFileExpression {
       case p @ ProjectExec(projectList, child: BatchScanExecTransformer)
           if projectList.exists(containsInputFileRelatedExpr) =>
         child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
+      case p1 @ ProjectExec(_, p2: ProjectExec) if canCollapseProject(p2) =>
+        p2.copy(projectList =
+          CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
+      case p1 @ ProjectExecTransformer(_, p2: ProjectExec) if canCollapseProject(p1, p2) =>
+        p2.copy(projectList =
+          CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
+    }
+
+    private def canCollapseProject(project: ProjectExec): Boolean = {
+      project.projectList.forall {
+        case Alias(_: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength, _) => true
+        case _: Attribute => true
+        case _ => false
+      }
+    }
+
+    private def canCollapseProject(p1: ProjectExecTransformer, p2: ProjectExec): Boolean = {
+      canCollapseProject(p2) && p1.projectList.forall {
+        case Alias(_: Attribute, _) => true
+        case _: Attribute => true
+        case _ => false
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to