Inspired by this post:
http://eugenezhulenev.com/blog/2015/07/15/interactive-audience-analytics-with-spark-and-hyperloglog/,
I've started putting together something based on the Spark 1.5 UDAF
interface: https://gist.github.com/MLnick/eca566604f2e4e3c6141

Some questions -

1. How do I get the UDAF to accept input arguments of different types? For
HLL we can hash basically anything - Int, Long, String, Object, raw bytes,
etc. Right now it seems we'd need to build a new UDAF for each input type,
which seems strange - I should be able to use one UDAF that can handle raw
input of different types, as well as handle existing HLLs that can be
merged/aggregated (e.g. for grouped data). A rough sketch of the basic shape
is below the question list.
2. @Reynold, how would I ensure this works for Tungsten (i.e. against raw
bytes in memory)? Or does the new Aggregate2 stuff do that automatically?
Where should I look for examples of how this works internally?
3. I've based this on the Sum and Avg examples for the new UDAF interface -
if you have any suggestions or spot any issues, please advise. Is the
intermediate buffer efficient?
4. The current HyperLogLogUDT is private, so I've had to make my own copy,
which is a bit pointless. Any thoughts on exposing that type? Otherwise I'd
need to put my code in the spark.sql package ...
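
A rough, untested sketch of the basic shape (not the gist itself - just an
illustration using stream-lib's HyperLogLogPlus, and it only handles merging
already-serialized HLLs, which is exactly the limitation in question 1):

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Merges pre-serialized HLLs (BinaryType input) and returns the cardinality.
// It does not yet handle hashing raw values of arbitrary input types.
class HyperLogLogMergeUDAF extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("hll", BinaryType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("hll", BinaryType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = new HyperLogLogPlus(12).getBytes   // precision p = 12

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val acc = HyperLogLogPlus.Builder.build(buffer.getAs[Array[Byte]](0))
      acc.addAll(HyperLogLogPlus.Builder.build(input.getAs[Array[Byte]](0)))
      buffer(0) = acc.getBytes
    }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val acc = HyperLogLogPlus.Builder.build(buffer1.getAs[Array[Byte]](0))
    acc.addAll(HyperLogLogPlus.Builder.build(buffer2.getAs[Array[Byte]](0)))
    buffer1(0) = acc.getBytes
  }

  def evaluate(buffer: Row): Any =
    HyperLogLogPlus.Builder.build(buffer.getAs[Array[Byte]](0)).cardinality()
}

(It would then presumably be registered via sqlContext.udf.register, which
ties back to the registration question earlier in the thread.)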

Nick

On Thu, Jul 2, 2015 at 8:06 AM, Reynold Xin <r...@databricks.com> wrote:

> Yes - it's very interesting. However, ideally we should have a version of
> hyperloglog that can work directly against some raw bytes in memory (rather
> than java objects), in order for this to fit the Tungsten execution model
> where everything is operating directly against some memory address.
>
> On Wed, Jul 1, 2015 at 11:00 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Sure, I can copy the code, but my aim was more to understand:
>>
>> (a) whether this is broadly interesting enough for folks to think about
>> updating / extending the existing UDAF within Spark
>> (b) how to register one's own custom UDAF - in which case it could be a
>> Spark package, for example
>>
>> All the examples deal with registering a UDF, but there is nothing about
>> UDAFs.
>>
>> On Wed, Jul 1, 2015 at 6:32 PM, Daniel Darabos <
>> daniel.dara...@lynxanalytics.com> wrote:
>>
>>> It's already possible to just copy the code from countApproxDistinct
>>> <https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1153>
>>> and access the HLL directly, or do anything you like.
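>>>
>>> (The core of countApproxDistinct is roughly just an aggregate over a
>>> HyperLogLogPlus zero value, so a copied version that keeps the HLL itself
>>> could look something like this - a sketch, with the element type and
>>> precision chosen arbitrarily:)
>>>
>>> import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
>>> import org.apache.spark.rdd.RDD
>>>
>>> // Build an HLL over an RDD of ids; keep the structure, not just the count.
>>> def hllOf(ids: RDD[String], p: Int = 12): HyperLogLogPlus =
>>>   ids.aggregate(new HyperLogLogPlus(p))(
>>>     (hll, v) => { hll.offer(v); hll },   // fold each value into the HLL
>>>     (h1, h2) => { h1.addAll(h2); h1 }    // merge per-partition HLLs
>>>   )
>>>
>>> // hllOf(users).cardinality() gives the approximate distinct count, and
>>> // the HLL itself can be serialized (getBytes) and merged again later.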
>>>
>>> On Wed, Jul 1, 2015 at 5:26 PM, Nick Pentreath <nick.pentre...@gmail.com
>>> > wrote:
>>>
>>>> Any thoughts?
>>>>
>>>> On Tue, Jun 23, 2015 at 11:19 AM, Nick Pentreath <
>>>> nick.pentre...@gmail.com> wrote:
>>>>
>>>>> Hey Spark devs
>>>>>
>>>>> I've been looking at DF UDFs and UDAFs. The approximate distinct count
>>>>> is using HyperLogLog, but there is only an option to return the count
>>>>> as a Long.
>>>>>
>>>>> It can be useful to be able to return and store the actual data
>>>>> structure (i.e. the serialized HLL). This effectively allows one to do
>>>>> aggregations / rollups over columns while still preserving the ability
>>>>> to get distinct counts.
>>>>>
>>>>> For example, one can store daily aggregates of events, grouped by
>>>>> various columns, while storing for each grouping the HLL of, say,
>>>>> unique users. So you can get the uniques per day directly, but you
>>>>> could also very easily do arbitrary aggregates (say monthly or
>>>>> annually) and still get a unique count for that period by merging the
>>>>> daily HLLs.
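>>>>>
>>>>> As a rough illustration of that rollup, assuming a hypothetical
>>>>> registered UDAF called "hll_merge" that merges serialized HLL columns
>>>>> (it does not exist in Spark today):
>>>>>
>>>>> import org.apache.spark.sql.functions._
>>>>>
>>>>> // daily has columns: month, country, users_hll (serialized HLL bytes)
>>>>> val monthly = daily
>>>>>   .groupBy(col("month"), col("country"))
>>>>>   .agg(callUDF("hll_merge", col("users_hll")).as("monthly_users_hll"))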
>>>>>
>>>>> I did this a while back as a Hive UDAF
>>>>> (https://github.com/MLnick/hive-udf), which returns a Struct containing
>>>>> a "cardinality" field and a "binary" field with the serialized HLL.
>>>>>
>>>>> I was wondering if there would be interest in something like this? I
>>>>> am not so clear on how UDTs work with regard to SerDe - could one adapt
>>>>> the HyperLogLogUDT to be a Struct with the serialized HLL as one field
>>>>> and the count as another? Then I assume this would automatically play
>>>>> nicely with DataFrame I/O etc. The gotcha is that one then needs to
>>>>> call "approx_count_field.count" (or is there a concept of a "default
>>>>> field" for a Struct?).
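>>>>>
>>>>> Roughly, the return type I have in mind (field names are just
>>>>> illustrative):
>>>>>
>>>>> import org.apache.spark.sql.types._
>>>>>
>>>>> // struct holding both the approximate count and the serialized HLL
>>>>> val hllStructType = StructType(Seq(
>>>>>   StructField("count", LongType),
>>>>>   StructField("hll", BinaryType)
>>>>> ))
>>>>>
>>>>> // getting the count then requires nested-field access, e.g.
>>>>> // df.select("approx_users.count") - hence the "default field" question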
>>>>>
>>>>> Also, being able to provide the bitsize parameter may be useful...
>>>>>
>>>>> The same thinking could potentially apply to other approximate (and
>>>>> mergeable) data structures, like T-Digest and maybe Count-Min Sketch.
>>>>>
>>>>> Nick
>>>>>
>>>>
>>>>
>>>
>>
>
