This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 27ab77c97b [GLUTEN-7759][CH]Fix pre project push down in aggregate
(#7779)
27ab77c97b is described below
commit 27ab77c97b72bac0614ae4f1ec749047089dca2b
Author: kevinyhzou <[email protected]>
AuthorDate: Thu Nov 7 14:10:25 2024 +0800
[GLUTEN-7759][CH]Fix pre project push down in aggregate (#7779)
* fix pre project
* add test
* another fix
* fix ci
* fix ci
* fix review
---------
Co-authored-by: zouyunhe <[email protected]>
---
...PushdownAggregatePreProjectionAheadExpand.scala | 26 +++++++++++++++++++---
.../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 25 +++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/PushdownAggregatePreProjectionAheadExpand.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/PushdownAggregatePreProjectionAheadExpand.scala
index a3fab3c954..21f1be2f2f 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/PushdownAggregatePreProjectionAheadExpand.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/PushdownAggregatePreProjectionAheadExpand.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
// If there is an expression (not an attribute) in an aggregation function's
-// parameters. It will introduce a pr-projection to calculate the expression
+// parameters. It will introduce a pre-projection to calculate the expression
// at first, and make all the parameters be attributes.
// If it's an aggregation with grouping set, this pre-projection is placed after
// expand operator. This is not efficient, we cannot move this pre-projection
@@ -83,7 +83,7 @@ case class PushdownAggregatePreProjectionAheadExpand(session:
SparkSession)
val originInputAttributes = aheadProjectExprs.filter(e =>
isAttributeOrLiteral(e))
val preProjectExprs = aheadProjectExprs.filter(e =>
!isAttributeOrLiteral(e))
- if (preProjectExprs.length == 0) {
+ if (preProjectExprs.isEmpty) {
return hashAggregate
}
@@ -93,11 +93,31 @@ case class
PushdownAggregatePreProjectionAheadExpand(session: SparkSession)
return hashAggregate
}
+ def projectInputExists(expr: Expression, inputs: Seq[Attribute]):
Boolean = {
+ expr.children.foreach {
+ case a: Attribute =>
+ return inputs.indexOf(a) != -1
+ case p: Expression =>
+ return projectInputExists(p, inputs)
+ case _ =>
+ return true
+ }
+ true
+ }
+
+ val couldPushDown = preProjectExprs.forall {
+ case p: Expression => projectInputExists(p, rootChild.output)
+ case _ => true
+ }
+
+ if (!couldPushDown) {
+ return hashAggregate;
+ }
+
// The new ahead project node will take rootChild's output and
preProjectExprs as the
// projection expressions.
val aheadProject = ProjectExecTransformer(rootChild.output ++
preProjectExprs, rootChild)
val aheadProjectOuput = aheadProject.output
-
val preProjectOutputAttrs = aheadProjectOuput.filter(
e =>
!originInputAttributes.exists(_.exprId.equals(e.asInstanceOf[NamedExpression].exprId)))
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 40b704d2e8..12047b300c 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -3067,5 +3067,30 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
|""".stripMargin
compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
}
+
+ test("GLUTEN-7759: Fix bug of agg pre-project push down") {
+ val table_create_sql =
+ "create table test_tbl_7759(id bigint, name string, day string) using
parquet"
+ val insert_data_sql =
+ "insert into test_tbl_7759 values(1, 'a123', '2024-11-01'),(2, 'a124',
'2024-11-01')"
+ val query_sql =
+ """
+ |select distinct day, name from(
+ |select '2024-11-01' as day
+ |,coalesce(name,'all') name
+ |,cnt
+ |from
+ |(
+ |select count(distinct id) as cnt, name
+ |from test_tbl_7759
+ |group by name
+ |with cube
+ |)) limit 10
+ |""".stripMargin
+ spark.sql(table_create_sql)
+ spark.sql(insert_data_sql)
+ compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
+ spark.sql("drop table test_tbl_7759")
+ }
}
// scalastyle:on line.size.limit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]