This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a091ac501 [CORE] Drop redundant partial sort which has pre-project 
when offloading sort agg (#6294)
a091ac501 is described below

commit a091ac5010968274f3f8b561d7e36054f92ed151
Author: Mingliang Zhu <[email protected]>
AuthorDate: Thu Jul 4 19:18:26 2024 +0800

    [CORE] Drop redundant partial sort which has pre-project when offloading 
sort agg (#6294)
---
 .../execution/VeloxAggregateFunctionsSuite.scala   | 26 ++++++++++++++++++++++
 .../org/apache/gluten/execution/SortUtils.scala    | 24 ++++++++++++++++----
 .../extension/columnar/OffloadSingleNode.scala     |  8 +------
 3 files changed, 47 insertions(+), 11 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
index ae6306cc0..992106d13 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
@@ -1135,6 +1135,32 @@ abstract class VeloxAggregateFunctionsSuite extends 
VeloxWholeStageTransformerSu
     df.select(max(col("txn"))).collect
 
   }
+
+  test("drop redundant partial sort which has pre-project when offload 
sortAgg") {
+    // Spark 3.2 does not have this configuration, but it does not affect the 
test results.
+    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
+      withTempView("t1") {
+        Seq((-1, 2), (-1, 3), (2, 3), (3, 4), (-3, 5), (4, 5))
+          .toDF("c1", "c2")
+          .createOrReplaceTempView("t1")
+        runQueryAndCompare("select c2, sum(if(c1<0,0,c1)) from t1 group by 
c2") {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[HashAggregateExecTransformer]
+                  }) == 2)
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[SortExecTransformer]
+                  }) == 0)
+            }
+        }
+      }
+    }
+  }
 }
 
 class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite 
{
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala 
b/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
index 2c0ad1b0a..b01c71738 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
@@ -23,11 +23,27 @@ import org.apache.spark.sql.execution.{ProjectExec, 
SortExec, SparkPlan}
 object SortUtils {
   def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
     case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
-    case sort: SortExec if !sort.global => sort.child
+    case PartialSortLike(child) => child
     // from pre/post project-pulling
-    case ProjectExec(_, SortExec(_, false, ProjectExec(_, p), _))
-        if plan.outputSet == p.outputSet =>
-      p
+    case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet == 
child.outputSet =>
+      child
+    case ProjectLike(PartialSortLike(child)) => 
plan.withNewChildren(Seq(child))
     case _ => plan
   }
 }
+
+object PartialSortLike {
+  def unapply(plan: SparkPlan): Option[SparkPlan] = plan match {
+    case sort: SortExecTransformer if !sort.global => Some(sort.child)
+    case sort: SortExec if !sort.global => Some(sort.child)
+    case _ => None
+  }
+}
+
+object ProjectLike {
+  def unapply(plan: SparkPlan): Option[SparkPlan] = plan match {
+    case project: ProjectExecTransformer => Some(project.child)
+    case project: ProjectExec => Some(project.child)
+    case _ => None
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 7a4222b5c..62c72af79 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -425,13 +425,7 @@ object OffloadOthers {
           ColumnarCoalesceExec(plan.numPartitions, plan.child)
         case plan: SortAggregateExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-          HashAggregateExecBaseTransformer.from(plan) {
-            case sort: SortExecTransformer if !sort.global =>
-              sort.child
-            case sort: SortExec if !sort.global =>
-              sort.child
-            case other => other
-          }
+          
HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
         case plan: ObjectHashAggregateExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
           HashAggregateExecBaseTransformer.from(plan)()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to