Andy Grove created SPARK-33744:
----------------------------------
Summary: Canonicalization error in SortAggregate
Key: SPARK-33744
URL: https://issues.apache.org/jira/browse/SPARK-33744
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.1.0
Reporter: Andy Grove
The canonicalized plan for a simple aggregate query is different each time the
query is planned when it uses SortAggregate, but not when it uses HashAggregate.
The issue can be demonstrated by adding the following unit tests to
SQLQuerySuite. The HashAggregate test passes and the SortAggregate test fails.
The first test has numeric input and the second operates on strings, which
forces the use of SortAggregate rather than HashAggregate.
{code:scala}
// Both tests rely on SQLQuerySuite's existing imports:
// testImplicits._ (for toDF) and org.apache.spark.sql.functions._ (for col, first).
test("HashAggregate canonicalization") {
  // Integer columns: the planner uses HashAggregate.
  val data = Seq((1, 1)).toDF("c0", "c1")
  val df1 = data.groupBy(col("c0")).agg(first("c1"))
  val df2 = data.groupBy(col("c0")).agg(first("c1"))
  assert(df1.queryExecution.executedPlan.canonicalized ==
    df2.queryExecution.executedPlan.canonicalized)
}

test("SortAggregate canonicalization") {
  // String columns: the planner falls back to SortAggregate.
  val data = Seq(("a", "a")).toDF("c0", "c1")
  val df1 = data.groupBy(col("c0")).agg(first("c1"))
  val df2 = data.groupBy(col("c0")).agg(first("c1"))
  assert(df1.queryExecution.executedPlan.canonicalized ==
    df2.queryExecution.executedPlan.canonicalized)
}
{code}
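For context on why the string-typed test ends up on SortAggregate, here is a hypothetical, simplified model of the planner's choice. It assumes that hash aggregation is used only when every aggregation-buffer slot has a type that can be updated in place in a fixed-width row, and that `first` buffers the value itself plus a boolean "value set" flag. `AggregateChoiceSketch`, `mutableTypes`, `firstBufferTypes`, and `chooseAggregate` are illustrative names, not Spark's planner API, and the model ignores ObjectHashAggregate and other planner details.

{code:scala}
// Hypothetical, simplified model of the HashAggregate-vs-SortAggregate choice.
// The names below are illustrative stand-ins, not Spark's planner API.
object AggregateChoiceSketch {
  sealed trait DataType
  case object IntegerType extends DataType
  case object LongType extends DataType
  case object DoubleType extends DataType
  case object BooleanType extends DataType
  case object StringType extends DataType

  // Types that can be updated in place in a fixed-width row buffer
  // (a small subset of the types Spark treats this way).
  val mutableTypes: Set[DataType] = Set(IntegerType, LongType, DoubleType, BooleanType)

  // Assumed buffer layout for first(col): the first value seen plus a "value set" flag.
  def firstBufferTypes(input: DataType): Seq[DataType] = Seq(input, BooleanType)

  // Hash aggregation only if every buffer slot can be mutated in place.
  def chooseAggregate(bufferTypes: Seq[DataType]): String =
    if (bufferTypes.forall(mutableTypes.contains)) "HashAggregate" else "SortAggregate"

  def main(args: Array[String]): Unit = {
    println(chooseAggregate(firstBufferTypes(IntegerType))) // HashAggregate
    println(chooseAggregate(firstBufferTypes(StringType)))  // SortAggregate
  }
}
{code}

Under this model the string value in the buffer is enough to rule out hash aggregation, which matches the behavior the two tests rely on.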
The SortAggregate test fails with the following output:
{noformat}
SortAggregate(key=[none#0], functions=[first(none#0, false)], output=[none#0, #1])
+- *(2) Sort [none#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(none#0, 5), ENSURE_REQUIREMENTS, [id=#105]
      +- SortAggregate(key=[none#0], functions=[partial_first(none#1, false)], output=[none#0, none#2, none#3])
         +- *(1) Sort [none#0 ASC NULLS FIRST], false, 0
            +- *(1) Project [none#0 AS #0, none#1 AS #1]
               +- *(1) LocalTableScan [none#0, none#1]
did not equal
SortAggregate(key=[none#0], functions=[first(none#0, false)], output=[none#0, #1])
+- *(2) Sort [none#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(none#0, 5), ENSURE_REQUIREMENTS, [id=#148]
      +- SortAggregate(key=[none#0], functions=[partial_first(none#1, false)], output=[none#0, none#2, none#3])
         +- *(1) Sort [none#0 ASC NULLS FIRST], false, 0
            +- *(1) Project [none#0 AS #0, none#1 AS #1]
               +- *(1) LocalTableScan [none#0, none#1]
{noformat}
The error is caused by the resultExpression for the aggregate function being
assigned a new ExprId in the final aggregate.
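A minimal sketch of why a freshly assigned ExprId can break plan equality, assuming a simplified version of the ordinal-based normalization applied during canonicalization (roughly modeled on Catalyst's `QueryPlan.normalizeExpressions`): an attribute found in the operator's input is rewritten to its input position, while an attribute not found there keeps its original, query-specific id. `ExprId`, `Attr`, `newExprId`, `finalAggregate`, and `canonicalResult` below are hypothetical toy stand-ins, not Catalyst's classes.

{code:scala}
// Toy model of ordinal-based ExprId normalization. All names are hypothetical
// stand-ins; this is not Catalyst's implementation.
object CanonicalizationSketch {
  final case class ExprId(id: Long)
  final case class Attr(name: String, exprId: ExprId)

  // Global counter: every call yields an id never seen before, so ids differ
  // between two plannings of the same query.
  private var nextId = 0L
  def newExprId(): ExprId = { nextId += 1; ExprId(nextId) }

  // An attribute found in the input becomes ("none", its input ordinal).
  // An attribute not found in the input keeps its original id.
  def normalize(a: Attr, input: Seq[Attr]): Attr = {
    val ordinal = input.indexWhere(_.exprId == a.exprId)
    if (ordinal == -1) a.copy(name = "none")
    else Attr("none", ExprId(ordinal.toLong))
  }

  // Builds a final aggregate's input and result attribute. With freshResultId,
  // the result gets a newly minted id that does not appear in the input.
  def finalAggregate(freshResultId: Boolean): (Seq[Attr], Attr) = {
    val key = Attr("c0", newExprId())
    val buffer = Attr("first", newExprId())
    val input = Seq(key, buffer)
    val result = if (freshResultId) Attr("first", newExprId()) else buffer
    (input, result)
  }

  // Plans the aggregate once and returns its normalized result attribute.
  def canonicalResult(freshResultId: Boolean): Attr = {
    val (input, result) = finalAggregate(freshResultId)
    normalize(result, input)
  }

  def main(args: Array[String]): Unit = {
    println(canonicalResult(false) == canonicalResult(false)) // true
    println(canonicalResult(true) == canonicalResult(true))   // false
  }
}
{code}

In the toy model, reusing an id that appears in the input normalizes to the same ordinal on every run, whereas a fresh id survives normalization and differs between the two plannings, so the comparison fails.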
--
This message was sent by Atlassian Jira
(v8.3.4#803005)