Inspired by this post: http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/, I've started putting together something based on the Spark 1.5 UDAF interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141
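
To give an idea of the shape of the thing, the core is roughly the following (a simplified sketch rather than the exact gist code - it assumes stream-lib's HyperLogLogPlus, keeps the serialized HLL in a binary buffer column, and returns the serialized HLL rather than just the count so that the result stays mergeable):

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Simplified sketch: aggregates a string column into a serialized HyperLogLogPlus.
class HyperLogLogUDAF(p: Int = 12) extends UserDefinedAggregateFunction {
  // Only strings for now - hence question 1 below about supporting other input types.
  override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("hll", BinaryType) :: Nil)
  override def dataType: DataType = BinaryType  // the serialized HLL, so it stays mergeable
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = new HyperLogLogPlus(p).getBytes

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val hll = HyperLogLogPlus.Builder.build(buffer.getAs[Array[Byte]](0))
      hll.offer(input.get(0))
      buffer(0) = hll.getBytes
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val merged = HyperLogLogPlus.Builder.build(buffer1.getAs[Array[Byte]](0))
    merged.addAll(HyperLogLogPlus.Builder.build(buffer2.getAs[Array[Byte]](0)))
    buffer1(0) = merged.getBytes
  }

  override def evaluate(buffer: Row): Any = buffer.getAs[Array[Byte]](0)
}

Registering it should then just be sqlContext.udf.register("hll_binary", new HyperLogLogUDAF(12)) as far as I can tell, though I haven't exercised that path much yet.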
Some questions:

1. How do I get the UDAF to accept input arguments of different types? We can hash basically anything for HLL - Int, Long, String, Object, raw bytes, etc. Right now it seems we'd need to build a new UDAF for each input type, which seems strange - I should be able to use one UDAF that can handle raw input of different types, as well as existing HLLs that can be merged/aggregated (e.g. for grouped data).

2. @Reynold, how would I ensure this works with Tungsten (i.e. against raw bytes in memory)? Or does the new Aggregate2 stuff do that automatically? Where should I look for examples of how this works internally?

3. I've based this on the Sum and Avg examples for the new UDAF interface - if there are any suggestions or issues, please advise. Is the intermediate buffer efficient?

4. The current HyperLogLogUDT is private, so I've had to make my own copy of it, which is a bit pointless. Any thoughts on exposing that type? Otherwise I need to make the package spark.sql ...

Nick

On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> wrote:

> Yes - it's very interesting. However, ideally we should have a version of hyperloglog that can work directly against raw bytes in memory (rather than Java objects), in order for this to fit the Tungsten execution model, where everything operates directly against some memory address.
>
> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>
>> Sure, I can copy the code, but my aim was more to understand:
>>
>> (a) whether this is broadly interesting enough for folks to think about updating / extending the existing UDAF within Spark
>> (b) how to register one's own custom UDAF - in which case it could be a Spark package, for example
>>
>> All the examples deal with registering a UDF, but there is nothing about UDAFs.
>>
>> —
>> Sent from Mailbox <https://www.dropbox.com/mailbox>
>>
>> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos <daniel.dara...@lynxanalytics.com> wrote:
>>
>>> It's already possible to just copy the code from countApproxDistinct <https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153> and access the HLL directly, or do anything you like.
>>>
>>> On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>
>>>> Any thoughts?
>>>>
>>>> —
>>>> Sent from Mailbox <https://www.dropbox.com/mailbox>
>>>>
>>>> On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>
>>>>> Hey Spark devs
>>>>>
>>>>> I've been looking at DF UDFs and UDAFs. The approx distinct is using HyperLogLog, but there is only an option to return the count as a Long.
>>>>>
>>>>> It can be useful to be able to return and store the actual data structure (i.e. the serialized HLL). This effectively allows one to do aggregations / rollups over columns while still preserving the ability to get distinct counts.
>>>>>
>>>>> For example, one can store daily aggregates of events, grouped by various columns, while storing for each grouping the HLL of, say, unique users. So you can get the uniques per day directly, but you could also very easily do arbitrary aggregates (say monthly, or annually) and still be able to get a unique count for that period by merging the daily HLLs.
>>>>>
>>>>> I did this a while back as a Hive UDAF (https://github.com/MLnick/hive-udf), which returns a Struct field containing a "cardinality" field and a "binary" field containing the serialized HLL.
>>>>>
>>>>> I was wondering if there would be interest in something like this? I am not so clear on how UDTs work with regard to SerDe - so could one adapt the HyperLogLogUDT to be a Struct with the serialized HLL as one field and the count as another? Then I assume this would automatically play nicely with DataFrame I/O etc. The gotcha is that one then needs to call "approx_count_field.count" (or is there a concept of a "default field" for a Struct?).
>>>>>
>>>>> Also, being able to provide the bitsize parameter may be useful...
>>>>>
>>>>> The same thinking would potentially apply to other approximate (and mergeable) data structures like T-Digest and maybe CMS.
>>>>>
>>>>> Nick
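
PS: to make the rollup idea from the original mail above concrete - assuming a UDAF along the lines of the sketch earlier that returns the serialized HLL as a binary column - merging the daily HLLs back into a unique count for an arbitrary period would look roughly like this (purely illustrative, names are made up):

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

// dailyHLLs: serialized HLLs, one per day, all built with the same precision p
def uniquesForPeriod(dailyHLLs: Seq[Array[Byte]], p: Int = 12): Long = {
  val merged = new HyperLogLogPlus(p)  // precision must match that of the daily HLLs
  dailyHLLs.foreach(bytes => merged.addAll(HyperLogLogPlus.Builder.build(bytes)))
  merged.cardinality()
}

The same thing could be wrapped as a UDF (or folded into the UDAF itself) so that the monthly / annual rollups stay in DataFrame land.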