This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6025750396 [GLUTEN-8945][VL] Pull out duplicate projections for 
HashProbe and FilterProject (#8946)
6025750396 is described below

commit 60257503960152e5170aa3b30771da24d445dbb5
Author: Mingliang Zhu <[email protected]>
AuthorDate: Wed Mar 19 17:57:31 2025 +0800

    [GLUTEN-8945][VL] Pull out duplicate projections for HashProbe and 
FilterProject (#8946)
---
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   2 +
 .../gluten/extension/PullOutDuplicateProject.scala | 122 +++++++++++++++++++++
 .../gluten/execution/VeloxHashJoinSuite.scala      |  69 +++++++++++-
 3 files changed, 192 insertions(+), 1 deletion(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 3b15fa2263..566cffd1bf 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -96,6 +96,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
     injector.injectPostTransform(_ => EliminateLocalSort)
+    injector.injectPostTransform(_ => PullOutDuplicateProject)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
@@ -179,6 +180,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
     injector.injectPostTransform(_ => EliminateLocalSort)
+    injector.injectPostTransform(_ => PullOutDuplicateProject)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
new file mode 100644
index 0000000000..7e6958bc42
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/PullOutDuplicateProject.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.execution.{BroadcastHashJoinExecTransformer, 
FilterExecTransformer, LimitExecTransformer, ProjectExecTransformer}
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeMap, AttributeSet, PredicateHelper}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Velox does not allow duplicate projections in HashProbe and FilterProject, 
this rule pull out
+ * duplicate projections to a new project outside.
+ */
+object PullOutDuplicateProject extends Rule[SparkPlan] with PredicateHelper {
+  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+    case l @ LimitExecTransformer(p: ProjectExecTransformer, _, _) =>
+      val pullOutAliases = new ArrayBuffer[Alias]()
+      val newChild = rewriteProject(p, AttributeSet.empty, pullOutAliases)
+      if (pullOutAliases.isEmpty) {
+        l
+      } else {
+        outerProject(l.copy(child = newChild), l.output, pullOutAliases)
+      }
+    case p @ ProjectExecTransformer(_, child: ProjectExecTransformer) =>
+      val pullOutAliases = new ArrayBuffer[Alias]()
+      val newChild = rewriteProject(child, AttributeSet.empty, pullOutAliases)
+      val aliasMap = AttributeMap(pullOutAliases.map(a => a.toAttribute -> a))
+      val newProjectList = p.projectList.map(replaceAliasButKeepName(_, 
aliasMap))
+      ProjectExecTransformer(newProjectList, newChild)
+    case f @ FilterExecTransformer(_, child: ProjectExecTransformer) =>
+      val pullOutAliases = new ArrayBuffer[Alias]()
+      val newChild = rewriteProject(child, f.references, pullOutAliases)
+      if (pullOutAliases.isEmpty) {
+        f
+      } else {
+        outerProject(f.copy(child = newChild), f.output, pullOutAliases)
+      }
+    case bhj: BroadcastHashJoinExecTransformer
+        if bhj.streamedPlan.isInstanceOf[ProjectExecTransformer] =>
+      val pullOutAliases = new ArrayBuffer[Alias]()
+      val newStreamedPlan = rewriteProject(
+        bhj.streamedPlan.asInstanceOf[ProjectExecTransformer],
+        bhj.references,
+        pullOutAliases)
+      if (pullOutAliases.isEmpty) {
+        bhj
+      } else {
+        val newBhj = bhj.joinBuildSide match {
+          case BuildLeft => bhj.copy(right = newStreamedPlan)
+          case BuildRight => bhj.copy(left = newStreamedPlan)
+        }
+        outerProject(newBhj, bhj.output, pullOutAliases)
+      }
+  }
+
+  private def outerProject(
+      child: SparkPlan,
+      output: Seq[Attribute],
+      pullOutAliases: ArrayBuffer[Alias]): ProjectExecTransformer = {
+    val aliasMap = AttributeMap(pullOutAliases.map(a => a.toAttribute -> a))
+    val newProjectList = output.map(attr => aliasMap.getOrElse(attr, attr))
+    ProjectExecTransformer(newProjectList, child)
+  }
+
+  /**
+   * If there are duplicate projections and not refer to parent, only the 
original attribute is kept
+   * in the project.
+   */
+  private def rewriteProject(
+      project: ProjectExecTransformer,
+      references: AttributeSet,
+      pullOutAliases: ArrayBuffer[Alias]): SparkPlan = {
+    val projectList = project.projectList
+    val duplicates = AttributeSet(
+      projectList
+        .collect {
+          case attr: Attribute if !references.contains(attr) => attr
+          case a @ Alias(attr: Attribute, _)
+              if !references.contains(a) && !references.contains(attr) =>
+            attr
+        }
+        .groupBy(_.exprId)
+        .filter(_._2.size > 1)
+        .map(_._2.head))
+    if (duplicates.nonEmpty) {
+      val newProjectList = projectList.filter {
+        case a @ Alias(attr: Attribute, _)
+            if !references.contains(a) && duplicates.contains(attr) =>
+          pullOutAliases.append(a)
+          false
+        case _ => true
+      } ++ duplicates.filter(!project.outputSet.contains(_)).toSeq
+      val newProject = project.copy(projectList = newProjectList)
+      // If the output of the new project is the same as the child, delete it 
to simplify the plan.
+      if (newProject.outputSet.equals(project.child.outputSet)) {
+        project.child
+      } else {
+        newProject
+      }
+    } else {
+      project
+    }
+  }
+}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 2023ad97eb..9ec2ec33a6 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, 
ColumnarSubqueryBroadcastExec, InputIteratorTransformer}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec}
 
@@ -195,4 +195,71 @@ class VeloxHashJoinSuite extends 
VeloxWholeStageTransformerSuite {
           }
         })
   }
+
+  test("pull out duplicate projections for HashProbe and FilterProject") {
+    withTable("t1", "t2", "t3") {
+      Seq((1, 1), (2, 2)).toDF("c1", "c2").write.saveAsTable("t1")
+      Seq(1, 2, 3).toDF("c1").write.saveAsTable("t2")
+      Seq(1, 2, 3).toDF("c1").write.saveAsTable("t3")
+      // test HashProbe, pull out `c2 as a,c2 as b`.
+      val q1 =
+        """
+          |select tt1.* from
+          |(select c1,c2, c2 as a,c2 as b from t1) tt1
+          |left join t2
+          |on tt1.c1 = t2.c1
+          |""".stripMargin
+      val q2 =
+        """
+          |select tt1.* from
+          |(select c1, c2 as a,c2 as b from t1) tt1
+          |left join t2
+          |on tt1.c1 = t2.c1
+          |limit 1
+          |""".stripMargin
+      val q3 =
+        """
+          |select tt1.* from
+          |(select c1, c2 as a,c2 as b from t1) tt1
+          |left join t2
+          |on tt1.c1 = t2.c1
+          |left join t3
+          |on tt1.c1 = t3.c1
+          |""".stripMargin
+      Seq(q1, q2, q3).foreach {
+        runQueryAndCompare(_) {
+          df =>
+            {
+              val executedPlan = getExecutedPlan(df)
+              val projects = executedPlan.collect {
+                case p @ ProjectExecTransformer(_, _: 
BroadcastHashJoinExecTransformer) => p
+              }
+              assert(projects.nonEmpty)
+              val aliases = projects.last.projectList.collect { case a: Alias 
=> a }
+              assert(aliases.size == 2)
+            }
+        }
+      }
+
+      // test FilterProject, only pull out `c2 as b`.
+      val q4 =
+        """
+          |select c1, c2, a, b from
+          |(select c1, c2, c2 as a, c2 as b, rand() as c from t1) tt1
+          |where c > -1 and b > 1
+          |""".stripMargin
+      runQueryAndCompare(q4) {
+        df =>
+          {
+            val executedPlan = getExecutedPlan(df)
+            val projects = executedPlan.collect {
+              case p @ ProjectExecTransformer(_, _: FilterExecTransformer) => p
+            }
+            assert(projects.nonEmpty)
+            val aliases = projects.last.projectList.collect { case a: Alias => 
a }
+            assert(aliases.size == 1)
+          }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to