alamb commented on issue #4973:
URL: 
https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1608068287

   @tustvold and I had a long discussion about this topic in the context of 
https://github.com/apache/arrow-datafusion/pull/6657 and how to improve the 
performance of both grouping in general and high cardinality groups in 
particular.
   
   It is very similar in spirit to @Dandandan's / @ic4y's ticket 
https://github.com/apache/arrow-datafusion/issues/956 and the proposals from 
@crepererum above.
   
   The high level idea is to change the state management of the Aggregate 
operators so that:
   1. Each accumulator manages state for *all* groups rather than just one group
   2. There is still only a single hash table (that maps group values to 
group_ids)
   
   In ASCII art:
   
   ## How aggregation currently works
   
   ```
                                                                ...

                                            ┌───────────────────────────────────────────┐
                                            │┌────────────┐┌────────────┐┌────────────┐ │
       ┌─────┐                              ││accumulator ││accumulator ││accumulator │ │
       │  5  │               ┌─────────────▶││  0 state   ││  1 state   ││  2 state   │ │
       ├─────┤               │              │└────────────┘└────────────┘└────────────┘ │
       │  9  │──────────────┐│              └───────────────────────────────────────────┘
       ├─────┤              ││
       │     │              ││                                  ...
       ├─────┤              ││
       │  1  │ ─────────────┼┘              ┌───────────────────────────────────────────┐
       ├─────┤              │               │┌────────────┐┌────────────┐┌────────────┐ │
       │     │              │               ││accumulator ││accumulator ││accumulator │ │
       └─────┘              └──────────────▶││  0 state   ││  1 state   ││  2 state   │ │
                                            │└────────────┘└────────────┘└────────────┘ │
                                            └───────────────────────────────────────────┘

       Hash Table                                               ...


                                               GroupStates


   stores "group indexes"
   which are indexes into                     stores the aggregation states for each
   Vec<GroupState>                            aggregate (either as Vec<dyn Accumulator>
                                              or Row Vec<u8>).

                                              There is one GroupState PER GROUP

   ```
   
   Currently the hash table stores an index into a `Vec` of 
[`GroupState`](https://github.com/apache/arrow-datafusion/blob/1522e7a58e801b87a0f8f5c6187a2038453eba9d/datafusion/core/src/physical_plan/aggregates/utils.rs#L38), 
each of which holds the state for one group. 
   
   This design means that when grouping by high cardinality keys, where each 
accumulator receives only a few rows per group per batch, the per-group 
overhead is quite high.
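   To make the overhead concrete, here is a rough, std-only sketch of the 
per-group pattern. It is not the actual DataFusion code: `SumAccumulator`, 
`PerGroupAggregator`, the `u32` keys and the `i64` values are hypothetical 
stand-ins for the real Arrow-based types. The thing to notice is that each 
batch costs one value copy and one accumulator call per group touched.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a per-group accumulator (a running sum).
#[derive(Default)]
struct SumAccumulator {
    sum: i64,
}

impl SumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
}

/// Simplified stand-in for `GroupState`: one set of accumulators PER GROUP.
struct GroupState {
    accumulators: Vec<SumAccumulator>,
    /// Rows of the current batch that belong to this group.
    row_indices: Vec<usize>,
}

#[derive(Default)]
struct PerGroupAggregator {
    /// group key -> index into `group_states`
    map: HashMap<u32, usize>,
    group_states: Vec<GroupState>,
}

impl PerGroupAggregator {
    /// Processes one batch and returns how many accumulator calls were made.
    fn update(&mut self, keys: &[u32], values: &[i64]) -> usize {
        // 1. Find (or create) the group of every input row.
        for (row, key) in keys.iter().enumerate() {
            let next = self.group_states.len();
            let idx = *self.map.entry(*key).or_insert(next);
            if idx == next {
                self.group_states.push(GroupState {
                    accumulators: vec![SumAccumulator::default()],
                    row_indices: Vec::new(),
                });
            }
            self.group_states[idx].row_indices.push(row);
        }
        // 2. For every group touched: copy its rows, then call each accumulator.
        let mut calls = 0;
        for state in &mut self.group_states {
            if state.row_indices.is_empty() {
                continue;
            }
            let group_values: Vec<i64> =
                state.row_indices.iter().map(|&r| values[r]).collect();
            for acc in &mut state.accumulators {
                acc.update_batch(&group_values);
                calls += 1;
            }
            state.row_indices.clear();
        }
        calls
    }
}
```

   With high cardinality keys the number of groups touched per batch 
approaches the number of rows, so the copies and calls in step 2 dominate.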
   
   ## Proposal
   
   ```
                                            ┌──────────────┐   ┌──────────────┐   ┌──────────────┐
                                            │┌────────────┐│   │┌────────────┐│   │┌────────────┐│
       ┌─────┐                              ││accumulator ││   ││accumulator ││   ││accumulator ││
       │  5  │                              ││     0      ││   ││     0      ││   ││     0      ││
       ├─────┤                              ││ ┌────────┐ ││   ││ ┌────────┐ ││   ││ ┌────────┐ ││
       │  9  │                              ││ │ state  │ ││   ││ │ state  │ ││   ││ │ state  │ ││
       ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
       │     │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
       ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
       │  1  │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
       ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
       │     │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
       └─────┘                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                            ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                            ││ └────────┘ ││   ││ └────────┘ ││   ││ └────────┘ ││
                                            │└────────────┘│   │└────────────┘│   │└────────────┘│
       Hash Table                           └──────────────┘   └──────────────┘   └──────────────┘

                                             New NewAccumulator



   stores "group indexes"                     There is one NewAccumulator per aggregate
   which are indexes into                     (NOT PER GROUP). Internally, each
   Vec<GroupState>                            NewAccumulator manages the state for
                                              multiple groups
   ```
   
   In this design, the hash table still stores the group -> group index 
mapping. However, the state for all groups is managed by a (new) type of 
accumulator that stores the group state internally. 
   
   As today, the group by hash will calculate the group_id for each input row 
(after applying any filters) and then make a **SINGLE** function call to each 
NewAccumulator to update all relevant group values for the ENTIRE BATCH.
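   The group_id calculation could look roughly like the following std-only 
sketch. `assign_group_indices` and the `u32` keys are hypothetical (the real 
hash table works on Arrow arrays and row hashes); the point is only that new 
groups get the next free index, so the indices stay contiguous.

```rust
use std::collections::HashMap;

/// Maps every key of a batch to its group index, assigning the next free
/// index (0, 1, 2, ...) the first time a key is seen. Hypothetical helper.
fn assign_group_indices(map: &mut HashMap<u32, usize>, keys: &[u32]) -> Vec<usize> {
    keys.iter()
        .map(|key| {
            // `map.len()` is the next unused index, so there are never gaps.
            let next = map.len();
            *map.entry(*key).or_insert(next)
        })
        .collect()
}
```

   The resulting per-row indices are all the accumulators need; they never 
see the group keys themselves.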
   
   ## More details:
   
   
   Make a `NewAccumulator` trait with the following properties:
   1. It manages the accumulated state for *all* groups (remove 
`aggregation_buffer` from `GroupState`)
   2. There will be one `NewAccumulator` instance per aggregate in the query 
(NOT one per group)
   3. The `NewAccumulator` API has an `update_batch` that is invoked once per 
input batch, per aggregate (NOT PER GROUP). Something like:
   
   ```rust
       /// Updates the accumulator's state from a vector of arrays.
       fn update_batch(
           &mut self,
           values: &[ArrayRef],
           // (range of rows in `values` to accumulate, index of the group they belong to)
           group_indices: &[(Range<usize>, usize)],
       ) -> Result<()>;
   ```
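   As a rough illustration of how an aggregate might implement this, here is 
a std-only sketch of a grouped sum. It is an assumption-laden toy, not a 
proposed implementation: a plain `&[i64]` stands in for `&[ArrayRef]`, errors 
are ignored, and `GroupedSum` is a hypothetical name.

```rust
use std::ops::Range;

/// Toy version of the proposed trait. `&[i64]` stands in for `&[ArrayRef]`
/// and there is no `Result`, to keep the sketch free of dependencies.
trait NewAccumulator {
    fn update_batch(&mut self, values: &[i64], group_indices: &[(Range<usize>, usize)]);
}

/// Hypothetical grouped SUM: one instance holds the state of ALL groups.
#[derive(Default)]
struct GroupedSum {
    /// sums[group_index] is the running sum of that group.
    sums: Vec<i64>,
}

impl NewAccumulator for GroupedSum {
    fn update_batch(&mut self, values: &[i64], group_indices: &[(Range<usize>, usize)]) {
        for (rows, group_index) in group_indices {
            // Group indices are contiguous, so growing the Vec is enough.
            if *group_index >= self.sums.len() {
                self.sums.resize(*group_index + 1, 0);
            }
            // Sum the rows of this range straight from the input, no copy.
            self.sums[*group_index] += values[rows.clone()].iter().sum::<i64>();
        }
    }
}
```

   One call handles every group in the batch, and the per-group state is a 
single `i64` in a `Vec` rather than a separate accumulator object.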
   
   Other details:
   1. Assumes that the group_indexes are contiguous (there aren't gaps, so it 
is ok to use a `Vec<...>` internally to store the group states within the 
accumulator)
   2. The internal state (`evaluate` and `state`) for row accumulators will be 
produced as an `ArrayRef` (which can be done using the newer zero copy arrow 
APIs that convert a `Vec` to an Array without any copy).  
   3. The `NewAccumulator` API has some way to get the size of the accumulated 
state (`size()`)
   4. We would not change the existing (public) 
[`Accumulator`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#) 
API, and would instead implement a wrapper using `Vec<Box<dyn Accumulator>>`
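   The wrapper from the last point might be shaped like the sketch below. 
Everything here is hypothetical and std-only: the `Accumulator` trait is a 
cut-down stand-in for the real public trait (which works on `ArrayRef` and 
`ScalarValue`), and `AccumulatorAdapter`, `CountAccumulator` and `new_count` 
are made-up names.

```rust
use std::ops::Range;

/// Cut-down stand-in for the existing per-group `Accumulator` trait.
trait Accumulator {
    fn update_batch(&mut self, values: &[i64]);
    fn evaluate(&self) -> i64;
    fn size(&self) -> usize;
}

/// Hypothetical adapter: keeps one boxed `Accumulator` per group index.
struct AccumulatorAdapter {
    factory: fn() -> Box<dyn Accumulator>,
    accumulators: Vec<Box<dyn Accumulator>>,
}

impl AccumulatorAdapter {
    fn new(factory: fn() -> Box<dyn Accumulator>) -> Self {
        Self { factory, accumulators: Vec::new() }
    }

    fn update_batch(&mut self, values: &[i64], group_indices: &[(Range<usize>, usize)]) {
        for (rows, group_index) in group_indices {
            // Lazily create accumulators for groups seen for the first time.
            while self.accumulators.len() <= *group_index {
                self.accumulators.push((self.factory)());
            }
            self.accumulators[*group_index].update_batch(&values[rows.clone()]);
        }
    }

    /// One output value per group, in group index order.
    fn evaluate(&self) -> Vec<i64> {
        self.accumulators.iter().map(|a| a.evaluate()).collect()
    }

    /// Total size of the accumulated state, summed over all groups.
    fn size(&self) -> usize {
        self.accumulators.iter().map(|a| a.size()).sum()
    }
}

/// Example per-group accumulator: counts rows.
#[derive(Default)]
struct CountAccumulator {
    count: i64,
}

impl Accumulator for CountAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.count += values.len() as i64;
    }
    fn evaluate(&self) -> i64 {
        self.count
    }
    fn size(&self) -> usize {
        std::mem::size_of::<Self>()
    }
}

fn new_count() -> Box<dyn Accumulator> {
    Box::new(CountAccumulator::default())
}
```

   An adapter like this keeps the per-group cost, so it would only preserve 
compatibility for existing `Accumulator` implementations, not speed them up.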
   
   ## Benefits of this design:
   1. No per-group overhead (the root cause of why high cardinality grouping is 
slow)
   2. There is still only one hash table, mapping from group key to logical 
group number
   3. There is no copy of intermediate values into a new array to call the 
accumulator interface
   4. There are no `ScalarValue`s (outside of what is needed to adapt the 
existing `Accumulator` API)
   5. Unified implementation rather than one split between different 
accumulators (e.g. `RowAccumulator` and `dyn Accumulator`)
   6. Could remove the fixed length row format
   7. Performance is expected to be similar for `Accumulator` based aggregates
   8. Performance is expected to be much faster for other aggregates, 
especially for high cardinality groups
   
   
   High level implementation plan:
   1. Remove the copy/paste between row_hash.rs and bounded_aggregate_stream.rs 
   2. Sketch out the new API
   3. Migrate over
   
   cc @yjshen @mingmwang 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
