alamb commented on code in PR #12996: URL: https://github.com/apache/datafusion/pull/12996#discussion_r1826322118
########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ########## @@ -146,7 +550,7 @@ macro_rules! instantiate_primitive { }; } -impl GroupValues for GroupValuesColumn { +impl GroupValues for VectorizedGroupValuesColumn { Review Comment: I wonder if there is any reason to rename this ########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ########## @@ -196,6 +600,328 @@ impl GroupValues for GroupValuesColumn { let b = ByteViewGroupValueBuilder::<BinaryViewType>::new(); v.push(Box::new(b) as _) } + dt => { + return not_impl_err!( + "{dt} not supported in VectorizedGroupValuesColumn" + ) + } + } + } + self.group_values = v; + } + + // tracks to which group each of the input rows belongs + groups.clear(); + groups.resize(n_rows, usize::MAX); + + let mut batch_hashes = mem::take(&mut self.hashes_buffer); Review Comment: 👍 ########## datafusion/physical-plan/src/aggregates/group_values/mod.rs: ########## @@ -143,8 +148,12 @@ pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> { } } - if GroupValuesColumn::supported_schema(schema.as_ref()) { - Ok(Box::new(GroupValuesColumn::try_new(schema)?)) + if column::supported_schema(schema.as_ref()) { Review Comment: Can you explain here why GroupOrdering::None is required? Is it because the `VectorizedGroupValuesColumn` doesn't keep the groups in order? If that is the case, it seems like maybe `emit_n` would never be called 🤔 ########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ########## @@ -35,29 +37,113 @@ use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; + use hashbrown::raw::RawTable; -use std::mem::size_of; -/// A [`GroupValues`] that stores multiple columns of group values. 
+const NON_INLINED_FLAG: u64 = 0x8000000000000000; +const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF; + +/// The view of indices pointing to the actual values in `GroupValues` /// +/// If only a single `group index` is represented by the view, +/// the value of the view is just the `group index`, and we call it an `inlined view`. /// -pub struct GroupValuesColumn { +/// If multiple `group indices` are represented by the view, +/// the value of the view is actually the index pointing to the `group indices`, +/// and we call it a `non-inlined view`. +/// +/// The view (a u64) has the following format: +/// +---------------------+---------------------------------------------+ +/// | inlined flag(1bit) | group index / index to group indices(63bit) | +/// +---------------------+---------------------------------------------+ +/// +/// `inlined flag`: 1 represents `non-inlined`, and 0 represents `inlined` +/// +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct GroupIndexView(u64); + +impl GroupIndexView { + #[inline] + pub fn is_non_inlined(&self) -> bool { + (self.0 & NON_INLINED_FLAG) > 0 + } + + #[inline] + pub fn new_inlined(group_index: u64) -> Self { + Self(group_index) + } + + #[inline] + pub fn new_non_inlined(list_offset: u64) -> Self { + let non_inlined_value = list_offset | NON_INLINED_FLAG; + Self(non_inlined_value) + } + + #[inline] + pub fn value(&self) -> u64 { + self.0 & VALUE_MASK + } +} + +/// A [`GroupValues`] that stores multiple columns of group values, +/// and supports vectorized operators for them +/// +pub struct VectorizedGroupValuesColumn { /// The output schema schema: SchemaRef, /// Logically maps group values to a group_index in /// [`Self::group_values`] and in each accumulator /// - /// Uses the raw API of hashbrown to avoid actually storing the - /// keys (group values) in the table + /// It is a `hashtable` based on `hashbrown`.
/// - /// keys: u64 hashes of the GroupValue - /// values: (hash, group_index) - map: RawTable<(u64, usize)>, + /// Key and value in the `hashtable`: + /// - The `key` is `hash value(u64)` of the `group value` + /// - The `value` is the `group values` with the same `hash value` + /// + /// We don't really store the actual `group values` in `hashtable`, + /// instead we store the `group indices` pointing to values in `GroupValues`. + /// And we use [`GroupIndexView`] to represent such `group indices` in the table. + /// + /// + map: RawTable<(u64, GroupIndexView)>, /// The size of `map` in bytes map_size: usize, + /// The lists for group indices with the same hash value + /// + /// It is possible that hash value collisions exist, + /// and we chain the `group indices` with the same hash value + /// + /// The chained indices look like: + /// `latest group index -> older group index -> even older group index -> ...` + /// + group_index_lists: Vec<Vec<usize>>, + + /// When emitting first n, we need to decrease/erase group indices in + /// `map` and `group_index_lists`. + /// + /// This buffer is used to temporarily store the remaining group indices in + /// a specific list in `group_index_lists`. + emit_group_index_list_buffer: Vec<usize>, + + /// Similar to `current_indices`, but `scalarized_indices` + /// is used to store the rows that will be processed in the next round.
+ scalarized_indices: Vec<usize>, + + /// The `vectorized_equal_to` row indices buffer Review Comment: Maybe we can rename these to "buffer" or something to make it clear they are temporary processing space used to avoid re-allocations. Something like ```rust buffer_equal_to_row_indices: Vec<usize>, ``` Or maybe we can even put all the scratch space into its own struct to make it clear ```rust struct ScratchSpace { vectorized_equal_to_row_indices: Vec<usize>, /// The `vectorized_equal_to` group indices buffer vectorized_equal_to_group_indices: Vec<usize>, /// The `vectorized_equal_to` result buffer vectorized_equal_to_results: Vec<bool>, /// The `vectorized append` row indices buffer vectorized_append_row_indices: Vec<usize>, } ``` Or something -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org