alamb commented on issue #4973: URL: https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1608068287
@tustvold and I had a long discussion about this topic in the context of https://github.com/apache/arrow-datafusion/pull/6657 and how to improve the performance of both grouping in general and high cardinality groups in particular.

It is very similar in spirit to @Dandandan's / @ic4y's ticket https://github.com/apache/arrow-datafusion/issues/956 and the proposals from @crepererum above.

The high level idea is to change the state management of the Aggregate operators so that:
1. Each accumulator manages state for *all* groups rather than just one group
2. There is still only a single hash table (that maps group values to group_ids)

In ASCII art:

## How aggregation currently works

```
                                                           ...
                                      ┌───────────────────────────────────────────┐
                                      │┌────────────┐┌────────────┐┌────────────┐ │
 ┌─────┐                              ││accumulator ││accumulator ││accumulator │ │
 │  5  │               ┌─────────────▶││  0 state   ││  1 state   ││  2 state   │ │
 ├─────┤               │              │└────────────┘└────────────┘└────────────┘ │
 │  9  │──────────────┐│              └───────────────────────────────────────────┘
 ├─────┤              ││
 │     │              ││                                   ...
 ├─────┤              ││
 │  1  │ ─────────────┼┘              ┌───────────────────────────────────────────┐
 ├─────┤              │               │┌────────────┐┌────────────┐┌────────────┐ │
 │     │              │               ││accumulator ││accumulator ││accumulator │ │
 └─────┘              └──────────────▶││  0 state   ││  1 state   ││  2 state   │ │
                                      │└────────────┘└────────────┘└────────────┘ │
                                      └───────────────────────────────────────────┘
 Hash Table                                                ...

                                      GroupStates

 stores "group indexes"               stores the aggregation states for each
 which are indexes into               aggregate (either as Vec<dyn Accumulator>
 Vec<GroupState>                      or Row Vec<u8>).
                                      There is one GroupState PER GROUP
```

Currently the hash table stores an index into a `Vec` of [`GroupState`](https://github.com/apache/arrow-datafusion/blob/1522e7a58e801b87a0f8f5c6187a2038453eba9d/datafusion/core/src/physical_plan/aggregates/utils.rs#L38), which holds the state for each group.
This design means that for high cardinality grouping, where each accumulator receives only a few rows per group per batch, the per-group overhead is quite high.

## Proposal

```
            ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
            │┌────────────┐│ │┌────────────┐│ │┌────────────┐│
 ┌─────┐    ││accumulator ││ ││accumulator ││ ││accumulator ││
 │  5  │    ││     0      ││ ││     0      ││ ││     0      ││
 ├─────┤    ││ ┌────────┐ ││ ││ ┌────────┐ ││ ││ ┌────────┐ ││
 │  9  │    ││ │ state  │ ││ ││ │ state  │ ││ ││ │ state  │ ││
 ├─────┤    ││ │        │ ││ ││ │        │ ││ ││ │        │ ││
 │     │    ││ │        │ ││ ││ │        │ ││ ││ │        │ ││
 ├─────┤    ││ │        │ ││ ││ │        │ ││ ││ │        │ ││
 │  1  │    ││ │        │ ││ ││ │        │ ││ ││ │        │ ││
 ├─────┤    ││ │        │ ││ ││ │        │ ││ ││ │        │ ││
 │     │    ││ │        │ ││ ││ │        │ ││ ││ │        │ ││
 └─────┘    ││ │        │ ││ ││ │        │ ││ ││ │        │ ││
            ││ │        │ ││ ││ │        │ ││ ││ │        │ ││
            ││ └────────┘ ││ ││ └────────┘ ││ ││ └────────┘ ││
            │└────────────┘│ │└────────────┘│ │└────────────┘│
 Hash Table └──────────────┘ └──────────────┘ └──────────────┘

                                NewAccumulator

 stores "group indexes"         There is one NewAccumulator per aggregate
 which are indexes into         (NOT PER GROUP). Internally, each
 Vec<GroupState>                NewAccumulator manages the state for
                                multiple groups
```

In this design, the hash table still stores the group -> group index mapping. However, the state for all groups is handled by a (new) type of accumulator that stores the group state internally.

As it does today, the group by hash will calculate the group_id for each input row (after applying any filters) and then make a **SINGLE** function call to the `NewAccumulator` to update all relevant group values for the ENTIRE BATCH.

## More details:

Introduce a `NewAccumulator` trait with these properties:
1. It manages the accumulated state for *all* groups (remove `aggregation_buffer` from `GroupState`)
2. There will be one `NewAccumulator` instance per aggregate in the query (NOT one per group)
3. The `NewAccumulator` API has an `update_batch` that is invoked once per input batch, per aggregate (NOT PER GROUP).
Something like:

```rust
/// Updates the accumulator's state from a vector of arrays.
fn update_batch(
    &mut self,
    values: &[ArrayRef],
    // Each entry is (which rows in `values` to accumulate, which group's state to update)
    group_indices: &[(Range<usize>, usize)],
) -> Result<()>;
```

Other details:
1. Assumes that the group_indexes are contiguous (there are no gaps, so it is ok to use a `Vec<...>` internally to store the group states within the accumulator)
2. The output (`evaluate` and `state`) of the row accumulators will be an `ArrayRef`, which can be produced without copying using the newer zero-copy arrow APIs that convert a `Vec` to an Array.
3. The `NewAccumulator` API has some way to get the size of the accumulated state (`size()`)
4. We would not change the existing (public) [`Accumulator`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#) API; instead we would implement a wrapper using `Vec<Box<dyn Accumulator>>`

## Benefits of this design:

1. No per-group overhead (the root cause of why high cardinality grouping is slow)
2. There is still only one hash table, mapping from group key to logical group number
3. There is no copy of intermediate values into a new array to call the accumulator interface
4. There are no `ScalarValue`s (outside of what is needed to adapt the existing `Accumulator` API)
5. Unified implementation rather than one split between different accumulators (e.g. `RowAccumulator` and `dyn Accumulator`)
6. Could remove the fixed length row format
7. Performance is expected to be similar for `Accumulator` based aggregates
8. Performance is expected to be much faster for other aggregates, especially for high cardinality groups

High level implementation plan:
1. Remove the copy/paste between row_hash.rs and bounded_aggregate_stream.rs
2. Sketch out the new API
3. Migrate over

cc @yjshen @mingmwang
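To make the proposal concrete, here is a minimal, dependency-free sketch of the idea, not the actual DataFusion API. Every name in it (`GroupedAccumulator`, `GroupedSum`, `assign_groups`, `grouped_sum_demo`) is hypothetical, and plain `i64` slices stand in for `ArrayRef` so the example compiles on its own. It shows a single hash table assigning contiguous group indexes, followed by one `update_batch` call that updates the state of every group in the batch.

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Hypothetical stand-in for the proposed `NewAccumulator` trait.
/// Assumption: `&[i64]` replaces arrow's `ArrayRef` to stay self-contained.
trait GroupedAccumulator {
    /// Called once per input batch. Each entry of `group_indices` is
    /// (rows of `values` to accumulate, index of the group to update).
    fn update_batch(&mut self, values: &[i64], group_indices: &[(Range<usize>, usize)]);
    /// Final value of every group, ordered by group index.
    fn evaluate(&self) -> Vec<i64>;
    /// Approximate number of bytes held as accumulated state.
    fn size(&self) -> usize;
}

/// SUM for all groups: one `Vec` slot per group, no per-group allocation.
#[derive(Default)]
struct GroupedSum {
    sums: Vec<i64>,
}

impl GroupedAccumulator for GroupedSum {
    fn update_batch(&mut self, values: &[i64], group_indices: &[(Range<usize>, usize)]) {
        for (rows, group) in group_indices {
            // Group indexes are assumed contiguous, so growing the Vec is enough.
            if *group >= self.sums.len() {
                self.sums.resize(*group + 1, 0);
            }
            self.sums[*group] += values[rows.clone()].iter().sum::<i64>();
        }
    }

    fn evaluate(&self) -> Vec<i64> {
        self.sums.clone()
    }

    fn size(&self) -> usize {
        self.sums.capacity() * std::mem::size_of::<i64>()
    }
}

/// Uses the single hash table to map each group key to a group index and
/// emits one (row range, group index) pair per input row.
fn assign_groups(
    keys: &[&str],
    table: &mut HashMap<String, usize>,
) -> Vec<(Range<usize>, usize)> {
    keys.iter()
        .enumerate()
        .map(|(row, key)| {
            // A new key gets the next unused index, keeping indexes gap-free.
            let next = table.len();
            let group = *table.entry(key.to_string()).or_insert(next);
            (row..row + 1, group)
        })
        .collect()
}

/// Groups five rows by key and sums their values with a single call.
fn grouped_sum_demo() -> Vec<i64> {
    let mut table = HashMap::new();
    let mut acc = GroupedSum::default();
    let keys = ["a", "b", "a", "c", "b"];
    let values = [1, 10, 2, 100, 20];
    let groups = assign_groups(&keys, &mut table);
    acc.update_batch(&values, &groups);
    acc.evaluate()
}
```

In this sketch `grouped_sum_demo()` sees keys `a`, `b`, `c` as groups 0, 1, 2 and makes exactly one `update_batch` call for the whole batch. A real implementation would presumably coalesce adjacent rows of the same group into longer ranges and operate on arrow arrays rather than slices.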
