ic4y commented on issue #956:
URL: https://github.com/apache/arrow-datafusion/issues/956#issuecomment-986936253
@Dandandan
I am currently working out ways to solve the performance problem of high-cardinality aggregation. I followed your method and tested it, and found a certain performance improvement, but it is not ideal enough: only about 10% under high-cardinality aggregation. I think a several-fold improvement is needed to match Doris's and Trino's performance under high-cardinality aggregation (#1246). Do you have any better advice or other optimizations for this code?
The relevant code segments are as follows.
Accumulators:
```rust
struct Accumulators {
    /// Logically maps group values to an index in `group_states`
    ///
    /// Uses the raw API of hashbrown to avoid actually storing the
    /// keys in the table
    ///
    /// keys: u64 hashes of the GroupValue
    /// values: (hash, index into `group_states`)
    map: RawTable<(u64, usize)>,
    /// One accumulator per aggregate expression; each holds the state
    /// of every group, addressed by group index
    accumulator_items: Vec<AccumulatorItem>,
    // group_states: Vec<GroupState>, // replaced by the two fields below
    /// The group-by key values of each group
    group_by_values: Vec<Vec<ScalarValue>>,
    /// Row indices of the current batch belonging to each group
    group_indices: Vec<Vec<u32>>,
}
```
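For context, the interface behind `accumulator_items` in this design takes a group index on every call, so one accumulator instance holds the state of all groups. Here is a sketch of the `AccumulatorFly` trait, reconstructed from the `CountAccumulatorFly` impl further down (the signatures are my reading of that code, not a finalized API):

```rust
/// Sketch of a "flyweight" accumulator: one instance per aggregate
/// expression, with per-group state addressed by `index` (assigned in
/// order of first appearance of each group).
pub trait AccumulatorFly {
    /// Allocate state for a newly discovered group; `index` is expected
    /// to equal the number of groups seen so far.
    fn init_state(&mut self, index: usize);

    /// Scalar and vectorized updates with raw input values
    /// (used for AggregateMode::Partial).
    fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()>;
    fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()>;

    /// Scalar and vectorized merges of partial states
    /// (used for AggregateMode::Final / FinalPartitioned).
    fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()>;
    fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()>;

    /// Intermediate state / final value of a single group.
    fn state(&self, index: usize) -> Result<Vec<ScalarValue>>;
    fn evaluate(&self, index: usize) -> Result<ScalarValue>;

    /// Columnar output over all groups at once.
    fn evaluate_all(&self) -> Result<ArrayRef>;
    fn state_all(&self) -> Result<Vec<Vec<ScalarValue>>>;
}
```

The point of the pattern is that per-group state becomes one slot in a flat `Vec` inside each accumulator, instead of one boxed `Accumulator` per group, which avoids per-group allocations and pointer chasing when there are millions of groups.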
group_aggregate_batch:
```rust
fn group_aggregate_batch(
    mode: &AggregateMode,
    random_state: &RandomState,
    group_expr: &[Arc<dyn PhysicalExpr>],
    aggr_expr: &[Arc<dyn AggregateExpr>],
    batch: RecordBatch,
    mut accumulators: Accumulators,
    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
) -> Result<Accumulators> {
    // evaluate the grouping expressions
    let group_values = evaluate(group_expr, &batch)?;

    // evaluate the aggregation expressions.
    // We could evaluate them after the `take`, but since we need to
    // evaluate all of them anyways, it is more performant to do it
    // while they are together.
    let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;

    // 1.1 construct the key from the group values
    // 1.2 construct the mapping key if it does not exist
    // 1.3 add the row's index to `indices`

    // track which entries in `accumulators` have rows in this batch to aggregate
    let mut groups_with_rows = vec![];

    // 1.1 Calculate the group keys for the group values
    let mut batch_hashes = vec![0; batch.num_rows()];
    create_hashes(&group_values, random_state, &mut batch_hashes)?;

    for (row, hash) in batch_hashes.into_iter().enumerate() {
        let Accumulators {
            map,
            accumulator_items,
            group_by_values,
            group_indices,
        } = &mut accumulators;

        let entry = map.get_mut(hash, |(_hash, group_idx)| {
            let group_state_c = &group_by_values[*group_idx];
            group_values
                .iter()
                .zip(group_state_c.iter())
                .all(|(array, scalar)| scalar.eq_array(array, row))
        });

        match entry {
            // Existing entry for this group value
            Some((_hash, group_idx)) => {
                let indices = &mut group_indices[*group_idx];
                // 1.3
                if indices.is_empty() {
                    groups_with_rows.push(*group_idx);
                };
                indices.push(row as u32); // remember this row
            }
            // 1.2 Need to create new entry
            None => {
                // Copy group values out of arrays into `ScalarValue`s
                let col_group_by_values = group_values
                    .iter()
                    .map(|col| ScalarValue::try_from_array(col, row))
                    .collect::<Result<Vec<_>>>()?;
                let group_idx = group_by_values.len();
                group_by_values.push(col_group_by_values);
                // initialize the state of every aggregate for the new group
                for item in accumulator_items.iter_mut() {
                    item.init_state(group_idx);
                }
                groups_with_rows.push(group_idx);
                group_indices.push(vec![row as u32]);
                // for hasher function, use precomputed hash value
                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
            }
        };
    }

    // Collect all indices + offsets based on keys in this vec
    let mut batch_indices_cc: UInt32Builder = UInt32Builder::new(0);
    let mut offsets = vec![0];
    let mut offset_so_far = 0;
    for group_idx in groups_with_rows.iter() {
        let indices = &accumulators.group_indices[*group_idx];
        batch_indices_cc.append_slice(indices)?;
        offset_so_far += indices.len();
        offsets.push(offset_so_far);
    }
    let batch_indices = batch_indices_cc.finish();

    // `Take` all values based on indices into Arrays
    let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
        .iter()
        .map(|array| {
            array
                .iter()
                .map(|array| {
                    compute::take(
                        array.as_ref(),
                        &batch_indices,
                        None, // None: no index check
                    )
                    .unwrap()
                })
                .collect()
            // 2.3
        })
        .collect();

    // 2.1 for each key in this batch
    // 2.2 for each aggregation
    // 2.3 `slice` from each of its arrays the keys' values
    // 2.4 update / merge the accumulator with the values
    // 2.5 clear indices
    groups_with_rows
        .iter()
        .zip(offsets.windows(2))
        .try_for_each(|(group_idx, offsets)| {
            accumulators.group_indices[*group_idx].clear();
            accumulators
                .accumulator_items
                .iter_mut()
                .zip(values.iter())
                .try_for_each(|(accumulator, aggr_array)| {
                    let values = aggr_array
                        .iter()
                        .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
                        .collect::<Vec<ArrayRef>>();
                    match mode {
                        AggregateMode::Partial => {
                            accumulator.update_batch(*group_idx, &values)
                        }
                        AggregateMode::FinalPartitioned | AggregateMode::Final => {
                            accumulator.merge_batch(*group_idx, &values)
                        }
                    }
                })
        })?;
    Ok(accumulators)
}
```
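The hot loop above relies on hashbrown's raw-table API so the map stores only `(hash, group_idx)` while the actual keys live in a side vector. As a standalone illustration of that probe/insert pattern, here is a hypothetical demo using string keys instead of `ScalarValue`s (it requires hashbrown with the `raw` feature enabled):

```rust
use hashbrown::raw::RawTable;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

/// Map each key to a dense group index. The table stores only
/// `(hash, group_idx)`; the keys themselves live in a side vector, and
/// equality is only checked on a hash match.
fn group_indices(keys: &[&str]) -> Vec<usize> {
    let state = RandomState::new();
    let mut table: RawTable<(u64, usize)> = RawTable::new();
    let mut group_keys: Vec<String> = vec![];
    let mut out = Vec::with_capacity(keys.len());

    for key in keys {
        let mut hasher = state.build_hasher();
        key.hash(&mut hasher);
        let hash = hasher.finish();

        // Probe with the precomputed hash; the closure resolves
        // collisions against the side vector.
        let group_idx = match table.get(hash, |(_h, i)| group_keys[*i] == *key) {
            Some((_h, i)) => *i,
            None => {
                let i = group_keys.len();
                group_keys.push(key.to_string());
                // On resize, rehash via the stored hash instead of the key.
                table.insert(hash, (hash, i), |(h, _)| *h);
                i
            }
        };
        out.push(group_idx);
    }
    out
}

fn main() {
    assert_eq!(group_indices(&["a", "b", "a", "c", "b"]), vec![0, 1, 0, 2, 1]);
}
```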
Count:
```rust
pub struct CountAccumulatorFly {
    count: Vec<u64>,
}

impl CountAccumulatorFly {
    /// new count accumulator
    pub fn new() -> Self {
        Self { count: vec![] }
    }
}

impl Accumulator for CountAccumulatorFly {
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let array = &values[0];
        self.count[0] += (array.len() - array.data().null_count()) as u64;
        Ok(())
    }

    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
        let value = &values[0];
        if !value.is_null() {
            self.count[0] += 1;
        }
        Ok(())
    }

    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
        let count = &states[0];
        if let ScalarValue::UInt64(Some(delta)) = count {
            self.count[0] += *delta;
        } else {
            unreachable!()
        }
        Ok(())
    }

    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
        let delta = &compute::sum(counts);
        if let Some(d) = delta {
            self.count[0] += *d;
        }
        Ok(())
    }

    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::UInt64(Some(self.count[0]))])
    }

    fn evaluate(&self) -> Result<ScalarValue> {
        Ok(ScalarValue::UInt64(Some(self.count[0])))
    }
}

impl AccumulatorFly for CountAccumulatorFly {
    fn init_state(&mut self, index: usize) {
        assert_eq!(self.count.len(), index);
        self.count.push(0);
    }

    fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
        let array = &values[0];
        self.count[index] += (array.len() - array.data().null_count()) as u64;
        Ok(())
    }

    fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()> {
        let value = &values[0];
        if !value.is_null() {
            self.count[index] += 1;
        }
        Ok(())
    }

    fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()> {
        let count = &states[0];
        if let ScalarValue::UInt64(Some(delta)) = count {
            self.count[index] += *delta;
        } else {
            unreachable!()
        }
        Ok(())
    }

    fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
        let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
        let delta = &compute::sum(counts);
        if let Some(d) = delta {
            self.count[index] += *d;
        }
        Ok(())
    }

    fn state(&self, index: usize) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::UInt64(Some(self.count[index]))])
    }

    fn evaluate(&self, index: usize) -> Result<ScalarValue> {
        Ok(ScalarValue::UInt64(Some(self.count[index])))
    }

    fn evaluate_all(&self) -> Result<ArrayRef> {
        ScalarValue::iter_to_array(self.count.iter().map(|x| ScalarValue::UInt64(Some(*x))))
    }

    fn state_all(&self) -> Result<Vec<Vec<ScalarValue>>> {
        let dt = Local::now();
        let result = Ok(vec![self
            .count
            .iter()
            .map(|x| ScalarValue::UInt64(Some(*x)))
            .collect()]);
        println!(
            "state_all usage millis: {}",
            Local::now().timestamp_millis() - dt.timestamp_millis()
        );
        result
    }
}
```
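To make the flyweight contract concrete, here is a minimal hypothetical driver (fully qualified calls are used because the type also implements the plain `Accumulator` trait with same-named methods):

```rust
fn demo() -> Result<()> {
    // One accumulator instance carries the COUNT state of every group.
    let mut acc = CountAccumulatorFly::new();
    acc.init_state(0); // group 0 discovered
    acc.init_state(1); // group 1 discovered

    AccumulatorFly::update(&mut acc, 0, &[ScalarValue::Int64(Some(7))])?;
    AccumulatorFly::update(&mut acc, 0, &[ScalarValue::Int64(Some(8))])?;
    AccumulatorFly::update(&mut acc, 1, &[ScalarValue::Int64(None)])?; // NULL not counted

    assert_eq!(AccumulatorFly::evaluate(&acc, 0)?, ScalarValue::UInt64(Some(2)));
    assert_eq!(AccumulatorFly::evaluate(&acc, 1)?, ScalarValue::UInt64(Some(0)));
    Ok(())
}
```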
more code:
https://github.com/ic4y/arrow-datafusion/blob/9d26b797f2e1565f7f65048ef1fc2ad2940d8f95/datafusion/src/physical_plan/hash_aggregate_fly.rs#L578-L596