This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new db5a2f740 [VL] Collapse trivial projects generated by rule PushDownInputFileExpression
db5a2f740 is described below
commit db5a2f7400e0f4eacb1a5538fe843394b7bcf556
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Sep 13 14:57:44 2024 +0800
[VL] Collapse trivial projects generated by rule PushDownInputFileExpression
---
.../execution/ScalarFunctionsValidateSuite.scala | 17 ++++++++++++
.../columnar/PushDownInputFileExpression.scala | 31 +++++++++++++++++++---
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
index 57f194f4b..ecad62984 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.GlutenConfig
+
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.catalyst.optimizer.NullPropagation
import org.apache.spark.sql.execution.ProjectExec
@@ -1366,6 +1368,21 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
+
+  // Collapse project if scan is fallback and the outer project is cheap or fallback.
+ Seq("true", "false").foreach {
+ flag =>
+ withSQLConf(
+ GlutenConfig.COLUMNAR_PROJECT_ENABLED.key -> flag,
+ GlutenConfig.COLUMNAR_BATCHSCAN_ENABLED.key -> "false") {
+        runQueryAndCompare("SELECT l_orderkey, input_file_name() as name FROM lineitem") {
+ df =>
+ val plan = df.queryExecution.executedPlan
+            assert(collect(plan) { case f: ProjectExecTransformer => f }.size == 0)
+ assert(collect(plan) { case f: ProjectExec => f }.size == 1)
+ }
+ }
+ }
}
testWithSpecifiedSparkVersion("array insert", Some("3.4")) {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
index e1219fead..e92ffb943 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
@@ -16,9 +16,10 @@
*/
package org.apache.gluten.extension.columnar
-import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer, ProjectExecTransformer}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
+import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{DeserializeToObjectExec, LeafExecNode, ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}
@@ -34,8 +35,8 @@ import scala.collection.mutable
* Two rules are involved:
 * - Before offload, add new project before leaf node and push down input file expression to the
 *   new project
- * - After offload, if scan be offloaded, push down input file expression into scan and remove
- *   project
+ * - After offload, push down input file expression into scan and remove project if scan be
+ *   offloaded, collapse project if scan is fallback and the outer project is cheap or fallback
*/
object PushDownInputFileExpression {
def containsInputFileRelatedExpr(expr: Expression): Boolean = {
@@ -113,6 +114,28 @@ object PushDownInputFileExpression {
case p @ ProjectExec(projectList, child: BatchScanExecTransformer)
if projectList.exists(containsInputFileRelatedExpr) =>
child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
+ case p1 @ ProjectExec(_, p2: ProjectExec) if canCollapseProject(p2) =>
+ p2.copy(projectList =
+          CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
+      case p1 @ ProjectExecTransformer(_, p2: ProjectExec) if canCollapseProject(p1, p2) =>
+ p2.copy(projectList =
+          CollapseProjectShim.buildCleanedProjectList(p1.projectList, p2.projectList))
+ }
+
+ private def canCollapseProject(project: ProjectExec): Boolean = {
+ project.projectList.forall {
+      case Alias(_: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength, _) => true
+ case _: Attribute => true
+ case _ => false
+ }
+ }
+
+  private def canCollapseProject(p1: ProjectExecTransformer, p2: ProjectExec): Boolean = {
+ canCollapseProject(p2) && p1.projectList.forall {
+ case Alias(_: Attribute, _) => true
+ case _: Attribute => true
+ case _ => false
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]