Andy Grove created SPARK-33744:
----------------------------------

             Summary: Canonicalization error in SortAggregate
                 Key: SPARK-33744
                 URL: https://issues.apache.org/jira/browse/SPARK-33744
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Andy Grove


The canonicalized plan for a simple aggregate query differs each time the query 
is planned when it uses SortAggregate, but is stable when it uses HashAggregate.

The issue can be demonstrated by adding the following unit tests to 
SQLQuerySuite. The HashAggregate test passes and the SortAggregate test fails.

The first test has numeric input and the second operates on strings, which forces 
the use of SortAggregate rather than HashAggregate (HashAggregate does not support 
a string aggregation buffer).
{code:java}
test("HashAggregate canonicalization") {
  val data = Seq((1, 1)).toDF("c0", "c1")
  val df1 = data.groupBy(col("c0")).agg(first("c1"))
  val df2 = data.groupBy(col("c0")).agg(first("c1"))
  assert(df1.queryExecution.executedPlan.canonicalized ==
      df2.queryExecution.executedPlan.canonicalized)
}

test("SortAggregate canonicalization") {
  val data = Seq(("a", "a")).toDF("c0", "c1")
  val df1 = data.groupBy(col("c0")).agg(first("c1"))
  val df2 = data.groupBy(col("c0")).agg(first("c1"))
  assert(df1.queryExecution.executedPlan.canonicalized ==
      df2.queryExecution.executedPlan.canonicalized)
}
{code}
The SortAggregate test fails with the following output:
{code:java}
SortAggregate(key=[none#0], functions=[first(none#0, false)], output=[none#0, #1])
+- *(2) Sort [none#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(none#0, 5), ENSURE_REQUIREMENTS, [id=#105]
      +- SortAggregate(key=[none#0], functions=[partial_first(none#1, false)], output=[none#0, none#2, none#3])
         +- *(1) Sort [none#0 ASC NULLS FIRST], false, 0
            +- *(1) Project [none#0 AS #0, none#1 AS #1]
               +- *(1) LocalTableScan [none#0, none#1]

 did not equal 

SortAggregate(key=[none#0], functions=[first(none#0, false)], output=[none#0, #1])
+- *(2) Sort [none#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(none#0, 5), ENSURE_REQUIREMENTS, [id=#148]
      +- SortAggregate(key=[none#0], functions=[partial_first(none#1, false)], output=[none#0, none#2, none#3])
         +- *(1) Sort [none#0 ASC NULLS FIRST], false, 0
            +- *(1) Project [none#0 AS #0, none#1 AS #1]
            +- *(1) LocalTableScan [none#0, none#1]
{code}
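Apart from the Exchange [id=#...] values, the two printed plans are identical, so 
the mismatch is not visible in this output. The error is caused by the 
resultExpression for the aggregate function being assigned a new ExprId in the 
final aggregate.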
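A toy model may help show why a freshly assigned ExprId breaks plan equality. This is a simplified sketch under stated assumptions, not Spark's actual canonicalization code: it assumes canonicalization erases attribute names and rewrites each id to the attribute's ordinal in the operator's input, leaving ids that do not appear in the input untouched. All names here (Attr, ToyAgg, ToyCanonicalization, plan, mintResultId) are hypothetical.

{code:scala}
// Hypothetical stand-ins, not Spark's AttributeReference or SparkPlan.
final case class Attr(name: String, exprId: Long)

// A single toy aggregate operator: the attributes it reads and the ones it emits.
final case class ToyAgg(input: Seq[Attr], output: Seq[Attr])

object ToyCanonicalization {
  private var counter = 0L

  // Mimics a global id generator: every call returns a new, never-reused id.
  def newExprId(): Long = { counter += 1; counter }

  // Assumed rule: drop the name and replace the id with the attribute's ordinal
  // in the operator's input. Ids that are not found in the input are kept as-is.
  private def normalize(a: Attr, input: Seq[Attr]): Attr = {
    val ordinal = input.indexWhere(_.exprId == a.exprId)
    Attr("none", if (ordinal >= 0) ordinal.toLong else a.exprId)
  }

  def canonicalize(p: ToyAgg): ToyAgg =
    ToyAgg(p.input.map(normalize(_, p.input)), p.output.map(normalize(_, p.input)))

  // Plans the same query once. When mintResultId is true, the result column gets
  // a fresh id (the behavior reported for the final SortAggregate); when false,
  // the output simply reuses the input attribute c1 (a simplification).
  def plan(mintResultId: Boolean): ToyAgg = {
    val c0 = Attr("c0", newExprId())
    val c1 = Attr("c1", newExprId())
    val result = if (mintResultId) Attr("first(c1)", newExprId()) else c1
    ToyAgg(Seq(c0, c1), Seq(c0, result))
  }

  def main(args: Array[String]): Unit = {
    val stable = canonicalize(plan(mintResultId = false)) == canonicalize(plan(mintResultId = false))
    val fresh = canonicalize(plan(mintResultId = true)) == canonicalize(plan(mintResultId = true))
    println(s"stable ids equal: $stable")     // prints "stable ids equal: true"
    println(s"fresh result id equal: $fresh") // prints "fresh result id equal: false"
  }
}
{code}

In this model, two plannings of the same query canonicalize to equal plans only when every output id can be mapped back to an input ordinal; the fresh result id survives normalization unchanged and differs between runs, which is the same symptom the SortAggregate test shows.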



--
This message was sent by Atlassian Jira
(v8.3.4#803005)