Hi,

Spark uses hash-based aggregates whenever all the types in the aggregation buffer are supported there; otherwise, it cannot use hash-based aggregates and falls back to sort-based ones. See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
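Concretely, the planner picks a hash aggregate only when every field of the aggregation buffer is a mutable, fixed-width type in UnsafeRow. Here is a minimal sketch of that check; note it goes through Spark's internal UnsafeFixedWidthAggregationMap, so this is only for illustration, not a stable public API:

import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

// A buffer with only fixed-width primitive fields passes the check,
// so HashAggregate can be used:
val flatBuffer = StructType(StructField("sum", LongType) :: Nil)
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(flatBuffer)   // true

// A buffer with variable-length or nested fields fails the check,
// so the planner falls back to SortAggregate:
val nestedBuffer = StructType(StructField("vals", ArrayType(LongType)) :: Nil)
UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(nestedBuffer) // false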
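So in practice, whether you get HashAggregate depends on your UDAF's bufferSchema: arrays, maps, strings, or nested structs in the buffer force the sort-based path. As a rough sketch (the class and field names here are made up for illustration), a UDAF whose buffer holds only fixed-width primitives should let the planner choose HashAggregate:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class LongSumUdaf extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  // Only a fixed-width LongType field in the buffer, so the
  // hash-based aggregation path stays available.
  override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

If the state you need to keep per group is inherently variable-length (e.g. collecting values into an array), the sort-based plan is expected.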
So I'm not sure about the details of your query, but it seems the types in your UDAF's aggregation buffer are not supported by hash-based aggregates.

// maropu

On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <nam...@gmail.com> wrote:
> Hi all,
>
> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
> which is very inefficient for certain aggregations:
>
> The code is very simple:
> - I have a UDAF
> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>
> The physical plan I got was:
> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
> +- Exchange SinglePartition
>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
>       +- *Project
>          +- Generate explode(internal_col#31), false, false, [internal_col#42]
>             +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>                +- *Sort [key#0 ASC], false, 0
>                   +- Exchange hashpartitioning(key#0, 200)
>                      +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0, internal_col#37])
>                         +- *Sort [key#0 ASC], false, 0
>                            +- Scan ExistingRDD[key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L]
>
> How can I make Spark use HashAggregate (like for the count(*) expression)
> instead of SortAggregate with my UDAF?
>
> Is it intentional? Is there an issue tracking this?
>
> -------
> Regards,
> Andy

--
---
Takeshi Yamamuro