mingmwang commented on issue #4973:
URL: https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1614211204

   @alamb @tustvold @Dandandan 
   
   I did another POC based on the changes in #6657. The basic idea is to reduce the memory size of `GroupState` and avoid using `Arc`, `Box`, `Vec`, `dyn Trait`, etc. The results are very exciting!
   Compared to the main branch, there is about a 50% improvement, and for the high-cardinality aggregation itself the improvement is about 100%.
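   To make the idea concrete, here is a minimal, hypothetical sketch (not the POC code) of one way to keep group states free of per-group heap objects: every group occupies one `stride`-sized slot of `[group key bytes | accumulator bytes]` inside a single contiguous `Vec<u8>`, so adding a group or updating its accumulator involves no `Box<dyn Trait>`, `Arc`, or inner `Vec`. `FlatGroupStates`, `push_group`, `add_to_sum`, and the little-endian `i64` SUM encoding are all illustrative assumptions.

   ```rust
   /// Hypothetical flat store: group `i` occupies bytes
   /// `[i * stride, (i + 1) * stride)`, laid out as `[key | accumulators]`.
   struct FlatGroupStates {
       key_width: usize,
       stride: usize,
       data: Vec<u8>,
   }

   impl FlatGroupStates {
       /// `acc_width` must be at least 8 to hold the `i64` SUM used below.
       fn new(key_width: usize, acc_width: usize) -> Self {
           Self { key_width, stride: key_width + acc_width, data: Vec::new() }
       }

       /// Appends a group with zero-initialized accumulators and returns its
       /// index (its "group address").
       fn push_group(&mut self, key: &[u8]) -> usize {
           assert_eq!(key.len(), self.key_width, "fixed-width keys only");
           let idx = self.data.len() / self.stride;
           self.data.extend_from_slice(key);
           let new_len = self.data.len() + (self.stride - self.key_width);
           self.data.resize(new_len, 0);
           idx
       }

       fn key(&self, group: usize) -> &[u8] {
           let start = group * self.stride;
           &self.data[start..start + self.key_width]
       }

       /// Byte range of the `i64` SUM stored at the start of the accumulator part.
       fn sum_range(&self, group: usize) -> std::ops::Range<usize> {
           let start = group * self.stride + self.key_width;
           start..start + 8
       }

       /// Updates the accumulator in place: no allocation, no dynamic dispatch.
       fn add_to_sum(&mut self, group: usize, v: i64) {
           let r = self.sum_range(group);
           let cur = i64::from_le_bytes(self.data[r.clone()].try_into().unwrap());
           self.data[r].copy_from_slice(&(cur + v).to_le_bytes());
       }

       fn sum(&self, group: usize) -> i64 {
           i64::from_le_bytes(self.data[self.sum_range(group)].try_into().unwrap())
       }
   }

   fn main() {
       // 8-byte keys (e.g. a u64 group-by column) and one 8-byte SUM accumulator.
       let mut states = FlatGroupStates::new(8, 8);
       let g0 = states.push_group(&1u64.to_le_bytes());
       let g1 = states.push_group(&2u64.to_le_bytes());
       for (g, v) in [(g0, 10), (g1, 5), (g0, 7)] {
           states.add_to_sum(g, v);
       }
       println!("key {:?} -> sum {}", states.key(g0), states.sum(g0));
       println!("key {:?} -> sum {}", states.key(g1), states.sum(g1));
   }
   ```

   The trade-off of such a layout is that it only works for fixed-width keys and accumulator states; variable-width data (strings, lists, distinct sets) still needs a separate representation, which is roughly the split between `FixedSizeGroupState` and `NonFixedSizeGroupState` in the POC.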
   
   Test results:
   
   Q17
   
   ```
   Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 10, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false }
   null_width 2
   values_width 24
   Query 17 iteration 0 took 818.2 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 1 took 774.3 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 2 took 765.0 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 3 took 772.8 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 4 took 770.9 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 5 took 765.2 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 6 took 765.9 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 7 took 760.3 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 8 took 758.3 ms and returned 1 rows
   null_width 2
   values_width 24
   Query 17 iteration 9 took 763.8 ms and returned 1 rows
   Query 17 avg time: 771.46 ms
   ```
   
   Q18
   
   ```
   Query 18 iteration 4 took 768.1 ms and returned 57 rows
   null_width 2
   values_width 24
   null_width 2
   values_width 24
   Query 18 iteration 5 took 776.4 ms and returned 57 rows
   null_width 2
   values_width 24
   null_width 2
   values_width 24
   Query 18 iteration 6 took 767.0 ms and returned 57 rows
   null_width 2
   values_width 24
   null_width 2
   values_width 24
   Query 18 iteration 7 took 775.9 ms and returned 57 rows
   null_width 2
   values_width 24
   null_width 2
   values_width 24
   Query 18 iteration 8 took 773.7 ms and returned 57 rows
   null_width 2
   values_width 24
   null_width 2
   values_width 24
   Query 18 iteration 9 took 771.2 ms and returned 57 rows
   Query 18 avg time: 778.98 ms
   ```
   
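   ```rust
   // Imports assumed from the DataFusion crates of that time
   // (the `datafusion-row` crate still existed).
   use std::sync::Arc;

   use arrow::array::BooleanArray;
   use datafusion_common::Result;
   use datafusion_row::accessor::RowAccessor;
   use datafusion_row::layout::RowLayout;
   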
   pub(crate) struct NonFixedSizeGroupState {
       /// Group data
       pub group_data: Vec<u8>,
       /// Accumulator data
       pub acc_data: Vec<u8>,
   }
   
   impl NonFixedSizeGroupState {
       #[inline(always)]
       fn group_data(&self) -> &[u8] {
           &self.group_data
       }
   
       #[inline(always)]
       fn agg_data(&self) -> &[u8] {
           &self.acc_data
       }
   }
   
   pub(crate) struct FixedSizeGroupState {
       /// Group data and Accumulator state data, stored sequentially
       pub group_states: Vec<u8>,
   }
   
   impl FixedSizeGroupState {
       #[inline(always)]
       fn group_data(&self, data_width: usize) -> &[u8] {
           &self.group_states[0..data_width]
       }
   
       #[inline(always)]
       fn agg_data(&self, data_width: usize) -> &[u8] {
           &self.group_states[data_width..]
       }
   }
   
   
   // Method of the aggregation stream (the enclosing `impl` block is omitted).
   // `ArrowArrayReader`, `col_to_value`, `row_accumulators`, `aggr_state` and
   // `data_part_width` come from the POC branch.
   fn update_one_accumulator_with_native_value<T1>(
       &mut self,
       groups_addresses: &[usize],
       agg_input_array: &T1,
       acc_idx: usize,
       filter_bool_array: &[Option<&BooleanArray>],
       row_layout: Arc<RowLayout>,
   ) -> Result<()>
   where
       T1: ArrowArrayReader,
   {
       let acc = &self.row_accumulators[acc_idx];
       let filter_array = &filter_bool_array[acc_idx];
       let mut state_accessor = RowAccessor::new_from_layout(row_layout);
       if filter_array.is_none() && agg_input_array.null_count() == 0 {
           // Fast path: no FILTER clause and no nulls, so every row contributes.
           for (idx, &addr) in groups_addresses.iter().enumerate() {
               unsafe {
                   // Derive the raw pointer from a `&mut` borrow: writing through a
                   // pointer cast from a shared `&` reference is undefined behavior.
                   // The raw-pointer round trip detaches the lifetime so that
                   // `state_accessor` can be re-pointed on every iteration.
                   let group_state = &mut *(&mut self.aggr_state.group_states[addr]
                       as *mut FixedSizeGroupState);
                   // Point the accessor at the accumulator part of the group state.
                   state_accessor.point_to(
                       0,
                       &mut group_state.group_states[self.data_part_width..],
                   );
                   acc.update_value::<T1::Item>(
                       Some(agg_input_array.value_at_unchecked(idx)),
                       &mut state_accessor,
                   );
               }
           }
       } else {
           // Slow path: per-row null/filter handling via `col_to_value`, which
           // presumably yields `None` for null or filtered-out rows.
           for (idx, &addr) in groups_addresses.iter().enumerate() {
               unsafe {
                   let group_state = &mut *(&mut self.aggr_state.group_states[addr]
                       as *mut FixedSizeGroupState);
                   state_accessor.point_to(
                       0,
                       &mut group_state.group_states[self.data_part_width..],
                   );
                   let value = col_to_value(agg_input_array, filter_array, idx);
                   acc.update_value::<T1::Item>(value, &mut state_accessor);
               }
           }
       }

       Ok(())
   }
   ```
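   The fast-path/slow-path split in `update_one_accumulator_with_native_value` can be reproduced outside DataFusion. The following is a minimal, self-contained sketch under stated assumptions: the accumulator is a plain `i64` SUM stored little-endian right after `data_width` bytes of group data, and the null bitmap and `FILTER` mask are modeled as optional `bool` slices rather than Arrow bitmaps. `update_sum`, `add_i64`, and `sum_of` are hypothetical names; only the `FixedSizeGroupState` shape mirrors the POC.

   ```rust
   /// Mirrors the POC's `FixedSizeGroupState`: group data followed by
   /// accumulator state in a single buffer.
   struct FixedSizeGroupState {
       group_states: Vec<u8>,
   }

   /// Adds `v` to the little-endian `i64` stored in the first 8 bytes of `slot`.
   fn add_i64(slot: &mut [u8], v: i64) {
       let cur = i64::from_le_bytes(slot[..8].try_into().unwrap());
       slot[..8].copy_from_slice(&(cur + v).to_le_bytes());
   }

   /// Reads the SUM stored after the first `data_width` bytes.
   fn sum_of(state: &FixedSizeGroupState, data_width: usize) -> i64 {
       let bytes = &state.group_states[data_width..data_width + 8];
       i64::from_le_bytes(bytes.try_into().unwrap())
   }

   /// Row `i` adds `values[i]` to the SUM of group `groups_addresses[i]`, unless
   /// it is null (`validity[i] == false`) or filtered out (`filter[i] == false`).
   fn update_sum(
       states: &mut [FixedSizeGroupState],
       data_width: usize,
       groups_addresses: &[usize],
       values: &[i64],
       validity: Option<&[bool]>,
       filter: Option<&[bool]>,
   ) {
       if validity.is_none() && filter.is_none() {
           // Fast path: every row contributes, so the loop has no per-row checks.
           for (&addr, &v) in groups_addresses.iter().zip(values) {
               add_i64(&mut states[addr].group_states[data_width..], v);
           }
       } else {
           // Slow path: skip null and filtered-out rows.
           for (idx, (&addr, &v)) in groups_addresses.iter().zip(values).enumerate() {
               let is_valid = validity.map_or(true, |m| m[idx]);
               let passes_filter = filter.map_or(true, |f| f[idx]);
               if is_valid && passes_filter {
                   add_i64(&mut states[addr].group_states[data_width..], v);
               }
           }
       }
   }

   fn main() {
       // Two groups, each with a 4-byte key followed by an 8-byte SUM.
       let data_width = 4;
       let mut states: Vec<FixedSizeGroupState> = (0u32..2)
           .map(|key| {
               let mut buf = key.to_le_bytes().to_vec();
               buf.extend_from_slice(&0i64.to_le_bytes());
               FixedSizeGroupState { group_states: buf }
           })
           .collect();

       // No nulls and no filter: takes the fast path.
       update_sum(&mut states, data_width, &[0, 1, 0], &[10, 5, 7], None, None);
       // The second row is null: takes the slow path and skips that row.
       update_sum(&mut states, data_width, &[0, 1], &[100, 200], Some(&[true, false][..]), None);

       for (g, s) in states.iter().enumerate() {
           println!("group {g}: sum = {}", sum_of(s, data_width));
       }
   }
   ```

   Hoisting the null/filter test out of the loop lets the common case run without any per-row validity check. Because `update_sum` receives `&mut` access to the states directly, it needs no raw-pointer cast; the POC's `unsafe` block presumably exists because its `RowAccessor` keeps a borrowed slice across loop iterations.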

