This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c67a774f084c [SPARK-52895][SQL] Don't add duplicate elements in `resolveExprsWithAggregate`
c67a774f084c is described below
commit c67a774f084c97578b63a00fd9bb70d4e71838af
Author: Mihailo Timotic <[email protected]>
AuthorDate: Mon Jul 21 18:34:24 2025 +0800
[SPARK-52895][SQL] Don't add duplicate elements in `resolveExprsWithAggregate`
### What changes were proposed in this pull request?
Don't add duplicate elements in `resolveExprsWithAggregate`.
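For context, the core change (in the `Analyzer.scala` diff below) replaces the `ArrayBuffer` of extra aggregate expressions with a `java.util.LinkedHashMap` keyed by the canonicalized expression, so repeated requests for the same expression resolve to the alias that was registered first. A minimal, self-contained sketch of that pattern follows; it uses plain strings instead of Catalyst expressions, and all names are invented for illustration:
```scala
import java.util.LinkedHashMap
import scala.jdk.CollectionConverters._

object DedupSketch {
  def main(args: Array[String]): Unit = {
    // Insertion-ordered map: key ~ canonicalized expression, value ~ the alias handed out.
    val extraAggExprs = new LinkedHashMap[String, String]()
    // "col2" is requested twice (think ORDER BY col2, col2) but inserted only once;
    // the second lookup returns the alias created by the first.
    val handedOut = Seq("col2", "col2", "max(col2)").map { canonicalKey =>
      extraAggExprs.computeIfAbsent(canonicalKey, _ => s"alias($canonicalKey)")
    }
    println(handedOut)                            // List(alias(col2), alias(col2), alias(max(col2)))
    println(extraAggExprs.values().asScala.toSeq) // List(alias(col2), alias(max(col2)))
  }
}
```
Using a `LinkedHashMap` rather than a plain `HashMap` preserves the order in which the extra expressions were first seen, mirroring the ordering of the previous `ArrayBuffer`.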
### Why are the changes needed?
This is needed in order to resolve plan mismatches between the fixed-point and
single-pass analyzers. At the moment, fixed-point duplicates columns if the same
missing column appears more than once in HAVING/ORDER BY. However, if there are
LCAs (lateral column aliases), fixed-point ends up deduplicating these columns,
because LCA resolution uses a set (and LCA resolution runs after ORDER BY/HAVING
resolution in fixed-point). In single-pass, LCA resolution is done first and only
afterwards comes ORDER BY/HAVING resolution, which adds dupli [...]
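As a concrete illustration, the queries added to the golden files in this patch hit exactly this shape: a column that is missing from the SELECT list is referenced twice in ORDER BY/HAVING, together with a lateral column alias. A hypothetical spark-shell reproducer (the `spark` session value is the usual shell default, not part of this change):
```scala
// col2 is missing from the SELECT list and referenced twice in ORDER BY,
// and `a AS b` is a lateral column alias. This is the query shape where the
// fixed-point and single-pass analyzers previously produced different
// Aggregate output lists.
spark.sql(
  """SELECT col1 AS a, a AS b
    |FROM VALUES(1, 2)
    |GROUP BY col1, col2
    |ORDER BY col2, col2
    |""".stripMargin).explain(extended = true)
```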
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added new test cases to golden files.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51567 from mihailotim-db/mihailotim-db/deduplicate_agg_exprs.
Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 34 ++++++++++----------
.../analysis/resolver/HavingResolver.scala | 4 ++-
.../resolver/ResolvesNameByHiddenOutput.scala | 14 +++++++++
.../catalyst/analysis/resolver/SortResolver.scala | 4 ++-
.../analyzer-results/group-analytics.sql.out | 2 +-
.../sql-tests/analyzer-results/having.sql.out | 20 ++++++++++++
.../sql-tests/analyzer-results/order-by.sql.out | 20 ++++++++++++
.../udf/udf-group-analytics.sql.out | 2 +-
.../src/test/resources/sql-tests/inputs/having.sql | 5 +++
.../test/resources/sql-tests/inputs/order-by.sql | 5 +++
.../resources/sql-tests/results/having.sql.out | 16 ++++++++++
.../resources/sql-tests/results/order-by.sql.out | 16 ++++++++++
.../scala/org/apache/spark/sql/SubquerySuite.scala | 36 +++++++++++++++++++++-
13 files changed, 157 insertions(+), 21 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 532bf51a517c..ae3c0c303699 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
import java.util
-import java.util.Locale
+import java.util.{LinkedHashMap, Locale}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -2919,7 +2919,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
def resolveExprsWithAggregate(
exprs: Seq[Expression],
agg: Aggregate): (Seq[NamedExpression], Seq[Expression]) = {
- val extraAggExprs = ArrayBuffer.empty[NamedExpression]
+ val extraAggExprs = new LinkedHashMap[Expression, NamedExpression]
val transformed = exprs.map { e =>
if (!e.resolved) {
e
@@ -2927,13 +2927,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
buildAggExprList(e, agg, extraAggExprs)
}
}
- (extraAggExprs.toSeq, transformed)
+ (extraAggExprs.values().asScala.toSeq, transformed)
}
private def buildAggExprList(
expr: Expression,
agg: Aggregate,
- aggExprList: ArrayBuffer[NamedExpression]): Expression = {
+ aggExprMap: LinkedHashMap[Expression, NamedExpression]): Expression = {
// Avoid adding an extra aggregate expression if it's already present in
// `agg.aggregateExpressions`. Trim inner aliases from aggregate expressions because of
// expressions like `spark_grouping_id` that can have inner aliases.
@@ -2949,20 +2949,22 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
expr match {
case ae: AggregateExpression =>
val cleaned = trimTempResolvedColumn(ae)
- val alias =
- Alias(cleaned, toPrettySQL(e = cleaned, shouldTrimTempResolvedColumn = true))()
- aggExprList += alias
- alias.toAttribute
+ val resultAlias = aggExprMap.computeIfAbsent(
+ cleaned.canonicalized,
+ _ => Alias(cleaned, toPrettySQL(e = cleaned, shouldTrimTempResolvedColumn = true))()
+ )
+ resultAlias.toAttribute
case grouping: Expression if agg.groupingExpressions.exists(grouping.semanticEquals) =>
trimTempResolvedColumn(grouping) match {
case ne: NamedExpression =>
- aggExprList += ne
- ne.toAttribute
+ val resultAttribute = aggExprMap.computeIfAbsent(ne.canonicalized, _ => ne)
+ resultAttribute.toAttribute
case other =>
- val alias =
- Alias(other, toPrettySQL(e = other, shouldTrimTempResolvedColumn = true))()
- aggExprList += alias
- alias.toAttribute
+ val resultAlias = aggExprMap.computeIfAbsent(
+ other.canonicalized,
+ _ => Alias(other, toPrettySQL(e = other, shouldTrimTempResolvedColumn = true))()
+ )
+ resultAlias.toAttribute
}
case t: TempResolvedColumn =>
if (t.child.isInstanceOf[Attribute]) {
@@ -2977,7 +2979,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
val childWithTempCol = t.child.transformUp {
case a: Attribute => TempResolvedColumn(a, Seq(a.name))
}
- val newChild = buildAggExprList(childWithTempCol, agg, aggExprList)
+ val newChild = buildAggExprList(childWithTempCol, agg, aggExprMap)
if (newChild.containsPattern(TEMP_RESOLVED_COLUMN)) {
withOrigin(t.origin)(t.copy(hasTried = true))
} else {
@@ -2985,7 +2987,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
}
case other =>
- other.withNewChildren(other.children.map(buildAggExprList(_, agg, aggExprList)))
+ other.withNewChildren(other.children.map(buildAggExprList(_, agg, aggExprMap)))
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HavingResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HavingResolver.scala
index e84eb9a0bc29..ec0907807d5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HavingResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HavingResolver.scala
@@ -74,8 +74,10 @@ class HavingResolver(resolver: Resolver, expressionResolver: ExpressionResolver)
val (resolvedConditionWithAliasReplacement, filteredMissingExpressions) =
tryReplaceSortOrderOrHavingConditionWithAlias(resolvedCondition, scopes, missingExpressions)
+ val deduplicatedMissingExpressions = deduplicateMissingExpressions(filteredMissingExpressions)
+
val resolvedChildWithMissingAttributes =
- insertMissingExpressions(resolvedChild, filteredMissingExpressions)
+ insertMissingExpressions(resolvedChild, deduplicatedMissingExpressions)
val isChildChangedByMissingExpressions = !resolvedChildWithMissingAttributes.eq(resolvedChild)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolvesNameByHiddenOutput.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolvesNameByHiddenOutput.scala
index cab55ea3b66a..ca6f4a63c88e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolvesNameByHiddenOutput.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolvesNameByHiddenOutput.scala
@@ -222,6 +222,20 @@ trait ResolvesNameByHiddenOutput extends SQLConfHelper {
case other => other
}
+ /**
+ * Deduplicates missing expressions by [[ExprId]].
+ */
+ def deduplicateMissingExpressions(
+ missingExpressions: Seq[NamedExpression]): Seq[NamedExpression] = {
+ val duplicateMissingExpressions = new HashSet[ExprId]
+ missingExpressions.collect {
+ case expression: NamedExpression
+ if !duplicateMissingExpressions.contains(expression.exprId) =>
+ duplicateMissingExpressions.add(expression.exprId)
+ expression
+ }
+ }
+
private def expandOperatorsOutputList(
operator: UnaryNode,
operatorOutput: Seq[NamedExpression],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/SortResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/SortResolver.scala
index 0c7432af7193..ead425510d57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/SortResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/SortResolver.scala
@@ -134,8 +134,10 @@ class SortResolver(operatorResolver: Resolver, expressionResolver: ExpressionRes
val (resolvedOrderExpressionsWithAliasesReplaced, filteredMissingExpressions) =
tryReplaceSortOrderWithAlias(resolvedOrderExpressions, missingExpressions)
+ val deduplicatedMissingExpressions = deduplicateMissingExpressions(filteredMissingExpressions)
+
val resolvedChildWithMissingAttributes =
- insertMissingExpressions(resolvedChild, filteredMissingExpressions)
+ insertMissingExpressions(resolvedChild, deduplicatedMissingExpressions)
val isChildChangedByMissingExpressions = !resolvedChildWithMissingAttributes.eq(resolvedChild)
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out
index b1f7aecc8051..fc0f6fef8c16 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out
@@ -383,7 +383,7 @@ HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, yea
Sort [course#x ASC NULLS FIRST, year#x ASC NULLS FIRST], true
+- Project [course#x, year#x]
+- Filter ((cast(cast((shiftright(spark_grouping_id#xL, 0) & 1) as tinyint) as int) = 1) AND (spark_grouping_id#xL > cast(0 as bigint)))
- +- Aggregate [course#x, year#x, spark_grouping_id#xL], [course#x, year#x, spark_grouping_id#xL, spark_grouping_id#xL]
+ +- Aggregate [course#x, year#x, spark_grouping_id#xL], [course#x, year#x, spark_grouping_id#xL]
+- Expand [[course#x, year#x, earnings#x, course#x, year#x, 0], [course#x, year#x, earnings#x, course#x, null, 1], [course#x, year#x, earnings#x, null, year#x, 2], [course#x, year#x, earnings#x, null, null, 3]], [course#x, year#x, earnings#x, course#x, year#x, spark_grouping_id#xL]
+- Project [course#x, year#x, earnings#x, course#x AS course#x, year#x AS year#x]
+- SubqueryAlias coursesales
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
index 918c3fc5c4ba..dab6b87e8073 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/having.sql.out
@@ -486,3 +486,23 @@ Filter cast(scalar-subquery#x [alias#x] as boolean)
: +- OneRowRelation
+- Aggregate [col1#x], [col1#x AS alias#x]
+- LocalRelation [col1#x]
+
+
+-- !query
+SELECT col1 FROM VALUES(1,2) GROUP BY col1, col2 HAVING col2 = col2
+-- !query analysis
+Project [col1#x]
++- Filter (col2#x = col2#x)
+ +- Aggregate [col1#x, col2#x], [col1#x, col2#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT col1 AS a, a AS b FROM VALUES(1,2) GROUP BY col1, col2 HAVING col2 = col2
+-- !query analysis
+Project [a#x, b#x]
++- Filter (col2#x = col2#x)
+ +- Project [a#x, a#x AS b#x, col2#x]
+ +- Project [col1#x, col2#x, col1#x AS a#x]
+ +- Aggregate [col1#x, col2#x], [col1#x, col2#x]
+ +- LocalRelation [col1#x, col2#x]
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/order-by.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/order-by.sql.out
index 9c9305ac632d..920e16ea8687 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/order-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/order-by.sql.out
@@ -454,6 +454,26 @@ Sort [(sum(b) + 1)#xL ASC NULLS FIRST], true
+- LocalRelation [a#x, b#x]
+-- !query
+SELECT col1 FROM VALUES(1,2) GROUP BY col1, col2 ORDER BY col2, col2
+-- !query analysis
+Project [col1#x]
++- Sort [col2#x ASC NULLS FIRST, col2#x ASC NULLS FIRST], true
+ +- Aggregate [col1#x, col2#x], [col1#x, col2#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT col1 AS a, a AS b FROM VALUES(1,2) GROUP BY col1, col2 ORDER BY col2, col2
+-- !query analysis
+Project [a#x, b#x]
++- Sort [col2#x ASC NULLS FIRST, col2#x ASC NULLS FIRST], true
+ +- Project [a#x, a#x AS b#x, col2#x]
+ +- Project [col1#x, col2#x, col1#x AS a#x]
+ +- Aggregate [col1#x, col2#x], [col1#x, col2#x]
+ +- LocalRelation [col1#x, col2#x]
+
+
-- !query
DROP VIEW IF EXISTS testData
-- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out
index 9874bdcc95b6..fc8199333f09 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out
@@ -256,7 +256,7 @@ HAVING GROUPING(year) = 1 AND GROUPING_ID(course, year) > 0 ORDER BY course, udf
Sort [course#x ASC NULLS FIRST, cast(udf(cast(year#x as string)) as int) ASC NULLS FIRST], true
+- Project [course#x, year#x]
+- Filter ((cast(cast((shiftright(spark_grouping_id#xL, 0) & 1) as tinyint) as int) = 1) AND (spark_grouping_id#xL > cast(0 as bigint)))
- +- Aggregate [course#x, year#x, spark_grouping_id#xL], [course#x, year#x, spark_grouping_id#xL, spark_grouping_id#xL]
+ +- Aggregate [course#x, year#x, spark_grouping_id#xL], [course#x, year#x, spark_grouping_id#xL]
+- Expand [[course#x, year#x, earnings#x, course#x, year#x, 0], [course#x, year#x, earnings#x, course#x, null, 1], [course#x, year#x, earnings#x, null, year#x, 2], [course#x, year#x, earnings#x, null, null, 3]], [course#x, year#x, earnings#x, course#x, year#x, spark_grouping_id#xL]
+- Project [course#x, year#x, earnings#x, course#x AS course#x, year#x AS year#x]
+- SubqueryAlias coursesales
diff --git a/sql/core/src/test/resources/sql-tests/inputs/having.sql b/sql/core/src/test/resources/sql-tests/inputs/having.sql
index 49998757bb0c..92f1d5aec74d 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/having.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/having.sql
@@ -91,3 +91,8 @@ GROUP BY col1
HAVING (
SELECT col1[0] = 1
);
+
+-- Missing attribute (col2) in HAVING is added only once
+
+SELECT col1 FROM VALUES(1,2) GROUP BY col1, col2 HAVING col2 = col2;
+SELECT col1 AS a, a AS b FROM VALUES(1,2) GROUP BY col1, col2 HAVING col2 = col2;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/order-by.sql b/sql/core/src/test/resources/sql-tests/inputs/order-by.sql
index c9590a70046c..88c04377de0b 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/order-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/order-by.sql
@@ -54,5 +54,10 @@ SELECT MAX(a) + SUM(b) FROM testData ORDER BY SUM(b) + MAX(a);
SELECT SUM(a) + 1 + MIN(a) FROM testData ORDER BY 1 + 1 + 1 + MIN(a) + 1 + SUM(a);
SELECT SUM(b) + 1 FROM testData HAVING SUM(b) + 1 > 0 ORDER BY SUM(b) + 1;
+-- Missing attribute (col2) in ORDER BY is added only once
+
+SELECT col1 FROM VALUES(1,2) GROUP BY col1, col2 ORDER BY col2, col2;
+SELECT col1 AS a, a AS b FROM VALUES(1,2) GROUP BY col1, col2 ORDER BY col2, col2;
+
-- Clean up
DROP VIEW IF EXISTS testData;
diff --git a/sql/core/src/test/resources/sql-tests/results/having.sql.out b/sql/core/src/test/resources/sql-tests/results/having.sql.out
index 135ab5990028..4ea7add3fc72 100644
--- a/sql/core/src/test/resources/sql-tests/results/having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/having.sql.out
@@ -343,3 +343,19 @@ HAVING (
struct<alias:map<string,int>>
-- !query output
+
+
+-- !query
+SELECT col1 FROM VALUES(1,2) GROUP BY col1, col2 HAVING col2 = col2
+-- !query schema
+struct<col1:int>
+-- !query output
+1
+
+
+-- !query
+SELECT col1 AS a, a AS b FROM VALUES(1,2) GROUP BY col1, col2 HAVING col2 = col2
+-- !query schema
+struct<a:int,b:int>
+-- !query output
+1 1
diff --git a/sql/core/src/test/resources/sql-tests/results/order-by.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by.sql.out
index 3666bc49189e..6ce8d395fb0d 100644
--- a/sql/core/src/test/resources/sql-tests/results/order-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/order-by.sql.out
@@ -442,6 +442,22 @@ struct<(sum(b) + 1):bigint>
11
+-- !query
+SELECT col1 FROM VALUES(1,2) GROUP BY col1, col2 ORDER BY col2, col2
+-- !query schema
+struct<col1:int>
+-- !query output
+1
+
+
+-- !query
+SELECT col1 AS a, a AS b FROM VALUES(1,2) GROUP BY col1, col2 ORDER BY col2, col2
+-- !query schema
+struct<a:int,b:int>
+-- !query output
+1 1
+
+
-- !query
DROP VIEW IF EXISTS testData
-- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 576f93e94ec1..205f4f7eec80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkRuntimeException
-import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, NamedExpression, OuterReference, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan, Project, Sort, Union}
import org.apache.spark.sql.execution._
@@ -2846,4 +2846,38 @@ class SubquerySuite extends QueryTest
:: Row(true) :: Row(true) :: Row(true) :: Nil
)
}
+
+
+ test("SPARK-52896: Outer reference ExprId should match exposed attribute") {
+ val plan =
+ sql(
+ """
+ | SELECT col1
+ | FROM VALUES(1,2)
+ | GROUP BY col1
+ | HAVING MAX(col2) == (SELECT 1 WHERE MAX(col2) = 1)
+ |
+ """.stripMargin).queryExecution.analyzed
+
+ // Expected plan:
+ // Project
+ // +- Filter (scalar-subquery)
+ // : +- Project
+ // : +- Filter
+ // : +- OneRowRelation
+ // +- Aggregate
+ // +- LocalRelation
+
+ val havingNode = plan.asInstanceOf[Project].child.asInstanceOf[Filter]
+ val subquery =
+ havingNode.condition.asInstanceOf[EqualTo].right.asInstanceOf[SubqueryExpression]
+ val subqueryFilter = subquery.plan.asInstanceOf[Project].child.asInstanceOf[Filter]
+
+ val exposedAttribute = subquery.getOuterAttrs.head.asInstanceOf[NamedExpression]
+ val outerReferenceAttribute = subqueryFilter.condition.asInstanceOf[EqualTo].collectFirst {
+ case outerReference: OuterReference => outerReference.e
+ }.get
+
+ assert(exposedAttribute.exprId == outerReferenceAttribute.exprId)
+ }
}