This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a091ac501 [CORE] Drop redundant partial sort which has pre-project
when offloading sort agg (#6294)
a091ac501 is described below
commit a091ac5010968274f3f8b561d7e36054f92ed151
Author: Mingliang Zhu <[email protected]>
AuthorDate: Thu Jul 4 19:18:26 2024 +0800
[CORE] Drop redundant partial sort which has pre-project when offloading
sort agg (#6294)
---
.../execution/VeloxAggregateFunctionsSuite.scala | 26 ++++++++++++++++++++++
.../org/apache/gluten/execution/SortUtils.scala | 24 ++++++++++++++++----
.../extension/columnar/OffloadSingleNode.scala | 8 +------
3 files changed, 47 insertions(+), 11 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
index ae6306cc0..992106d13 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAggregateFunctionsSuite.scala
@@ -1135,6 +1135,32 @@ abstract class VeloxAggregateFunctionsSuite extends VeloxWholeStageTransformerSu
df.select(max(col("txn"))).collect
}
+
+  test("drop redundant partial sort which has pre-project when offload sortAgg") {
+    // Spark 3.2 does not have this configuration, but it does not affect the test results.
+    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
+      withTempView("t1") {
+        Seq((-1, 2), (-1, 3), (2, 3), (3, 4), (-3, 5), (4, 5))
+          .toDF("c1", "c2")
+          .createOrReplaceTempView("t1")
+        runQueryAndCompare("select c2, sum(if(c1<0,0,c1)) from t1 group by c2") {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[HashAggregateExecTransformer]
+                  }) == 2)
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[SortExecTransformer]
+                  }) == 0)
+            }
+        }
+      }
+    }
+  }
}
class VeloxAggregateFunctionsDefaultSuite extends VeloxAggregateFunctionsSuite
{
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
index 2c0ad1b0a..b01c71738 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/SortUtils.scala
@@ -23,11 +23,27 @@ import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan}
 object SortUtils {
   def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
     case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
-    case sort: SortExec if !sort.global => sort.child
+    case PartialSortLike(child) => child
     // from pre/post project-pulling
-    case ProjectExec(_, SortExec(_, false, ProjectExec(_, p), _))
-        if plan.outputSet == p.outputSet =>
-      p
+    case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet == child.outputSet =>
+      child
+    case ProjectLike(PartialSortLike(child)) => plan.withNewChildren(Seq(child))
     case _ => plan
   }
 }
+
+object PartialSortLike {
+  def unapply(plan: SparkPlan): Option[SparkPlan] = plan match {
+    case sort: SortExecTransformer if !sort.global => Some(sort.child)
+    case sort: SortExec if !sort.global => Some(sort.child)
+    case _ => None
+  }
+}
+
+object ProjectLike {
+  def unapply(plan: SparkPlan): Option[SparkPlan] = plan match {
+    case project: ProjectExecTransformer => Some(project.child)
+    case project: ProjectExec => Some(project.child)
+    case _ => None
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 7a4222b5c..62c72af79 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -425,13 +425,7 @@ object OffloadOthers {
ColumnarCoalesceExec(plan.numPartitions, plan.child)
case plan: SortAggregateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
- HashAggregateExecBaseTransformer.from(plan) {
- case sort: SortExecTransformer if !sort.global =>
- sort.child
- case sort: SortExec if !sort.global =>
- sort.child
- case other => other
- }
+
HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
case plan: ObjectHashAggregateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
HashAggregateExecBaseTransformer.from(plan)()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]