Thanks. It appears that TypedImperativeAggregate won't be available till 2.2.x. I'm stuck with my RDD approach then :(
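For reference, a minimal sketch of the kind of RDD fallback mentioned above, using aggregateByKey, which combines values per key through a hash-based shuffle and so needs no per-partition sort. The (key, value) shape and the Seq buffer are placeholders, not the actual UDAF logic from this thread:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("rdd-fallback-sketch").getOrCreate()
import spark.implicits._

// Toy dataset standing in for the real one.
val ds = Seq(("a", 1L), ("a", 2L), ("b", 3L)).toDS()

// aggregateByKey builds one buffer per key without sorting the partition,
// unlike the SortAggregate plan the UDAF currently falls back to.
val grouped = ds.rdd.aggregateByKey(Seq.empty[Long])(
  (buf, v) => buf :+ v,   // fold one value into the per-key buffer
  (b1, b2) => b1 ++ b2    // merge partial buffers from different partitions
)

grouped.toDF("key", "values").show()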
-------
Regards,
Andy

On Tue, Jan 10, 2017 at 2:01 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

> Hi Andy,
>
> Because hash-based aggregate uses unsafe rows as aggregation states, the
> aggregation buffer schema must consist of types that are mutable in an
> unsafe row.
>
> If you can use TypedImperativeAggregate to implement your aggregation
> function, SparkSQL has ObjectHashAggregateExec, which supports hash-based
> aggregate using arbitrary JVM objects as aggregation states.
>
>
> Andy Dang wrote
> > Hi Takeshi,
> >
> > Thanks for the answer. My UDAF aggregates data into an array of rows.
> >
> > Apparently this makes it ineligible for hash-based aggregate, based on
> > the logic at:
> > https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
> > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108
> >
> > The list of supported data types is VERY limited, unfortunately.
> >
> > It doesn't make sense to me that the data type must be mutable for the
> > UDAF to use hash-based aggregate, but I could be missing something here
> > :). I could achieve hash-based aggregation by turning this query into
> > RDD mode, but that is counter-intuitive IMO.
> >
> > -------
> > Regards,
> > Andy
> >
> > On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin.m.s@...> wrote:
> >
> >> Hi,
> >>
> >> Spark always uses hash-based aggregates if the types of the aggregated
> >> data are supported there; otherwise, Spark falls back to sort-based ones.
> >> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
> >>
> >> So, I'm not sure about your query, but it seems the types of the
> >> aggregated data in your query are not supported for hash-based
> >> aggregates.
> >>
> >> // maropu
> >>
> >>
> >> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <namd88@...> wrote:
> >>
> >>> Hi all,
> >>>
> >>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
> >>> which is very inefficient for certain aggregations.
> >>>
> >>> The code is very simple:
> >>> - I have a UDAF
> >>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
> >>>
> >>> The physical plan I got was:
> >>>
> >>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
> >>> +- Exchange SinglePartition
> >>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
> >>>       +- *Project
> >>>          +- Generate explode(internal_col#31), false, false, [internal_col#42]
> >>>             +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
> >>>                +- *Sort [key#0 ASC], false, 0
> >>>                   +- Exchange hashpartitioning(key#0, 200)
> >>>                      +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
> >>>                         +- *Sort [key#0 ASC], false, 0
> >>>                            +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]
> >>>
> >>> How can I make Spark use HashAggregate (like the count(*) expression)
> >>> instead of SortAggregate with my UDAF?
> >>>
> >>> Is it intentional? Is there an issue tracking this?
> >>>
> >>> -------
> >>> Regards,
> >>> Andy
> >>>
> >>
> >>
> >> --
> >> ---
> >> Takeshi Yamamuro
> >>
>
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/How-to-hint-Spark-to-use-HashAggregate-for-UDAF-tp20526p20531.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
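For reference, a rough sketch of the TypedImperativeAggregate route suggested in the quoted reply, assuming the Spark 2.2+ internal API (org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate). The TypedMaxLong class here is a hypothetical illustration rather than the actual UDAF from this thread, and the 8-byte serialization format is just one possible choice:

import java.nio.ByteBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.types.{DataType, LongType}

// Hypothetical aggregate that keeps its state as a plain JVM object (a boxed
// Long) instead of a field in an UnsafeRow, which is what makes it eligible
// for ObjectHashAggregateExec.
case class TypedMaxLong(
    child: Expression,
    override val mutableAggBufferOffset: Int = 0,
    override val inputAggBufferOffset: Int = 0)
  extends TypedImperativeAggregate[java.lang.Long] {

  // Empty buffer: no value seen yet.
  override def createAggregationBuffer(): java.lang.Long = null

  // Fold one input row into the buffer.
  override def update(buffer: java.lang.Long, input: InternalRow): java.lang.Long = {
    val v = child.eval(input)
    if (v == null) {
      buffer
    } else {
      val l = v.asInstanceOf[Long]
      if (buffer == null || l > buffer.longValue) java.lang.Long.valueOf(l) else buffer
    }
  }

  // Merge two partial buffers (e.g. from different partitions).
  override def merge(buffer: java.lang.Long, other: java.lang.Long): java.lang.Long =
    if (buffer == null) other
    else if (other == null) buffer
    else if (other.longValue > buffer.longValue) other
    else buffer

  override def eval(buffer: java.lang.Long): Any = buffer

  // The buffer must be serializable so it can be shuffled or spilled.
  override def serialize(buffer: java.lang.Long): Array[Byte] =
    if (buffer == null) Array.emptyByteArray
    else ByteBuffer.allocate(8).putLong(buffer.longValue).array()

  override def deserialize(bytes: Array[Byte]): java.lang.Long =
    if (bytes.isEmpty) null else java.lang.Long.valueOf(ByteBuffer.wrap(bytes).getLong)

  override def children: Seq[Expression] = Seq(child)
  override def nullable: Boolean = true
  override def dataType: DataType = LongType

  override def withNewMutableAggBufferOffset(newOffset: Int): TypedMaxLong =
    copy(mutableAggBufferOffset = newOffset)
  override def withNewInputAggBufferOffset(newOffset: Int): TypedMaxLong =
    copy(inputAggBufferOffset = newOffset)
}

To call something like this from the Dataset API you would wrap it in a Column over its toAggregateExpression(), e.g. new Column(TypedMaxLong(df("value").expr).toAggregateExpression()); with spark.sql.execution.useObjectHashAggregateExec enabled (it should be on by default in 2.2), the planner can then choose ObjectHashAggregateExec instead of SortAggregate for it.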