Hey Michael, Very cool!
To answer your question: it is critical to have a BufferAggregator. Some context; there are 3 kinds of aggregators: - Aggregator: stores intermediate state on heap; is used during ingestion and by the non-vectorized timeseries query engine. Required, or else some queries won't work properly. - BufferAggregator: stores intermediate state off-heap; is used by the non-vectorized topN and groupBy engines. Required, or else some queries won't work properly. - VectorAggregator: stores intermediate state off-heap; is used by vectorized query engines. Optional, but if there are any aggregators in a query that don't have a VectorAggregator implementation, then the whole query will run non-vectorized, which will slow it down. The main reason that BufferAggregators exist is to support off-heap aggregation, which minimizes GC pressure and prevents out-of-memory errors during the aggregation process. > Assuming it is, we can begin talking with the datasketches team about the possibility of a Direct implementation. With ItemsSketch, the biggest roadblock you're likely to run into is the fact that the items may be of variable size. Currently in Druid each BufferAggregator (and VectorAggregator) must have a fixed-size amount of space allocated off heap. The way this is done is by returning the number of bytes you need from AggregatorFactory.getMaxIntermediateSize. It's all allocated upfront. It can depend on aggregator configuration (for example, the HLL aggregators need more space if they're configured to have higher accuracy) but it can't depend on the data that is encountered during the query. So if you do have variable size items, you get into quite a conundrum! In order to have a "proper" implementation of an off-heap variable-sized sketch, we'd need an aggregator API that allows them to allocate additional memory at runtime. The DataSketches folks have asked us for this before but we haven't built it yet. There's a couple ways you could solve it in the meantime: 1) Allocate a fixed-size amount of memory that is enough to store most data you reasonably expect to encounter, update that off-heap, and allocate additional memory on heap if needed, without Druid "knowing" about it. This is something that'd require a Direct version of the ItemsSketch. It's a technique used by the quantiles sketches too. It's a hack but it works. You can see it in action by looking at DirectUpdateDoublesSketch <https://github.com/apache/datasketches-java/blob/master/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java> in DataSketches (check out the spots where "mem_" is reassigned: it's allocating new on-heap memory that Druid doesn't know about) and DoublesSketchBuildBufferAggregator in Druid (check out the "sketches" map and "relocate" method in DoublesSketchBuildBufferAggregatorHelper: it's making sure to retain the references just in case one of them grew into the heap). 2) Allocate some arbitrary amount of memory, but don't use it, use a technique like the one in DoublesSketchBuildBufferAggregatorHelper to simply maintain references to on-heap sketches. This would work but you would run the risk of running out of heap memory. (There's nothing that explicitly controls how much heap you'll use.) To a degree you can mitigate this by allocating a larger amount of unused off-heap memory via AggregatorFactory.getMaxIntermediateSize, which will get Druid to flush aggregation state more often (it does this when it runs out of off-heap buffer space), which will sort of limit how many sketches are going to be in flight at once. It's quite indirect but it's the best I can think of. > I am also thinking of finishing the implementation by explicitly serializing the entire sketch on each update, but this would only be for experimentation as I doubt this is the intended behavior for implementations of BufferedAggregator. You're right, that's not the intended sort of implementation. It works but it's usually too slow to be practical. (It incurs a full serialization roundtrip for every row.) On Fri, Jul 23, 2021 at 12:18 PM Michael Schiff <michaelsch...@apache.org> wrote: > I am looking into implementing a new Aggregator in the datasketches > extension using the ItemSketch in the frequencies package: > > https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html > > https://github.com/apache/datasketches-java/tree/master/src/main/java/org/apache/datasketches/frequencies > > Ive started on a partial implementation here (still a WIP, lots of TODOs): > > https://github.com/apache/druid/compare/master...michaelschiff:fis-aggregator?expand=1 > > From everything I've seen, it's critical that there is an efficient > implementation of BufferAggregator. The existing aggregators take advantage > of other sketch types providing "Direct" implementations that are > implemented directly against a ByteBuffer. This leads to fairly > transparent implementation of BufferAggregator. ItemSketch is able to > serialize itself and to wrap ByteBuffer for instantiation, but the actual > interactions are all on heap (core of the implementation is > https://github.com/apache/datasketches-java/blob/27ecce938555d731f29df97f12f4744a0efb663d/src/main/java/org/apache/datasketches/frequencies/ReversePurgeItemHashMap.java > ). > > Can anyone confirm that it is critical (i.e. Aggregator will not function) > to have an implementation of BufferAggregator? Assuming it is, we can begin > talking with the datasketches team about the possibility of a Direct > implementation. I am also thinking of finishing the implementation by > explicitly serializing the entire sketch on each update, but this would > only be for experimentation as I doubt this is the intended behavior for > implementations of BufferedAggregator. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org > For additional commands, e-mail: dev-h...@druid.apache.org > >