First I would like to talk about the overall approach to aggregation. Usually it is best to arrange a two-phase process (known as map-reduce or scatter-gather): the first phase processes each data partition on some workers, the second phase merges the results on other workers, with a network shuffle in between. With this in mind, serialization should happen only at the end of the first phase, to transfer the intermediate results to the second-phase workers. Another serialization might be desired at the end of the whole process if the results contain sketches to be saved for future use.

We don't have much experience with Spark. There is an example with Theta sketches on the web site here:
https://datasketches.apache.org/docs/Theta/ThetaSparkExample.html
It might be outdated if the Spark API has changed since this example was worked out some years ago.

Regarding memory wrapping: I believe the idea is to have sketches live in some region of memory that is owned and managed by somebody else. So if, let's say, during the first phase of aggregation the state of the aggregation is passed around from update to update as a chunk of memory, then a sketch or union can recreate itself from that chunk with minimal overhead by wrapping it. But if you could arrange passing around a sketch or union object representing that aggregation state, that would be even better.

I believe there are some static methods to get the required memory size upfront. Yes, they give an upper bound, so there might be some over-allocation.

I am not sure I understand your question about the union and having to do something on-heap. You may want to have a look at how off-heap HLL aggregation is done in Druid:
https://github.com/apache/druid/blob/master/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
https://github.com/apache/druid/blob/master/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
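To make the wrapping idea concrete, here is a minimal sketch in Java (assuming datasketches-java with a recent datasketches-memory on the classpath; in older memory versions the wrap method was WritableMemory.wrap rather than writableWrap). The lgK value and the update loop are just for illustration:

```java
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.memory.WritableMemory;

public class HllWrapExample {
  public static void main(String[] args) {
    int lgK = 12;
    // Static method giving an upper bound on the updatable serialized size
    // for this lgK and target type, so the buffer may be over-allocated.
    int maxBytes = HllSketch.getMaxUpdatableSerializationBytes(lgK, TgtHllType.HLL_8);
    byte[] buf = new byte[maxBytes];  // caller-owned memory (e.g. a Spark buffer)

    // Create the sketch directly in the caller-owned buffer.
    WritableMemory wmem = WritableMemory.writableWrap(buf);
    HllSketch sketch = new HllSketch(lgK, TgtHllType.HLL_8, wmem);
    for (int i = 0; i < 1000; i++) {
      sketch.update(i);
    }

    // Later, recreate the sketch from the same bytes with minimal overhead
    // by wrapping them, with no heapify/copy step.
    HllSketch again = HllSketch.writableWrap(WritableMemory.writableWrap(buf));
    System.out.println(again.getEstimate());
  }
}
```

The point is that all updates land in the caller-owned byte array, so the array itself can be passed from update to update and only wrapped again when needed.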
On Tue, Mar 14, 2023 at 5:33 PM Ryan Berti <[email protected]> wrote:
> Hello!
>
> I'm working on integrating Datasketches' HllSketch into Apache Spark, such
> that we have the ability to write out + reaggregate intermediate sketches
> (not currently supported via approx_count_distinct's HLL++ implementation).
> I had a few questions about best practices.
>
> I'm working on an implementation that utilizes a static-length byte array
> within Spark's aggregation buffer, wrapped within a WritableMemory
> instance. I'm then wrapping that within a HllSketch instance when I want to
> update the sketch, or wrapping it in a Union instance when I want to merge
> sketches. Hoping someone can give me some guidance on the following:
>
> - I initially had the HllSketch instances operate 'on-heap', serializing
>   them out / heapifying them back into existence as often as is required
>   by Spark. My bet is that passing around a raw byte array (and wrapping
>   with WritableMemory/HllSketch/Union instances as needed) will reduce
>   serialization/deserialization/garbage collection overhead. Can someone
>   confirm this is the intended usage/benefit of the writableWrap()
>   functionality?
> - Utilizing the raw byte array requires that I initialize a max-sized
>   buffer (given the HllSketch config) up-front, so it seems the tradeoff
>   here is that I'm allocating more memory up-front than I may need. Is my
>   understanding of the tradeoff correct?
> - The Union implementation will only wrap an HLL_8-typed buffer; right now
>   I'm having the Union merge sketches 'on-heap' and then overwrite the
>   Spark byte buffer with the Union's updateableByteArray when the
>   HllSketches aren't configured as HLL_8. I think this is expected, but
>   wanted to confirm?
>
> I have a few follow-up questions about Theta sketches, but figured I'd
> start with the HllSketch before broadening the implementation.
>
> Thanks!
> Ryan Berti
> Senior Data Engineer | Ads DE
> M 7023217573
> 5808 W Sunset Blvd | Los Angeles, CA 90028
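On the third bullet above (the Union wrapping only an HLL_8 buffer): a small hedged sketch of how an off-heap union can be arranged so that only the union's own state needs to be HLL_8, while input sketches of any target type are merged via update(). This assumes datasketches-java with a recent datasketches-memory; the lgK and loop are illustrative:

```java
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.WritableMemory;

public class HllUnionOffHeapExample {
  public static void main(String[] args) {
    int lgK = 12;
    // The union's internal state is HLL_8, so size the buffer for HLL_8.
    byte[] buf = new byte[HllSketch.getMaxUpdatableSerializationBytes(lgK, TgtHllType.HLL_8)];
    Union union = new Union(lgK, WritableMemory.writableWrap(buf));

    // Input sketches may use any target type, e.g. HLL_4 here; no on-heap
    // copy of the union state is needed to merge them.
    HllSketch s = new HllSketch(lgK, TgtHllType.HLL_4);
    for (int i = 0; i < 500; i++) {
      s.update(i);
    }
    union.update(s);

    // The same bytes can be re-wrapped as a Union later without copying.
    Union again = Union.writableWrap(WritableMemory.writableWrap(buf));
    // Extract a result in whatever target type the application needs.
    HllSketch result = again.getResult(TgtHllType.HLL_4);
    System.out.println(result.getEstimate());
  }
}
```

The constraint is only on the wrapped buffer's image type, not on the sketches being merged, so the on-heap merge-and-overwrite step should be avoidable if the aggregation buffer is sized and initialized for HLL_8.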
