ic4y commented on issue #956:
URL:
https://github.com/apache/arrow-datafusion/issues/956#issuecomment-993785271
@Dandandan Thank you very much for your suggestions
1. Avoiding the `make slice` step can indeed bring a certain
performance improvement, but I tested on arrow2 and found that the performance
of `array.slice` has been greatly improved there. So `slice` performance is no
longer a problem once arrow2 is used.
2. For `eq_array` I tried hashbrown's `get_each_mut` method, but the
performance did not change (through pprof profiling, I found that
`eq_array` still takes a lot of time). The code is shown as follows:
```rust
create_hashes(&group_values, random_state, &mut batch_hashes)?;
let mut absent_rows = vec![];
const CHUNK_SIZE: usize = 16;
let (chunks, remainder) = batch_hashes.as_chunks::<CHUNK_SIZE>();
for (num, chunk) in chunks.into_iter().enumerate() {
let Accumulators { map, group_states } = &mut accumulators;
let result = map.get_each_mut(*chunk, |index, (hash, group_idx)| {
if(batch_hashes[num*CHUNK_SIZE + index] != *hash){
false
}else {
let group_state = &group_states[*group_idx];
group_values
.iter()
.zip(group_state.group_by_values.iter())
.all(|(array, scalar)| scalar.eq_array(array, index +
num * CHUNK_SIZE))
}
});
for (i, re) in result.iter().enumerate() {
let row = num*CHUNK_SIZE+i;
match re {
Ok((hash, group_idx)) => {
let group_state = &mut group_states[*group_idx];
// 1.3
if group_state.indices.is_empty() {
groups_with_rows.push(*group_idx);
};
group_state.indices.push(row as u32); // remember this row
}
Err(unavailable) => { match unavailable {
UnavailableMutError::Absent => {
absent_rows.push(row);
}
UnavailableMutError::Duplicate(index) => {
let (hash, group_idx) =
&result.get(*index).unwrap().as_ref().unwrap();
let group_state = &mut group_states[*group_idx];
// 1.3
if group_state.indices.is_empty() {
groups_with_rows.push(*group_idx);
};
group_state.indices.push(row as u32); // remember this row
}
} }
}
}
}
//absent rows
for (i, row) in absent_rows.into_iter().enumerate() {
let hash= batch_hashes[row];
let Accumulators { map, group_states } = &mut accumulators;
let entry = map.get_mut(hash, |(_hash, group_idx)| {
let group_state = &group_states[*group_idx];
if (*_hash != hash){
false
}else{
group_values
.iter()
.zip(group_state.group_by_values.iter())
.all(|(array, scalar)| scalar.eq_array(array, row))
}
//*_hash == hash
});
match entry {
// Existing entry for this group value
Some((_hash, group_idx)) => {
let group_state = &mut group_states[*group_idx];
// 1.3
if group_state.indices.is_empty() {
groups_with_rows.push(*group_idx);
};
group_state.indices.push(row as u32); // remember this row
}
// 1.2 Need to create new entry
None => {
let accumulator_set = create_accumulators(aggr_expr)
.map_err(DataFusionError::into_arrow_external_error)?;
// Copy group values out of arrays into `ScalarValue`s
let group_by_values = group_values
.iter()
.map(|col| ScalarValue::try_from_array(col, row))
.collect::<Result<Vec<_>>>()?;
// Add new entry to group_states and save newly created index
let group_state = GroupState {
group_by_values: group_by_values.into_boxed_slice(),
accumulator_set,
indices: vec![row as u32], // 1.3
};
let group_idx = group_states.len();
group_states.push(group_state);
groups_with_rows.push(group_idx);
// for hasher function, use precomputed hash value
map.insert(hash, (hash, group_idx), |(hash, _group_idx)|
*hash);
}
};
}
//remainder rows
for (i, &hash) in remainder.into_iter().enumerate() {
let row = i+CHUNK_SIZE*chunks.len();
let Accumulators { map, group_states } = &mut accumulators;
let entry = map.get_mut(hash, |(_hash, group_idx)| {
let group_state = &group_states[*group_idx];
group_values
.iter()
.zip(group_state.group_by_values.iter())
.all(|(array, scalar)| scalar.eq_array(array, row))
});
match entry {
// Existing entry for this group value
Some((_hash, group_idx)) => {
let group_state = &mut group_states[*group_idx];
// 1.3
if group_state.indices.is_empty() {
groups_with_rows.push(*group_idx);
};
group_state.indices.push(row as u32); // remember this row
}
// 1.2 Need to create new entry
None => {
let accumulator_set = create_accumulators(aggr_expr)
.map_err(DataFusionError::into_arrow_external_error)?;
// Copy group values out of arrays into `ScalarValue`s
let group_by_values = group_values
.iter()
.map(|col| ScalarValue::try_from_array(col, row))
.collect::<Result<Vec<_>>>()?;
// Add new entry to group_states and save newly created index
let group_state = GroupState {
group_by_values: group_by_values.into_boxed_slice(),
accumulator_set,
indices: vec![row as u32], // 1.3
};
let group_idx = group_states.len();
group_states.push(group_state);
groups_with_rows.push(group_idx);
// for hasher function, use precomputed hash value
map.insert(hash, (hash, group_idx), |(hash, _group_idx)|
*hash);
}
};
}
```
3. I separated the `get_mut` and `eq_array` methods by using hashbrown's
`iter_hash` to process them in two loops, so that the `eq_array` calls could be
executed in a vectorized way (vectorized execution is not implemented yet — I
am not very familiar with vectorized execution in Rust). The following code
does separate most of `get_mut` from `eq_array`, but how can we vectorize the
`eq_array` calls in a loop?
```rust
create_hashes(&group_values, random_state, &mut batch_hashes)?;
let map = &mut accumulators.map;
let mut allid = Vec::with_capacity(batch_hashes.len());
unsafe {
for (row, &hash) in batch_hashes.iter().enumerate() {
let hash1 = map.iter_hash(hash);
allid.push(hash1.map(|bucket|{
let (_hash, group_idx) = bucket.as_ref();
*group_idx
}).collect::<Vec<_>>())
}
}
let mut gids = Vec::with_capacity(batch_hashes.len());
for (row, iterHash) in allid.iter().enumerate(){
let group_states = &mut accumulators.group_states;
let mut gid = None;
iterHash.into_iter().for_each(|group_idx| {
let group_state = &group_states[*group_idx];
if group_values
.iter()
.zip(group_state.group_by_values.iter())
.all(|(array, scalar)| scalar.eq_array(array, row))
{
gid = Some(*group_idx);
}
});
gids.push(gid);
}
for (row, op_group_idx) in gids.into_iter().enumerate() {
let Accumulators { map, group_states } = &mut accumulators;
let hash = batch_hashes[row];
match op_group_idx {
None => {
let entry = map.get_mut(hash, |(_hash, group_idx)| {
let group_state = &group_states[*group_idx];
group_values
.iter()
.zip(group_state.group_by_values.iter())
.all(|(array, scalar)| scalar.eq_array(array, row))
});
match entry {
// Existing entry for this group value
Some((_hash, group_idx)) => {
let group_state = &mut group_states[*group_idx];
// 1.3
if group_state.indices.is_empty() {
groups_with_rows.push(*group_idx);
};
group_state.indices.push(row as u32); // remember this row
}
// 1.2 Need to create new entry
None => {
let accumulator_set = create_accumulators(aggr_expr)
.map_err(DataFusionError::into_arrow_external_error)?;
// Copy group values out of arrays into `ScalarValue`s
let group_by_values = group_values
.iter()
.map(|col| ScalarValue::try_from_array(col, row))
.collect::<Result<Vec<_>>>()?;
// Add new entry to group_states and save newly created index
let group_state = GroupState {
group_by_values:
group_by_values.into_boxed_slice(),
accumulator_set,
indices: vec![row as u32], // 1.3
};
let group_idx = group_states.len();
group_states.push(group_state);
groups_with_rows.push(group_idx);
// for hasher function, use precomputed hash value
map.insert(hash, (hash, group_idx), |(hash,
_group_idx)| *hash);
}
};
}
Some(group_idx) => {
let group_state = &mut group_states[group_idx];
// 1.3
if group_state.indices.is_empty() {
groups_with_rows.push(group_idx);
};
group_state.indices.push(row as u32); // remember this row
}
}
}
```
I really want to solve the performance problem of high-cardinality
aggregation, which has troubled me for a long time. Is there any problem with
the above code? How can we optimize it further? Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]