Dandandan opened a new issue #956:
URL: https://github.com/apache/arrow-datafusion/issues/956


   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   Currently, the aggregate code keeps the state for each group-by value by storing it in a `Vec` alongside the group-by values.
   
   For low-cardinality aggregates this works well enough, as there are only a limited number of keys and thus of accumulators.
   But for medium- and high-cardinality aggregates, the current approach is inefficient:
   
   * Higher memory usage, as each group-by value has an extra `Vec` of `AccumulatorItem`s, which adds at least 2 (`Box<dyn T>`) + 3 (empty `Vec`) + 3 (initial) = 8 pointers (64 bytes on a 64-bit target) of overhead per group when storing one aggregate per group (e.g. one of count/avg/sum).
   * Extra allocations, plus some cloning, when inserting new groups into the data structures and when returning states as `Vec<ScalarValue>`.
   * Worse cache efficiency, because the state is scattered across many separate allocations.
   * More, and more expensive, code paths to convert the state into `Vec`s of `ScalarValue`s and back into `Array`s.
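
To make the per-group overhead concrete, the sketch below measures two of the building blocks of the current layout with `std::mem::size_of`. The `Accumulator` trait here is a hypothetical empty stand-in, the byte counts depend on the target's pointer width, and the heap allocation backing each per-group `Vec` (the "initial" part of the estimate above) is not measured.

```rust
use std::mem::size_of;

// Hypothetical stand-in for the accumulator trait object stored per group.
trait Accumulator {}

/// Bytes for the fixed parts of the current per-group layout: one
/// `Box<dyn Accumulator>` (a fat pointer: data + vtable) plus the header of
/// the `Vec` holding it (pointer + capacity + length). The heap buffer
/// behind that `Vec` is not counted.
fn row_layout_header_bytes() -> usize {
    size_of::<Box<dyn Accumulator>>() + size_of::<Vec<Box<dyn Accumulator>>>()
}

/// Bytes per group for a columnar `count` state stored as `Vec<u64>`.
fn columnar_count_bytes() -> usize {
    size_of::<u64>()
}

fn main() {
    println!(
        "row layout: {} bytes of headers per group; columnar count: {} bytes per group",
        row_layout_header_bytes(),
        columnar_count_bytes()
    );
}
```

On a 64-bit target this reports 40 bytes of headers per group before any heap allocation, against 8 bytes per group for a columnar `u64` count.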
   
   This issue covers only the `Accumulator` state, but a similar change could be made for `group_by_values`.
   
   **Describe the solution you'd like**
   We should define a trait that allows accumulators to store their state contiguously, in a columnar layout.
   The idea is that the state can be kept in a `Vec`-like container, where the item at each index holds the current state of one group.
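
As an illustration of the `Vec`-like container idea, an average accumulator could keep one `Vec` per state field (struct-of-arrays), so the state of group `i` lives at position `i` in each vector. `AvgState` and its methods are hypothetical names for this sketch, and `Option<f64>` slices stand in for `ScalarValue`s and Arrow arrays.

```rust
/// Hypothetical columnar state for AVG: one entry per group in each vector.
#[derive(Debug, Default)]
struct AvgState {
    sums: Vec<f64>,
    counts: Vec<u64>,
}

impl AvgState {
    /// Registers a new group; groups must be added with consecutive indices.
    fn init_state(&mut self, index: usize) {
        assert_eq!(self.sums.len(), index);
        self.sums.push(0.0);
        self.counts.push(0);
    }

    /// Adds the non-null values of a batch to the group at `index`.
    fn update(&mut self, index: usize, values: &[Option<f64>]) {
        for v in values.iter().flatten() {
            self.sums[index] += v;
            self.counts[index] += 1;
        }
    }

    /// Final AVG for the group at `index`; `None` if it saw no values.
    fn evaluate(&self, index: usize) -> Option<f64> {
        match self.counts[index] {
            0 => None,
            n => Some(self.sums[index] / n as f64),
        }
    }
}

fn main() {
    let mut avg = AvgState::default();
    avg.init_state(0);
    avg.init_state(1);
    avg.update(0, &[Some(1.0), None, Some(3.0)]);
    avg.update(1, &[Some(10.0)]);
    println!("{:?} {:?}", avg.evaluate(0), avg.evaluate(1));
}
```

Adding a group costs two pushes onto existing vectors rather than a new boxed accumulator and a new `Vec`.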
   
   My proposal is to add an extra `index` parameter to the methods, so that an accumulator can update the state of the group at a given `index`.
   Methods could also be added to retrieve the entire state of the accumulator and/or convert the state values to array(s) in one go. If Arrow provides mutable arrays in the future, this could avoid the extra conversion step back to an `Array`.
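
Retrieving the entire state in one go could be as cheap as handing over the underlying buffer. In the hypothetical sketch below (`CountState` and `take_state` are illustrative names, not an existing API), `std::mem::take` moves the whole `Vec<u64>` out without copying it. Arrow's `UInt64Array` implements `From<Vec<u64>>`, so such a column could then become an `Array` in a single conversion instead of via one `ScalarValue` per group.

```rust
/// Hypothetical columnar COUNT state, one `u64` per group.
#[derive(Debug, Default)]
struct CountState {
    count: Vec<u64>,
}

impl CountState {
    /// Per-group access, as in `state(index)`: one value at a time.
    fn state(&self, index: usize) -> u64 {
        self.count[index]
    }

    /// Bulk access: moves the whole column out, leaving an empty `Vec` behind.
    /// No per-group allocation or copying takes place.
    fn take_state(&mut self) -> Vec<u64> {
        std::mem::take(&mut self.count)
    }
}

fn main() {
    let mut acc = CountState { count: vec![3, 0, 7] };
    println!("group 2: {}", acc.state(2));
    let column = acc.take_state();
    println!("{:?}, remaining groups: {}", column, acc.count.len());
}
```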
   
   ```rust
   use std::fmt::Debug;
   
   use arrow::array::ArrayRef;
   use datafusion::error::Result;
   use datafusion::scalar::ScalarValue;
   
   pub trait Accumulator: Send + Sync + Debug {
       /// Initializes the state for a new group at `index`.
       fn init_state(&mut self, index: usize);
   
       /// Returns the state of the group at `index` at the end of the accumulation.
       // In the case of an average, where we track `sum` and `n`, this function
       // should return a vector of two values: sum and n.
       fn state(&self, index: usize) -> Result<Vec<ScalarValue>>;
   
       /// Updates the state of the group at `index` from a vector of scalars.
       fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()>;
   
       /// Updates the state of the group at `index` from a vector of arrays.
       fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
           if values.is_empty() {
               return Ok(());
           }
           (0..values[0].len()).try_for_each(|idx| {
               let v = values
                   .iter()
                   .map(|array| ScalarValue::try_from_array(array, idx))
                   .collect::<Result<Vec<_>>>()?;
               self.update(index, &v)
           })
       }
   
       /// Merges a vector of state scalars into the state of the group at `index`.
       fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()>;
   
       /// Merges a vector of state arrays into the state of the group at `index`.
       fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
           if states.is_empty() {
               return Ok(());
           }
           (0..states[0].len()).try_for_each(|idx| {
               let v = states
                   .iter()
                   // read row `idx` of each state array, not the group `index`
                   .map(|array| ScalarValue::try_from_array(array, idx))
                   .collect::<Result<Vec<_>>>()?;
               self.merge(index, &v)
           })
       }
   
       /// Returns the final value of the group at `index` based on its current state.
       fn evaluate(&self, index: usize) -> Result<ScalarValue>;
   }
   
   ```
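To show how the `index` parameter might be driven, the sketch below assigns each new group key the next free index and calls `init_state` before the first update. It is a simplified, hypothetical stand-in: `IndexedAccumulator`, `SumAccumulator` and `aggregate` are illustrative names, `i64` values replace `ScalarValue`/`ArrayRef`, `Result` is omitted, and DataFusion's actual hash aggregate may organize this differently.

```rust
use std::collections::HashMap;

/// Simplified, hypothetical version of the proposed trait.
trait IndexedAccumulator {
    fn init_state(&mut self, index: usize);
    fn update(&mut self, index: usize, value: i64);
    fn evaluate(&self, index: usize) -> i64;
}

/// SUM with its state stored contiguously, one slot per group.
#[derive(Default)]
struct SumAccumulator {
    sums: Vec<i64>,
}

impl IndexedAccumulator for SumAccumulator {
    fn init_state(&mut self, index: usize) {
        assert_eq!(self.sums.len(), index);
        self.sums.push(0);
    }
    fn update(&mut self, index: usize, value: i64) {
        self.sums[index] += value;
    }
    fn evaluate(&self, index: usize) -> i64 {
        self.sums[index]
    }
}

/// Groups `(key, value)` rows, mapping each distinct key to a dense index.
fn aggregate(rows: &[(&str, i64)]) -> Vec<(String, i64)> {
    let mut group_index: HashMap<String, usize> = HashMap::new();
    let mut keys: Vec<String> = Vec::new();
    let mut acc = SumAccumulator::default();

    for &(key, value) in rows {
        let index = match group_index.get(key) {
            Some(&i) => i,
            None => {
                // First time this key is seen: allocate the next index.
                let i = keys.len();
                group_index.insert(key.to_string(), i);
                keys.push(key.to_string());
                acc.init_state(i);
                i
            }
        };
        acc.update(index, value);
    }

    // Groups come out in first-seen order, read straight from the columns.
    keys.into_iter()
        .enumerate()
        .map(|(i, k)| (k, acc.evaluate(i)))
        .collect()
}

fn main() {
    let result = aggregate(&[("a", 1), ("b", 2), ("a", 3)]);
    println!("{:?}", result);
}
```

Because indices are dense and assigned in order, `keys[i]` and the accumulator slot `i` line up, which is what would let both the group-by values and the states be emitted as columns.
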
   For `Count`, this would change as follows; most notably, the state is now stored as a `Vec<u64>`:
   
   ```diff
    #[derive(Debug)]
    struct CountAccumulator {
   -    count: u64,
   +    count: Vec<u64>,
    }
    
    impl CountAccumulator {
        /// new count accumulator
        pub fn new() -> Self {
   -        Self { count: 0 }
   +        Self { count: vec![] }
        }
    }
    
    impl Accumulator for CountAccumulator {
   -    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
   +    fn init_state(&mut self, index: usize) {
   +        assert_eq!(self.count.len(), index);
   +        self.count.push(0);
   +    }
   +    fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
            let array = &values[0];
   -        self.count += (array.len() - array.data().null_count()) as u64;
   +        self.count[index] += (array.len() - array.data().null_count()) as u64;
            Ok(())
        }
    
   -    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
   +    fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()> {
            let value = &values[0];
            if !value.is_null() {
   -            self.count += 1;
   +            self.count[index] += 1;
            }
            Ok(())
        }
    
   -    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
   +    fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()> {
            let count = &states[0];
            if let ScalarValue::UInt64(Some(delta)) = count {
   -            self.count += *delta;
   +            self.count[index] += *delta;
            } else {
                unreachable!()
            }
            Ok(())
        }
    
   -    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
   +    fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
            let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
            let delta = &compute::sum(counts);
            if let Some(d) = delta {
   -            self.count += *d;
   +            self.count[index] += *d;
            }
            Ok(())
        }
    
   -    fn state(&self) -> Result<Vec<ScalarValue>> {
   -        Ok(vec![ScalarValue::UInt64(Some(self.count))])
   +    fn state(&self, index: usize) -> Result<Vec<ScalarValue>> {
   +        Ok(vec![ScalarValue::UInt64(Some(self.count[index]))])
        }
    
   -    fn evaluate(&self) -> Result<ScalarValue> {
   -        Ok(ScalarValue::UInt64(Some(self.count)))
   +    fn evaluate(&self, index: usize) -> Result<ScalarValue> {
   +        Ok(ScalarValue::UInt64(Some(self.count[index])))
        }
    }
   ```
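For experimenting outside DataFusion, the sketch below mirrors the `CountAccumulator` diff with plain Rust types: `Option<i64>` slices stand in for Arrow arrays (so `None` plays the role of a null), and a `u64` slice stands in for the `UInt64Array` of partial counts passed to `merge_batch`. It is a hypothetical, simplified model rather than the proposed implementation.

```rust
/// Simplified COUNT accumulator with one `u64` slot per group.
#[derive(Debug, Default)]
struct CountAccumulator {
    count: Vec<u64>,
}

impl CountAccumulator {
    fn init_state(&mut self, index: usize) {
        // Groups are registered in order, so the new slot sits at `index`.
        assert_eq!(self.count.len(), index);
        self.count.push(0);
    }

    /// Counts the non-null values of a batch belonging to the group at `index`.
    fn update_batch(&mut self, index: usize, values: &[Option<i64>]) {
        self.count[index] += values.iter().filter(|v| v.is_some()).count() as u64;
    }

    /// Adds partial counts (e.g. from other partitions) to the group at `index`.
    fn merge_batch(&mut self, index: usize, counts: &[u64]) {
        self.count[index] += counts.iter().sum::<u64>();
    }

    fn evaluate(&self, index: usize) -> u64 {
        self.count[index]
    }
}

fn main() {
    let mut acc = CountAccumulator::default();
    acc.init_state(0);
    acc.init_state(1);
    acc.update_batch(0, &[Some(1), None, Some(5)]);
    acc.update_batch(1, &[None]);
    acc.merge_batch(1, &[4, 6]);
    println!("group 0: {}, group 1: {}", acc.evaluate(0), acc.evaluate(1));
}
```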
   
   **Describe alternatives you've considered**
   
   **Additional context**
   

