This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6025750396 [GLUTEN-8945][VL] Pull out duplicate projections for
HashProbe and FilterProject (#8946)
6025750396 is described below
commit 60257503960152e5170aa3b30771da24d445dbb5
Author: Mingliang Zhu <[email protected]>
AuthorDate: Wed Mar 19 17:57:31 2025 +0800
[GLUTEN-8945][VL] Pull out duplicate projections for HashProbe and
FilterProject (#8946)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 +
.../gluten/extension/PullOutDuplicateProject.scala | 122 +++++++++++++++++++++
.../gluten/execution/VeloxHashJoinSuite.scala | 69 +++++++++++-
3 files changed, 192 insertions(+), 1 deletion(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 3b15fa2263..566cffd1bf 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -96,6 +96,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
injector.injectPostTransform(_ => EliminateLocalSort)
+ injector.injectPostTransform(_ => PullOutDuplicateProject)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c =>
FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
@@ -179,6 +180,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
injector.injectPostTransform(_ => EliminateLocalSort)
+ injector.injectPostTransform(_ => PullOutDuplicateProject)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c =>
FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
new file mode 100644
index 0000000000..7e6958bc42
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.execution.{BroadcastHashJoinExecTransformer,
FilterExecTransformer, LimitExecTransformer, ProjectExecTransformer}
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeMap, AttributeSet, PredicateHelper}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Velox does not allow duplicate projections in HashProbe and FilterProject,
this rule pulls out
+ * the duplicate projections into a new outer project.
+ */
+object PullOutDuplicateProject extends Rule[SparkPlan] with PredicateHelper {
+ override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+ case l @ LimitExecTransformer(p: ProjectExecTransformer, _, _) =>
+ val pullOutAliases = new ArrayBuffer[Alias]()
+ val newChild = rewriteProject(p, AttributeSet.empty, pullOutAliases)
+ if (pullOutAliases.isEmpty) {
+ l
+ } else {
+ outerProject(l.copy(child = newChild), l.output, pullOutAliases)
+ }
+ case p @ ProjectExecTransformer(_, child: ProjectExecTransformer) =>
+ val pullOutAliases = new ArrayBuffer[Alias]()
+ val newChild = rewriteProject(child, AttributeSet.empty, pullOutAliases)
+ val aliasMap = AttributeMap(pullOutAliases.map(a => a.toAttribute -> a))
+ val newProjectList = p.projectList.map(replaceAliasButKeepName(_,
aliasMap))
+ ProjectExecTransformer(newProjectList, newChild)
+ case f @ FilterExecTransformer(_, child: ProjectExecTransformer) =>
+ val pullOutAliases = new ArrayBuffer[Alias]()
+ val newChild = rewriteProject(child, f.references, pullOutAliases)
+ if (pullOutAliases.isEmpty) {
+ f
+ } else {
+ outerProject(f.copy(child = newChild), f.output, pullOutAliases)
+ }
+ case bhj: BroadcastHashJoinExecTransformer
+ if bhj.streamedPlan.isInstanceOf[ProjectExecTransformer] =>
+ val pullOutAliases = new ArrayBuffer[Alias]()
+ val newStreamedPlan = rewriteProject(
+ bhj.streamedPlan.asInstanceOf[ProjectExecTransformer],
+ bhj.references,
+ pullOutAliases)
+ if (pullOutAliases.isEmpty) {
+ bhj
+ } else {
+ val newBhj = bhj.joinBuildSide match {
+ case BuildLeft => bhj.copy(right = newStreamedPlan)
+ case BuildRight => bhj.copy(left = newStreamedPlan)
+ }
+ outerProject(newBhj, bhj.output, pullOutAliases)
+ }
+ }
+
+ private def outerProject(
+ child: SparkPlan,
+ output: Seq[Attribute],
+ pullOutAliases: ArrayBuffer[Alias]): ProjectExecTransformer = {
+ val aliasMap = AttributeMap(pullOutAliases.map(a => a.toAttribute -> a))
+ val newProjectList = output.map(attr => aliasMap.getOrElse(attr, attr))
+ ProjectExecTransformer(newProjectList, child)
+ }
+
+ /**
+ * If there are duplicate projections that do not refer to the parent, only the
original attribute is kept
+ * in the project.
+ */
+ private def rewriteProject(
+ project: ProjectExecTransformer,
+ references: AttributeSet,
+ pullOutAliases: ArrayBuffer[Alias]): SparkPlan = {
+ val projectList = project.projectList
+ val duplicates = AttributeSet(
+ projectList
+ .collect {
+ case attr: Attribute if !references.contains(attr) => attr
+ case a @ Alias(attr: Attribute, _)
+ if !references.contains(a) && !references.contains(attr) =>
+ attr
+ }
+ .groupBy(_.exprId)
+ .filter(_._2.size > 1)
+ .map(_._2.head))
+ if (duplicates.nonEmpty) {
+ val newProjectList = projectList.filter {
+ case a @ Alias(attr: Attribute, _)
+ if !references.contains(a) && duplicates.contains(attr) =>
+ pullOutAliases.append(a)
+ false
+ case _ => true
+ } ++ duplicates.filter(!project.outputSet.contains(_)).toSeq
+ val newProject = project.copy(projectList = newProjectList)
+ // If the output of the new project is the same as the child, delete it
to simplify the plan.
+ if (newProject.outputSet.equals(project.child.outputSet)) {
+ project.child
+ } else {
+ newProject
+ }
+ } else {
+ project
+ }
+ }
+}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 2023ad97eb..9ec2ec33a6 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
ColumnarSubqueryBroadcastExec, InputIteratorTransformer}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ReusedExchangeExec}
@@ -195,4 +195,71 @@ class VeloxHashJoinSuite extends
VeloxWholeStageTransformerSuite {
}
})
}
+
+ test("pull out duplicate projections for HashProbe and FilterProject") {
+ withTable("t1", "t2", "t3") {
+ Seq((1, 1), (2, 2)).toDF("c1", "c2").write.saveAsTable("t1")
+ Seq(1, 2, 3).toDF("c1").write.saveAsTable("t2")
+ Seq(1, 2, 3).toDF("c1").write.saveAsTable("t3")
+ // test HashProbe, pull out `c2 as a,c2 as b`.
+ val q1 =
+ """
+ |select tt1.* from
+ |(select c1,c2, c2 as a,c2 as b from t1) tt1
+ |left join t2
+ |on tt1.c1 = t2.c1
+ |""".stripMargin
+ val q2 =
+ """
+ |select tt1.* from
+ |(select c1, c2 as a,c2 as b from t1) tt1
+ |left join t2
+ |on tt1.c1 = t2.c1
+ |limit 1
+ |""".stripMargin
+ val q3 =
+ """
+ |select tt1.* from
+ |(select c1, c2 as a,c2 as b from t1) tt1
+ |left join t2
+ |on tt1.c1 = t2.c1
+ |left join t3
+ |on tt1.c1 = t3.c1
+ |""".stripMargin
+ Seq(q1, q2, q3).foreach {
+ runQueryAndCompare(_) {
+ df =>
+ {
+ val executedPlan = getExecutedPlan(df)
+ val projects = executedPlan.collect {
+ case p @ ProjectExecTransformer(_, _:
BroadcastHashJoinExecTransformer) => p
+ }
+ assert(projects.nonEmpty)
+ val aliases = projects.last.projectList.collect { case a: Alias
=> a }
+ assert(aliases.size == 2)
+ }
+ }
+ }
+
+ // test FilterProject, only pull out `c2 as b`.
+ val q4 =
+ """
+ |select c1, c2, a, b from
+ |(select c1, c2, c2 as a, c2 as b, rand() as c from t1) tt1
+ |where c > -1 and b > 1
+ |""".stripMargin
+ runQueryAndCompare(q4) {
+ df =>
+ {
+ val executedPlan = getExecutedPlan(df)
+ val projects = executedPlan.collect {
+ case p @ ProjectExecTransformer(_, _: FilterExecTransformer) => p
+ }
+ assert(projects.nonEmpty)
+ val aliases = projects.last.projectList.collect { case a: Alias =>
a }
+ assert(aliases.size == 1)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]