This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9fdc3754d676 [SPARK-28386][SQL] Cannot resolve ORDER BY columns with
GROUP BY and HAVING
9fdc3754d676 is described below
commit 9fdc3754d676bcb5de500c87025b9c571a7cf523
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Dec 20 16:46:54 2023 +0800
[SPARK-28386][SQL] Cannot resolve ORDER BY columns with GROUP BY and HAVING
### What changes were proposed in this pull request?
This PR enhances the analyzer to properly resolve ORDER BY expressions for the following plan pattern:
```
Sort
+- Filter
   +- Aggregate
```
### Why are the changes needed?
```
spark-sql (default)> CREATE TABLE t1 (flag BOOLEAN, dt STRING);
spark-sql (default)> SELECT LENGTH(dt),
> COUNT(t1.flag)
> FROM t1
> GROUP BY LENGTH(dt)
> HAVING COUNT(t1.flag) > 1
> ORDER BY LENGTH(dt);
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with
name `dt` cannot be resolved. Did you mean one of the following? [`length(dt)`,
`count(flag)`].; line 6 pos 16;
'Sort ['LENGTH('dt) ASC NULLS FIRST], true
+- Filter (count(flag)#60L > cast(1 as bigint))
+- Aggregate [length(dt#9)], [length(dt#9) AS length(dt)#59,
count(flag#8) AS count(flag)#60L]
+- SubqueryAlias spark_catalog.default.t1
+- Relation spark_catalog.default.t1[flag#8,dt#9] parquet
```
The above example demonstrates the failure: the query fails during the
analysis phase when both `HAVING` and `ORDER BY` clauses are present, but
succeeds if only one of them is present.
### Does this PR introduce _any_ user-facing change?
Yes. This can be considered a bug fix: queries like the one above, which previously failed analysis, are now resolved.
### How was this patch tested?
New unit tests are added to `having.sql`, and the affected golden files are updated.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44352 from pan3793/SPARK-28386.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 9 +++++++
.../analysis/ResolveReferencesInSort.scala | 16 +++++++-----
.../sql-tests/analyzer-results/having.sql.out | 29 ++++++++++++++++++++++
.../udf/postgreSQL/udf-select_having.sql.out | 11 ++++----
.../src/test/resources/sql-tests/inputs/having.sql | 6 +++++
.../resources/sql-tests/results/having.sql.out | 18 ++++++++++++++
.../approved-plans-v2_7/q6.sf100/explain.txt | 8 +++---
.../approved-plans-v2_7/q6.sf100/simplified.txt | 2 +-
.../approved-plans-v2_7/q6/explain.txt | 8 +++---
.../approved-plans-v2_7/q6/simplified.txt | 2 +-
10 files changed, 87 insertions(+), 22 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e3538647e375..94f6d3346265 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2690,6 +2690,15 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
}
s.copy(order = newSortOrder, child = newChild)
})
+
+ case s @ Sort(_, _, f @ Filter(cond, agg: Aggregate))
+ if agg.resolved && cond.resolved && s.order.forall(_.resolved) =>
+ resolveOperatorWithAggregate(s.order.map(_.child), agg, (newExprs,
newChild) => {
+ val newSortOrder = s.order.zip(newExprs).map {
+ case (sortOrder, expr) => sortOrder.copy(child = expr)
+ }
+ s.copy(order = newSortOrder, child = f.copy(child = newChild))
+ })
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
index 02583ebb8f6b..6fa723d4a75f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter,
LogicalPlan, Project, Sort}
import org.apache.spark.sql.connector.catalog.CatalogManager
/**
@@ -28,10 +28,11 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
* includes metadata columns as well.
* 2. Resolves the column to a literal function which is allowed to be invoked
without braces, e.g.
* `SELECT col, current_date FROM t`.
- * 3. If the child plan is Aggregate, resolves the column to
[[TempResolvedColumn]] with the output
- * of Aggregate's child plan. This is to allow Sort to host grouping
expressions and aggregate
- * functions, which can be pushed down to the Aggregate later. For example,
- * `SELECT max(a) FROM t GROUP BY b ORDER BY min(a)`.
+ * 3. If the child plan is Aggregate or Filter(_, Aggregate), resolves the
column to
+ * [[TempResolvedColumn]] with the output of Aggregate's child plan.
+ * This is to allow Sort to host grouping expressions and aggregate
functions, which can
+ * be pushed down to the Aggregate later. For example,
+ * `SELECT max(a) FROM t GROUP BY b HAVING max(a) > 1 ORDER BY min(a)`.
* 4. Resolves the column to [[AttributeReference]] with the output of a
descendant plan node.
* Spark will propagate the missing attributes from the descendant plan
node to the Sort node.
* This is to allow users to ORDER BY columns that are not in the SELECT
clause, which is
@@ -51,7 +52,10 @@ class ResolveReferencesInSort(val catalogManager:
CatalogManager)
def apply(s: Sort): LogicalPlan = {
val resolvedBasic = s.order.map(resolveExpressionByPlanOutput(_, s.child))
- val resolvedWithAgg = resolvedBasic.map(resolveColWithAgg(_, s.child))
+ val resolvedWithAgg = s.child match {
+ case Filter(_, agg: Aggregate) => resolvedBasic.map(resolveColWithAgg(_,
agg))
+ case _ => resolvedBasic.map(resolveColWithAgg(_, s.child))
+ }
val (missingAttrResolved, newChild) =
resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
val orderByAllResolved = resolveOrderByAll(
s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder]))
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
index 12eb5a34146a..d96ea1e43e18 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
@@ -179,3 +179,32 @@ Filter (c1#x = 1)
+- Aggregate [c1#x], [c1#x]
+- SubqueryAlias t
+- LocalRelation [c1#x, c2#x]
+
+
+-- !query
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
+-- !query analysis
+Sort [sum(v)#xL ASC NULLS FIRST], true
++- Filter (sum(v)#xL > cast(2 as bigint))
+ +- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL]
+ +- SubqueryAlias hav
+ +- View (`hav`, [k#x,v#x])
+ +- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x]
+ +- Project [k#x, v#x]
+ +- SubqueryAlias hav
+ +- LocalRelation [k#x, v#x]
+
+
+-- !query
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)
+-- !query analysis
+Project [k#x, sum(v)#xL]
++- Sort [avg(v#x)#x ASC NULLS FIRST], true
+ +- Filter (sum(v)#xL > cast(2 as bigint))
+ +- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL, avg(v#x) AS avg(v#x)#x]
+ +- SubqueryAlias hav
+ +- View (`hav`, [k#x,v#x])
+ +- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x]
+ +- Project [k#x, v#x]
+ +- SubqueryAlias hav
+ +- LocalRelation [k#x, v#x]
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out
index 5f7e92a62f30..ea6b1716869f 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/postgreSQL/udf-select_having.sql.out
@@ -102,12 +102,11 @@ Project [udf(b)#x, udf(c)#x]
SELECT udf(b), udf(c) FROM test_having
GROUP BY b, c HAVING udf(b) = 3 ORDER BY udf(b), udf(c)
-- !query analysis
-Project [udf(b)#x, udf(c)#x]
-+- Sort [cast(udf(cast(b#x as string)) as int) ASC NULLS FIRST,
cast(udf(cast(c#x as string)) as string) ASC NULLS FIRST], true
- +- Filter (udf(b)#x = 3)
- +- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS
udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x, b#x, c#x]
- +- SubqueryAlias spark_catalog.default.test_having
- +- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x]
parquet
+Sort [udf(b)#x ASC NULLS FIRST, udf(c)#x ASC NULLS FIRST], true
++- Filter (udf(b)#x = 3)
+ +- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS
udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x]
+ +- SubqueryAlias spark_catalog.default.test_having
+ +- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] parquet
-- !query
diff --git a/sql/core/src/test/resources/sql-tests/inputs/having.sql
b/sql/core/src/test/resources/sql-tests/inputs/having.sql
index 056b99e363d2..4c25a60c8abb 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/having.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/having.sql
@@ -33,3 +33,9 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY GROUPING
SETS(t.c1) HAVING t.
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY CUBE(t.c1) HAVING t.c1 = 1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY ROLLUP(t.c1) HAVING t.c1 =
1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1;
+
+-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, while the
agg function is present in the SELECT list
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v);
+
+-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, while the
agg function is not present in the SELECT list
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v);
diff --git a/sql/core/src/test/resources/sql-tests/results/having.sql.out
b/sql/core/src/test/resources/sql-tests/results/having.sql.out
index 6eaba0b4119c..c9d588642636 100644
--- a/sql/core/src/test/resources/sql-tests/results/having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/having.sql.out
@@ -134,3 +134,21 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1
HAVING t.c1 = 1
struct<c1:int>
-- !query output
1
+
+
+-- !query
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
+-- !query schema
+struct<k:string,sum(v):bigint>
+-- !query output
+three 3
+one 6
+
+
+-- !query
+SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)
+-- !query schema
+struct<k:string,sum(v):bigint>
+-- !query output
+one 6
+three 3
diff --git
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
index afdfc51a17dd..82a6e00c79c4 100644
---
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
+++
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
@@ -248,15 +248,15 @@ Input [2]: [ca_state#18, count#22]
Keys [1]: [ca_state#18]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#23]
-Results [3]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25, ca_state#18]
+Results [2]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25]
(44) Filter [codegen id : 14]
-Input [3]: [state#24, cnt#25, ca_state#18]
+Input [2]: [state#24, cnt#25]
Condition : (cnt#25 >= 10)
(45) TakeOrderedAndProject
-Input [3]: [state#24, cnt#25, ca_state#18]
-Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#18 ASC NULLS FIRST],
[state#24, cnt#25]
+Input [2]: [state#24, cnt#25]
+Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24,
cnt#25]
===== Subqueries =====
diff --git
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
index 7339df16a289..d69eb47d92c8 100644
---
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
+++
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
@@ -1,4 +1,4 @@
-TakeOrderedAndProject [cnt,ca_state,state]
+TakeOrderedAndProject [cnt,state]
WholeStageCodegen (14)
Filter [cnt]
HashAggregate [ca_state,count] [count(1),state,cnt,count]
diff --git
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
index a2638dac5645..507d4991a046 100644
---
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
+++
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
@@ -218,15 +218,15 @@ Input [2]: [ca_state#2, count#22]
Keys [1]: [ca_state#2]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#23]
-Results [3]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25, ca_state#2]
+Results [2]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25]
(38) Filter [codegen id : 8]
-Input [3]: [state#24, cnt#25, ca_state#2]
+Input [2]: [state#24, cnt#25]
Condition : (cnt#25 >= 10)
(39) TakeOrderedAndProject
-Input [3]: [state#24, cnt#25, ca_state#2]
-Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST],
[state#24, cnt#25]
+Input [2]: [state#24, cnt#25]
+Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24,
cnt#25]
===== Subqueries =====
diff --git
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
index c9c8358ba0b9..a15b638fbb2c 100644
---
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
+++
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
@@ -1,4 +1,4 @@
-TakeOrderedAndProject [cnt,ca_state,state]
+TakeOrderedAndProject [cnt,state]
WholeStageCodegen (8)
Filter [cnt]
HashAggregate [ca_state,count] [count(1),state,cnt,count]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]