alamb opened a new issue, #4973: URL: https://github.com/apache/arrow-datafusion/issues/4973
Note this was broken out of https://github.com/apache/arrow-datafusion/issues/2723 and is based on the wonderful writeup from @crepererum and @tustvold in https://github.com/apache/arrow-datafusion/issues/2723#issuecomment-1324876060

# Background

## Prior Art

After https://github.com/apache/arrow-datafusion/pull/4924 we have a single GroupByHash implementation that uses the Arrow row format for group keys (good!). However, the aggregator (i.e. the thing storing partial sums, counts, etc.) is not ideal. Depending on the type of aggregate, it is either:

1. `Box<dyn RowAccumulator>`
2. `Box<dyn Accumulator>`

The GroupByHash operator manages the mapping from group key to state and passes a `RowAccessor` to the `RowAccumulator` if needed. Groups are calculated once per batch, then a `take` kernel reshuffles everything and slices are passed to the `RowAccumulator`.

### Row Formats

See #4179 -- TL;DR: we have three row formats in our stack and we should focus on one.

## Proposal

Ditch the *word-aligned* row format for state management and change the aggregator to:

```rust
trait Aggregator: MemoryConsumer {
    /// Update aggregator state for the given groups.
    fn update_batch(&mut self, keys: Rows, batch: &RecordBatch) -> Result<()>;

    ...
}
```

Hence the aggregator will be dyn-dispatched ONCE per record batch and will keep its own internal state. This moves the key->state map from `[row_]hash.rs` into the aggregators. We will provide tooling (macros or generics) to simplify the implementation and avoid boilerplate code as much as possible.

Note that this also removes the `take` kernel, since we think it doesn't provide any performance improvement over iterating over the hashable rows and performing the aggregation row by row. We may bring the `take` handling back (as "perform `take` if at least one aggregator wants it") if we can prove (via benchmarks) that this is desirable for certain aggregators, but we leave it out for now.
This also moves the memory consumer handling into the aggregator, since the aggregator knows best how to split states and spill data.

### Implementation Plan

TBD
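To make the shape of the proposal concrete, here is a minimal, self-contained sketch of an aggregator that owns its own key->state map and is driven once per batch. This is NOT the DataFusion API: `RowKey` and `Batch` are stand-in types for the real `arrow::row::Rows` and `RecordBatch`, the `MemoryConsumer` bound and `Result` return are omitted, and `CountAggregator` is a hypothetical example implementation.

```rust
use std::collections::HashMap;

// Stand-ins for illustration only: in DataFusion, keys would be the Arrow
// row-format `Rows` and the batch would be a `RecordBatch`.
type RowKey = Vec<u8>; // one encoded group key per input row
type Batch = Vec<i64>; // a single numeric column

/// Simplified version of the proposed trait: dyn-dispatched once per
/// record batch, with the group state held inside the aggregator.
trait Aggregator {
    fn update_batch(&mut self, keys: &[RowKey], batch: &Batch);
    fn state(&self) -> &HashMap<RowKey, i64>;
}

/// A hypothetical COUNT aggregator keeping its own key -> state map,
/// as the proposal suggests moving out of `[row_]hash.rs`.
struct CountAggregator {
    counts: HashMap<RowKey, i64>,
}

impl Aggregator for CountAggregator {
    fn update_batch(&mut self, keys: &[RowKey], batch: &Batch) {
        // Iterate row by row instead of `take`-reshuffling the batch,
        // matching the "no take kernel" part of the proposal.
        for (key, _value) in keys.iter().zip(batch) {
            *self.counts.entry(key.clone()).or_insert(0) += 1;
        }
    }

    fn state(&self) -> &HashMap<RowKey, i64> {
        &self.counts
    }
}

fn main() {
    let mut agg = CountAggregator { counts: HashMap::new() };
    // Three rows in one batch, two distinct group keys ("a" and "b").
    let keys = vec![b"a".to_vec(), b"b".to_vec(), b"a".to_vec()];
    let values = vec![10, 20, 30];
    agg.update_batch(&keys, &values);
    assert_eq!(agg.state()[&b"a".to_vec()], 2);
    assert_eq!(agg.state()[&b"b".to_vec()], 1);
}
```

The key design point the sketch illustrates: GroupByHash no longer has to slice and reshuffle per-group state, because each aggregator resolves its own groups internally from the encoded keys.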
