mingmwang commented on issue #4973:
URL:
https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1614211204
@alamb @tustvold @Dandandan
I did another POC based on the changes in #6657. The basic idea is to reduce
the memory size of `GroupState` and avoid `Arc`, `Box`, `Vec`, `dyn Trait`,
and similar indirection. The results are very encouraging!
Compared to the main branch, there is about a 50% improvement, and for the
high-cardinality aggregation itself the improvement is about 100%.
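To make the idea concrete, here is a minimal sketch of why a flat layout is smaller than a pointer-heavy one. The types `PointerHeavyGroupState`, `FlatGroupState`, and the `Accumulator` trait are hypothetical stand-ins for illustration, not the actual DataFusion definitions, and the allocation counts are tallied by hand as an upper bound (assuming non-empty buffers and non-zero-sized accumulators).

```rust
use std::mem::size_of;
use std::sync::Arc;

/// Hypothetical accumulator trait, used only to show the cost of `dyn` dispatch.
pub trait Accumulator {
    fn update(&mut self, value: i64);
}

/// Hypothetical pointer-heavy per-group state (an assumption for illustration,
/// not the real `GroupState`).
pub struct PointerHeavyGroupState {
    pub group_by_values: Arc<Vec<u8>>,
    pub accumulators: Vec<Box<dyn Accumulator>>,
    pub indices: Vec<u32>,
}

/// Hypothetical flat per-group state: group key bytes and accumulator state
/// live in one contiguous buffer.
pub struct FlatGroupState {
    pub group_states: Vec<u8>,
}

/// Inline size in bytes of each layout (heap contents are not included).
pub fn inline_sizes() -> (usize, usize) {
    (
        size_of::<PointerHeavyGroupState>(),
        size_of::<FlatGroupState>(),
    )
}

/// Upper bound on separate heap allocations per group, counted by hand.
pub fn heap_allocations_per_group(num_accumulators: usize) -> (usize, usize) {
    // Arc block + the Vec<u8> buffer it owns + the accumulators buffer
    // + one Box per accumulator + the indices buffer.
    let pointer_heavy = 1 + 1 + 1 + num_accumulators + 1;
    // A single byte buffer holds everything.
    let flat = 1;
    (pointer_heavy, flat)
}
```

With millions of groups in a high-cardinality aggregation, both the per-group inline size and the number of pointer hops per update add up, which is presumably where most of the gain comes from.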
Test result:
Q17
```
Running benchmarks with the following options: DataFusionBenchmarkOpt {
query: Some(17), debug: false, iterations: 10, partitions: 1, batch_size: 8192,
path: "./parquet_data", file_format: "parquet", mem_table: false, output_path:
None, disable_statistics: false }
null_width 2
values_width 24
Query 17 iteration 0 took 818.2 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 1 took 774.3 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 2 took 765.0 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 3 took 772.8 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 4 took 770.9 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 5 took 765.2 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 6 took 765.9 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 7 took 760.3 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 8 took 758.3 ms and returned 1 rows
null_width 2
values_width 24
Query 17 iteration 9 took 763.8 ms and returned 1 rows
Query 17 avg time: 771.46 ms
```
Q18
```
Query 18 iteration 4 took 768.1 ms and returned 57 rows
null_width 2
values_width 24
null_width 2
values_width 24
Query 18 iteration 5 took 776.4 ms and returned 57 rows
null_width 2
values_width 24
null_width 2
values_width 24
Query 18 iteration 6 took 767.0 ms and returned 57 rows
null_width 2
values_width 24
null_width 2
values_width 24
Query 18 iteration 7 took 775.9 ms and returned 57 rows
null_width 2
values_width 24
null_width 2
values_width 24
Query 18 iteration 8 took 773.7 ms and returned 57 rows
null_width 2
values_width 24
null_width 2
values_width 24
Query 18 iteration 9 took 771.2 ms and returned 57 rows
Query 18 avg time: 778.98 ms
```
```rust
pub(crate) struct NonFixedSizeGroupState {
    /// Group data
    pub group_data: Vec<u8>,
    /// Accumulator data
    pub acc_data: Vec<u8>,
}

impl NonFixedSizeGroupState {
    #[inline(always)]
    fn group_data(&self) -> &[u8] {
        &self.group_data
    }

    #[inline(always)]
    fn agg_data(&self) -> &[u8] {
        &self.acc_data
    }
}

pub(crate) struct FixedSizeGroupState {
    /// Group data and Accumulator state data, stored sequentially
    pub group_states: Vec<u8>,
}

impl FixedSizeGroupState {
    #[inline(always)]
    fn group_data(&self, data_width: usize) -> &[u8] {
        &self.group_states[0..data_width]
    }

    #[inline(always)]
    fn agg_data(&self, data_width: usize) -> &[u8] {
        &self.group_states[data_width..]
    }
}

fn update_one_accumulator_with_native_value<T1>(
    &mut self,
    groups_addresses: &[usize],
    agg_input_array: &T1,
    acc_idx: usize,
    filter_bool_array: &[Option<&BooleanArray>],
    row_layout: Arc<RowLayout>,
) -> Result<()>
where
    T1: ArrowArrayReader,
{
    let acc = &self.row_accumulators[acc_idx];
    let filter_array = &filter_bool_array[acc_idx];
    let mut state_accessor = RowAccessor::new_from_layout(row_layout);
    if filter_array.is_none() && agg_input_array.null_count() == 0 {
        for idx in 0..groups_addresses.len() {
            unsafe {
                let group_state_ptr = &mut *(&self.aggr_state.group_states
                    [groups_addresses[idx]]
                    as *const FixedSizeGroupState
                    as *mut FixedSizeGroupState);
                state_accessor.point_to(
                    0,
                    group_state_ptr.group_states[self.data_part_width..].as_mut(),
                );
                acc.update_value::<T1::Item>(
                    Some(agg_input_array.value_at_unchecked(idx)),
                    &mut state_accessor,
                );
            }
        }
    } else {
        for idx in 0..groups_addresses.len() {
            unsafe {
                let group_state_ptr = &mut *(&self.aggr_state.group_states
                    [groups_addresses[idx]]
                    as *const FixedSizeGroupState
                    as *mut FixedSizeGroupState);
                state_accessor.point_to(
                    0,
                    group_state_ptr.group_states[self.data_part_width..].as_mut(),
                );
                let value = col_to_value(agg_input_array, filter_array, idx);
                acc.update_value::<T1::Item>(value, &mut state_accessor);
            }
        }
    }
    Ok(())
}
```
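For readers who want to experiment with the fixed-size layout outside DataFusion, here is a minimal, safe sketch of the same idea: the group key and the accumulator state share one byte buffer, and an update writes into the accumulator part in place. Everything here is an assumption for illustration: `FlatGroupState`, `add_to_sum`, `read_sum`, and `update_sums` are hypothetical names, and the accumulator is reduced to a single `i64` sum stored little-endian, which is not how `RowAccessor` lays out state.

```rust
/// Hypothetical stand-in for a fixed-size group state: group key bytes first,
/// accumulator state bytes right after, in one contiguous buffer.
pub struct FlatGroupState {
    group_states: Vec<u8>,
}

impl FlatGroupState {
    /// Copies the group key and zero-initializes `acc_width` bytes of state.
    pub fn new(group_key: &[u8], acc_width: usize) -> Self {
        let mut group_states = Vec::with_capacity(group_key.len() + acc_width);
        group_states.extend_from_slice(group_key);
        group_states.resize(group_key.len() + acc_width, 0);
        Self { group_states }
    }

    /// Group key part of the buffer.
    pub fn group_data(&self, data_width: usize) -> &[u8] {
        &self.group_states[..data_width]
    }

    /// Mutable view of the accumulator part of the buffer.
    pub fn agg_data_mut(&mut self, data_width: usize) -> &mut [u8] {
        &mut self.group_states[data_width..]
    }
}

/// Adds `value` to the i64 sum held in the first 8 bytes of the accumulator part.
pub fn add_to_sum(state: &mut FlatGroupState, data_width: usize, value: i64) {
    let acc = state.agg_data_mut(data_width);
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&acc[..8]);
    let sum = i64::from_le_bytes(buf).wrapping_add(value);
    acc[..8].copy_from_slice(&sum.to_le_bytes());
}

/// Reads the i64 sum back out of the accumulator part.
pub fn read_sum(state: &FlatGroupState, data_width: usize) -> i64 {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&state.group_states[data_width..data_width + 8]);
    i64::from_le_bytes(buf)
}

/// Mirrors the no-filter, no-null branch: one in-place update per input row,
/// where `group_addresses[i]` is the index of the group that row `i` belongs to.
pub fn update_sums(
    states: &mut [FlatGroupState],
    group_addresses: &[usize],
    values: &[i64],
    data_width: usize,
) {
    for (&addr, &value) in group_addresses.iter().zip(values) {
        add_to_sum(&mut states[addr], data_width, value);
    }
}
```

Because the sketch holds the states through a mutable slice, it can borrow each group mutably without the `*const` to `*mut` cast; whether the same is possible in the POC depends on how `aggr_state` is owned there.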