[ https://issues.apache.org/jira/browse/SPARK-34464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311845#comment-17311845 ]

Pablo Langa Blanco commented on SPARK-34464:
--------------------------------------------

Hi [~lfruleux],

Here is a link that explains very well when each type of aggregation is
applied and why:

[https://www.waitingforcode.com/apache-spark-sql/aggregations-execution-apache-spark-sql/read]

In the case you describe, two things make the aggregation fall back to
SortAggregate. First, the types in the aggregation buffer are not mutable
primitive types, which HashAggregate requires. Second, the next fallback,
ObjectHashAggregate, does not support `first` because it is not a
TypedImperativeAggregate, so the plan falls back to SortAggregate.
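The fallback chain described above can be sketched as a small, Spark-free Scala model. It is a simplification under stated assumptions: the buffer type names, `AggFn`, `AggPlanner.choose`, `objectAgg` and the `objectHashEnabled` switch are hypothetical stand-ins (Spark's real decision lives in `AggUtils.createAggregate`), and `first` is assumed to keep the value plus a boolean "value set" flag in its buffer.

```scala
// Hypothetical, simplified model of the aggregate-operator choice; NOT Spark's code.
sealed trait BufType
case object IntBuf extends BufType
case object LongBuf extends BufType
case object DoubleBuf extends BufType
case object BoolBuf extends BufType
case object StringBuf extends BufType // variable-length: cannot be updated in place
case object BinaryBuf extends BufType // e.g. a serialized object buffer

// One aggregate function: its buffer field types and whether it uses an object buffer.
final case class AggFn(name: String, bufferTypes: Seq[BufType], typedImperative: Boolean)

object AggPlanner {
  // Assumed: fixed-width types that a hash-map buffer can mutate in place.
  private val mutablePrimitive: Set[BufType] = Set(IntBuf, LongBuf, DoubleBuf, BoolBuf)

  def choose(fns: Seq[AggFn], objectHashEnabled: Boolean = true): String =
    if (fns.flatMap(_.bufferTypes).forall(mutablePrimitive.contains)) "HashAggregate"
    // In this model, one typed-imperative function is enough to use the object hash path.
    else if (objectHashEnabled && fns.exists(_.typedImperative)) "ObjectHashAggregate"
    else "SortAggregate"
}

object Examples {
  // Assumed buffer layout for `first`: the value itself plus a "value set" flag.
  val firstOnString = AggFn("first", Seq(StringBuf, BoolBuf), typedImperative = false)
  val firstOnInt    = AggFn("first", Seq(IntBuf, BoolBuf), typedImperative = false)
  // Hypothetical aggregate with an object buffer, standing in for a TypedImperativeAggregate.
  val objectAgg     = AggFn("objectAgg", Seq(BinaryBuf), typedImperative = true)
}
```

In this model `first` on a string column lands on SortAggregate, while `first` on an integer column stays on HashAggregate, which matches the explanation that the string buffer, not `first` itself, triggers the first fallback.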

I don't know whether there is a specific reason for this. I'm going to look
into whether it's possible for `first` to fall back to ObjectHashAggregate,
which currently requires a TypedImperativeAggregate.
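A Spark-free sketch of why an object buffer would remove the need for sorting: if `first` kept its state in an arbitrary JVM object, as a TypedImperativeAggregate does, groups could live in a hash map and partial results could be merged without sorting by key. `ObjectAggregate`, `FirstBuffer`, `FirstAgg` and `HashFirst` are hypothetical names for illustration only; this is not Spark's TypedImperativeAggregate API, and it skips the serialize/deserialize step a real object buffer needs before it can be shuffled.

```scala
import scala.collection.mutable

// Hypothetical object-buffer aggregate interface (not Spark's API).
trait ObjectAggregate[In, Buf, Out] {
  def createBuffer(): Buf
  def update(buf: Buf, input: In): Buf
  def merge(left: Buf, right: Buf): Buf
  def eval(buf: Buf): Out
}

// Mutable buffer: None until the first input arrives, then Some(value).
final class FirstBuffer[T] {
  var value: Option[T] = None
}

// "First value seen" aggregate; like first(col, false), a null input still counts as seen.
final class FirstAgg[T] extends ObjectAggregate[T, FirstBuffer[T], Option[T]] {
  def createBuffer(): FirstBuffer[T] = new FirstBuffer[T]

  def update(buf: FirstBuffer[T], input: T): FirstBuffer[T] = {
    if (buf.value.isEmpty) buf.value = Some(input) // keep only the first input
    buf
  }

  // Combine two partial results: prefer the left one if it already saw a value.
  def merge(left: FirstBuffer[T], right: FirstBuffer[T]): FirstBuffer[T] =
    if (left.value.isDefined) left else right

  def eval(buf: FirstBuffer[T]): Option[T] = buf.value
}

object HashFirst {
  // Groups rows by key in a hash map, one object buffer per key; no sorting involved.
  def run[K, V](rows: Seq[(K, V)]): Map[K, Option[V]] = {
    val agg = new FirstAgg[V]
    val buffers = mutable.LinkedHashMap.empty[K, FirstBuffer[V]]
    for ((k, v) <- rows) agg.update(buffers.getOrElseUpdate(k, agg.createBuffer()), v)
    buffers.map { case (k, b) => k -> agg.eval(b) }.toMap
  }
}
```

For example, `HashFirst.run(Seq((0, "a"), (1, "b"), (0, "c")))` keeps one buffer per key and yields the first value seen for each key, without ever ordering the rows.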

Thanks!

> `first` function is sorting the dataset while sometimes it is used to get 
> "any value"
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-34464
>                 URL: https://issues.apache.org/jira/browse/SPARK-34464
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Louis Fruleux
>            Priority: Minor
>
> When one wants to groupBy and take any value (not necessarily the first), one
> usually uses the
> [first|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L485]
> aggregation function.
> Unfortunately, this method uses a `SortAggregate` for some data types, which
> is not always necessary and might hurt performance. Is this the desired
> behavior?
>  
>  
> {code:scala}
> // Current behavior (spark-shell; `spark` is the active SparkSession):
> import org.apache.spark.sql.functions.first
> import spark.implicits._
>
> val df = Seq((0, "value")).toDF("key", "value")
> df.groupBy("key").agg(first("value")).explain()
> /*
> == Physical Plan ==
> SortAggregate(key=key#342, functions=first(value#343, false))
> +- *(2) Sort key#342 ASC NULLS FIRST, false, 0
>    +- Exchange hashpartitioning(key#342, 200)
>       +- SortAggregate(key=key#342, functions=partial_first(value#343, false))
>          +- *(1) Sort key#342 ASC NULLS FIRST, false, 0
>             +- LocalTableScan key#342, value#343
> */
> {code}
>  
> My understanding of the source code is not enough to explain why this is
> the current behavior.
> The solution might be to implement a new aggregate function, but its code
> would be very similar to `first`. And I still don't fully understand why this
> [createAggregate|https://github.com/apache/spark/blob/3a299aa6480ac22501512cd0310d31a441d7dfdc/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L45]
> method falls back to SortAggregate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
