This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new dd242b9655 refactor: change some `hashbrown` `RawTable` uses to `HashTable` (round 2) (#13524)
dd242b9655 is described below
commit dd242b96556b0dd4d8adb7186cf32114c5cc53ed
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Dec 4 16:28:22 2024 +0100
refactor: change some `hashbrown` `RawTable` uses to `HashTable` (round 2) (#13524)
* refactor: migrate `GroupValuesRows` to `HashTable`
For #13433.
* refactor: migrate `GroupValuesPrimitive` to `HashTable`
For #13433.
* refactor: migrate `GroupValuesColumn` to `HashTable`
For #13433.
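For readers new to the safe `hashbrown::hash_table::HashTable` API that the diffs below move to, here is a standalone sketch (not taken from the commit; the entries and hashes are made up) of the three idioms used throughout: `find_mut` with a caller-supplied hash and equality closure, `entry` for insert-or-lookup, and `retain` as the safe replacement for the old `unsafe` `iter()`/`erase()` loops.

    use hashbrown::hash_table::{Entry, HashTable};

    fn main() {
        // Entries are (hash, group_index) pairs, keyed by a precomputed hash.
        let mut map: HashTable<(u64, usize)> = HashTable::with_capacity(0);
        let hash = 42u64;

        // Insert-or-lookup via the Entry API (replaces find_or_find_insert_slot
        // plus insert_in_slot).
        let group_idx = match map.entry(
            hash,
            |(exist_hash, _)| *exist_hash == hash,
            |(exist_hash, _)| *exist_hash,
        ) {
            Entry::Occupied(o) => o.get().1,
            Entry::Vacant(v) => {
                v.insert((hash, 0));
                0
            }
        };
        assert_eq!(group_idx, 0);

        // Mutable lookup (replaces RawTable::get_mut).
        if let Some((_, idx)) = map.find_mut(hash, |(exist_hash, _)| *exist_hash == hash) {
            *idx += 1;
        }

        // Shift-or-remove every entry in one pass (replaces the unsafe
        // iter()/erase() loops).
        let n = 1;
        map.retain(|(_, idx)| match idx.checked_sub(n) {
            Some(sub) => {
                *idx = sub;
                true
            }
            None => false,
        });
    }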
---
.../aggregates/group_values/multi_group_by/mod.rs | 110 ++++++++++-----------
.../src/aggregates/group_values/row.rs | 30 +++---
.../group_values/single_group_by/primitive.rs | 45 ++++-----
3 files changed, 89 insertions(+), 96 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 034fb86d06..540f9c3c64 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -42,11 +42,11 @@ use arrow_array::{Array, ArrayRef};
use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{not_impl_err, DataFusionError, Result};
-use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;
-use hashbrown::raw::RawTable;
+use hashbrown::hash_table::HashTable;
const NON_INLINED_FLAG: u64 = 0x8000000000000000;
const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
@@ -180,7 +180,7 @@ pub struct GroupValuesColumn<const STREAMING: bool> {
/// And we use [`GroupIndexView`] to represent such `group indices` in table.
///
///
- map: RawTable<(u64, GroupIndexView)>,
+ map: HashTable<(u64, GroupIndexView)>,
/// The size of `map` in bytes
map_size: usize,
@@ -261,7 +261,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
/// Create a new instance of GroupValuesColumn if supported for the specified schema
pub fn try_new(schema: SchemaRef) -> Result<Self> {
- let map = RawTable::with_capacity(0);
+ let map = HashTable::with_capacity(0);
Ok(Self {
schema,
map,
@@ -338,7 +338,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self
.map
- .get_mut(target_hash, |(exist_hash, group_idx_view)| {
+ .find_mut(target_hash, |(exist_hash, group_idx_view)| {
// It is ensured to be inlined in `scalarized_intern`
debug_assert!(!group_idx_view.is_non_inlined());
@@ -506,7 +506,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self
.map
- .get(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
+ .find(target_hash, |(exist_hash, _)| target_hash == *exist_hash);
let Some((_, group_index_view)) = entry else {
// 1. Bucket not found case
@@ -733,7 +733,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
for &row in &self.vectorized_operation_buffers.remaining_row_indices {
let target_hash = batch_hashes[row];
- let entry = map.get_mut(target_hash, |(exist_hash, _)| {
+ let entry = map.find_mut(target_hash, |(exist_hash, _)| {
// Somewhat surprisingly, this closure can be called even if the
// hash doesn't match, so check the hash first with an integer
// comparison first avoid the more expensive comparison with
@@ -852,7 +852,7 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
/// Return group indices of the hash, also if its `group_index_view` is non-inlined
#[cfg(test)]
fn get_indices_by_hash(&self, hash: u64) -> Option<(Vec<usize>, GroupIndexView)> {
- let entry = self.map.get(hash, |(exist_hash, _)| hash == *exist_hash);
+ let entry = self.map.find(hash, |(exist_hash, _)| hash == *exist_hash);
match entry {
Some((_, group_index_view)) => {
@@ -1091,67 +1091,63 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
.collect::<Vec<_>>();
let mut next_new_list_offset = 0;
- // SAFETY: self.map outlives iterator and is not modified concurrently
- unsafe {
- for bucket in self.map.iter() {
- // In non-streaming case, we need to check if the `group index view`
- // is `inlined` or `non-inlined`
- if !STREAMING && bucket.as_ref().1.is_non_inlined() {
- // Non-inlined case
- // We take `group_index_list` from `old_group_index_lists`
-
- // list_offset is incrementally
- self.emit_group_index_list_buffer.clear();
- let list_offset = bucket.as_ref().1.value() as usize;
- for group_index in self.group_index_lists[list_offset].iter()
- {
- if let Some(remaining) = group_index.checked_sub(n) {
- self.emit_group_index_list_buffer.push(remaining);
- }
+ self.map.retain(|(_exist_hash, group_idx_view)| {
+ // In non-streaming case, we need to check if the `group index view`
+ // is `inlined` or `non-inlined`
+ if !STREAMING && group_idx_view.is_non_inlined() {
+ // Non-inlined case
+ // We take `group_index_list` from `old_group_index_lists`
+
+ // list_offset is incrementally
+ self.emit_group_index_list_buffer.clear();
+ let list_offset = group_idx_view.value() as usize;
+ for group_index in self.group_index_lists[list_offset].iter() {
+ if let Some(remaining) = group_index.checked_sub(n) {
+ self.emit_group_index_list_buffer.push(remaining);
}
-
- // The possible results:
- // - `new_group_index_list` is empty, we should erase this bucket
- // - only one value in `new_group_index_list`, switch the `view` to `inlined`
- // - still multiple values in `new_group_index_list`, build and set the new `unlined view`
- if self.emit_group_index_list_buffer.is_empty() {
- self.map.erase(bucket);
- } else if self.emit_group_index_list_buffer.len() == 1 {
- let group_index =
- self.emit_group_index_list_buffer.first().unwrap();
- bucket.as_mut().1 =
- GroupIndexView::new_inlined(*group_index as u64);
- } else {
- let group_index_list =
- &mut self.group_index_lists[next_new_list_offset];
- group_index_list.clear();
- group_index_list
- .extend(self.emit_group_index_list_buffer.iter());
- bucket.as_mut().1 = GroupIndexView::new_non_inlined(
- next_new_list_offset as u64,
- );
- next_new_list_offset += 1;
- }
-
- continue;
}
+ // The possible results:
+ // - `new_group_index_list` is empty, we should erase this bucket
+ // - only one value in `new_group_index_list`, switch the `view` to `inlined`
+ // - still multiple values in `new_group_index_list`, build and set the new `unlined view`
+ if self.emit_group_index_list_buffer.is_empty() {
+ false
+ } else if self.emit_group_index_list_buffer.len() == 1 {
+ let group_index =
+ self.emit_group_index_list_buffer.first().unwrap();
+ *group_idx_view =
+ GroupIndexView::new_inlined(*group_index as u64);
+ true
+ } else {
+ let group_index_list =
+ &mut self.group_index_lists[next_new_list_offset];
+ group_index_list.clear();
+ group_index_list
+ .extend(self.emit_group_index_list_buffer.iter());
+ *group_idx_view = GroupIndexView::new_non_inlined(
+ next_new_list_offset as u64,
+ );
+ next_new_list_offset += 1;
+ true
+ }
+ } else {
// In `streaming case`, the `group index view` is ensured to be `inlined`
- debug_assert!(!bucket.as_ref().1.is_non_inlined());
+ debug_assert!(!group_idx_view.is_non_inlined());
// Inlined case, we just decrement group index by n)
- let group_index = bucket.as_ref().1.value() as usize;
+ let group_index = group_idx_view.value() as usize;
match group_index.checked_sub(n) {
// Group index was >= n, shift value down
Some(sub) => {
- bucket.as_mut().1 =
- GroupIndexView::new_inlined(sub as u64)
+ *group_idx_view = GroupIndexView::new_inlined(sub as u64);
+ true
}
// Group index was < n, so remove from table
- None => self.map.erase(bucket),
+ None => false,
}
}
- }
+ });
if !STREAMING {
self.group_index_lists.truncate(next_new_list_offset);
@@ -1243,7 +1239,7 @@ mod tests {
use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StringViewArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
- use datafusion_common::utils::proxy::RawTableAllocExt;
+ use datafusion_common::utils::proxy::HashTableAllocExt;
use datafusion_expr::EmitTo;
use crate::aggregates::group_values::{
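The emit path above is the largest change in this file: the `unsafe { for bucket in self.map.iter() ... }` loop becomes a single `retain` call whose closure rewrites the stored `GroupIndexView` in place and decides whether the entry survives. A simplified sketch of that shape, with a hypothetical `View` enum standing in for the real `GroupIndexView` (the `insert_unique` calls in `main` are likewise just scaffolding for the example):

    use hashbrown::hash_table::HashTable;

    #[derive(Clone, Copy)]
    enum View {
        Inlined(u64),
        NonInlined(u64),
    }

    fn shift_or_drop(map: &mut HashTable<(u64, View)>, n: u64) {
        map.retain(|(_hash, view)| match *view {
            // Inlined case: decrement the group index by n, or drop the entry
            // when the index was already emitted (index < n).
            View::Inlined(idx) => match idx.checked_sub(n) {
                Some(sub) => {
                    *view = View::Inlined(sub);
                    true
                }
                None => false,
            },
            // Non-inlined case: the real code rebuilds the referenced index list
            // and may collapse the view back to an inlined one; kept as a no-op
            // here to stay focused on the retain shape.
            View::NonInlined(_) => true,
        });
    }

    fn main() {
        let mut map: HashTable<(u64, View)> = HashTable::with_capacity(0);
        map.insert_unique(7, (7, View::Inlined(3)), |(h, _)| *h);
        map.insert_unique(8, (8, View::NonInlined(0)), |(h, _)| *h);
        shift_or_drop(&mut map, 2);
        assert_eq!(map.len(), 2); // Inlined(3) became Inlined(1); both entries kept
    }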
diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs
index 8e0f0a3d65..edc3f909bb 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/row.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs
@@ -24,9 +24,9 @@ use arrow_array::{Array, ArrayRef, ListArray, StructArray};
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;
-use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
-use hashbrown::raw::RawTable;
+use hashbrown::hash_table::HashTable;
use log::debug;
use std::mem::size_of;
use std::sync::Arc;
@@ -54,7 +54,7 @@ pub struct GroupValuesRows {
///
/// keys: u64 hashes of the GroupValue
/// values: (hash, group_index)
- map: RawTable<(u64, usize)>,
+ map: HashTable<(u64, usize)>,
/// The size of `map` in bytes
map_size: usize,
@@ -92,7 +92,7 @@ impl GroupValuesRows {
.collect(),
)?;
- let map = RawTable::with_capacity(0);
+ let map = HashTable::with_capacity(0);
let starting_rows_capacity = 1000;
@@ -135,7 +135,7 @@ impl GroupValues for GroupValuesRows {
create_hashes(cols, &self.random_state, batch_hashes)?;
for (row, &target_hash) in batch_hashes.iter().enumerate() {
- let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| {
+ let entry = self.map.find_mut(target_hash, |(exist_hash, group_idx)| {
// Somewhat surprisingly, this closure can be called even if the
// hash doesn't match, so check the hash first with an integer
// comparison first avoid the more expensive comparison with
@@ -216,18 +216,18 @@ impl GroupValues for GroupValuesRows {
}
std::mem::swap(&mut new_group_values, &mut group_values);
- // SAFETY: self.map outlives iterator and is not modified concurrently
- unsafe {
- for bucket in self.map.iter() {
- // Decrement group index by n
- match bucket.as_ref().1.checked_sub(n) {
- // Group index was >= n, shift value down
- Some(sub) => bucket.as_mut().1 = sub,
- // Group index was < n, so remove from table
- None => self.map.erase(bucket),
+ self.map.retain(|(_exists_hash, group_idx)| {
+ // Decrement group index by n
+ match group_idx.checked_sub(n) {
+ // Group index was >= n, shift value down
+ Some(sub) => {
+ *group_idx = sub;
+ true
}
+ // Group index was < n, so remove from table
+ None => false,
}
- }
+ });
output
}
};
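In `GroupValuesRows` the table stores `(hash, group_index)` pairs, so the lookup closure can compare the cheap stored `u64` hash before falling back to the expensive row comparison. A minimal sketch of that lookup-or-insert shape, leaving memory accounting out (the commit routes inserts through the `HashTableAllocExt` helper; plain `insert_unique` and the hypothetical `rows_eq` callback are used here only to keep the sketch self-contained):

    use hashbrown::hash_table::HashTable;

    fn lookup_or_insert(
        map: &mut HashTable<(u64, usize)>,
        target_hash: u64,
        next_group_index: usize,
        rows_eq: impl Fn(usize) -> bool, // stand-in for the real row comparison
    ) -> usize {
        if let Some((_, group_idx)) =
            map.find_mut(target_hash, |(exist_hash, group_idx)| {
                // Check the hash first with a cheap integer comparison.
                target_hash == *exist_hash && rows_eq(*group_idx)
            })
        {
            return *group_idx;
        }
        // Not found: store the hash alongside the new group index so the table
        // can rehash entries on growth without recomputing row hashes.
        map.insert_unique(target_hash, (target_hash, next_group_index), |(hash, _)| *hash);
        next_group_index
    }

    fn main() {
        let mut map = HashTable::with_capacity(0);
        assert_eq!(lookup_or_insert(&mut map, 1, 0, |_| true), 0);
        assert_eq!(lookup_or_insert(&mut map, 1, 5, |_| true), 0); // found existing group
    }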
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
index 85cd2e79b9..6b69c00bca 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
@@ -29,7 +29,7 @@ use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
use half::f16;
-use hashbrown::raw::RawTable;
+use hashbrown::hash_table::HashTable;
use std::mem::size_of;
use std::sync::Arc;
@@ -86,7 +86,7 @@ pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
///
/// We don't store the hashes as hashing fixed width primitives
/// is fast enough for this not to benefit performance
- map: RawTable<usize>,
+ map: HashTable<usize>,
/// The group index of the null value if any
null_group: Option<usize>,
/// The values for each group index
@@ -100,7 +100,7 @@ impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
assert!(PrimitiveArray::<T>::is_compatible(&data_type));
Self {
data_type,
- map: RawTable::with_capacity(128),
+ map: HashTable::with_capacity(128),
values: Vec::with_capacity(128),
null_group: None,
random_state: Default::default(),
@@ -126,22 +126,19 @@ where
Some(key) => {
let state = &self.random_state;
let hash = key.hash(state);
- let insert = self.map.find_or_find_insert_slot(
+ let insert = self.map.entry(
hash,
|g| unsafe { self.values.get_unchecked(*g).is_eq(key) },
|g| unsafe { self.values.get_unchecked(*g).hash(state) },
);
- // SAFETY: No mutation occurred since find_or_find_insert_slot
- unsafe {
- match insert {
- Ok(v) => *v.as_ref(),
- Err(slot) => {
- let g = self.values.len();
- self.map.insert_in_slot(hash, slot, g);
- self.values.push(key);
- g
- }
+ match insert {
+ hashbrown::hash_table::Entry::Occupied(o) => *o.get(),
+ hashbrown::hash_table::Entry::Vacant(v) => {
+ let g = self.values.len();
+ v.insert(g);
+ self.values.push(key);
+ g
}
}
}
@@ -183,18 +180,18 @@ where
build_primitive(std::mem::take(&mut self.values), self.null_group.take())
}
EmitTo::First(n) => {
- // SAFETY: self.map outlives iterator and is not modified concurrently
- unsafe {
- for bucket in self.map.iter() {
- // Decrement group index by n
- match bucket.as_ref().checked_sub(n) {
- // Group index was >= n, shift value down
- Some(sub) => *bucket.as_mut() = sub,
- // Group index was < n, so remove from table
- None => self.map.erase(bucket),
+ self.map.retain(|group_idx| {
+ // Decrement group index by n
+ match group_idx.checked_sub(n) {
+ // Group index was >= n, shift value down
+ Some(sub) => {
+ *group_idx = sub;
+ true
}
+ // Group index was < n, so remove from table
+ None => false,
}
- }
+ });
let null_group = match &mut self.null_group {
Some(v) if *v >= n => {
*v -= n;
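The `Entry`-based insert above relies on the table storing only `usize` indices into the side `values` vec, which is why `entry` needs both an equality closure and a re-hash closure (the latter is called when the table has to grow and move entries). A safe, simplified sketch of the same pattern; plain indexing, `i64` keys, and `RandomState` hashing are assumptions made here instead of the commit's `get_unchecked` and arrow-specific hashing:

    use hashbrown::hash_table::{Entry, HashTable};
    use std::collections::hash_map::RandomState;
    use std::hash::BuildHasher;

    fn intern(
        map: &mut HashTable<usize>,
        values: &mut Vec<i64>,
        state: &RandomState,
        key: i64,
    ) -> usize {
        let hash = state.hash_one(key);
        match map.entry(
            hash,
            |&g| values[g] == key,          // compare against the interned value
            |&g| state.hash_one(values[g]), // re-hash existing entries on resize
        ) {
            Entry::Occupied(o) => *o.get(),
            Entry::Vacant(v) => {
                // New group: its index is the current length of `values`.
                let g = values.len();
                v.insert(g);
                values.push(key);
                g
            }
        }
    }

    fn main() {
        let state = RandomState::new();
        let mut map = HashTable::with_capacity(128);
        let mut values = Vec::with_capacity(128);
        assert_eq!(intern(&mut map, &mut values, &state, 5), 0);
        assert_eq!(intern(&mut map, &mut values, &state, 9), 1);
        assert_eq!(intern(&mut map, &mut values, &state, 5), 0);
    }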
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]