alamb commented on code in PR #15591: URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072019575
########## datafusion/common/src/config.rs: ########## @@ -405,6 +405,18 @@ config_namespace! { /// in joins can reduce memory usage when joining large /// tables with a highly-selective join filter, but is also slightly slower. pub enforce_batch_size_in_joins: bool, default = false + Review Comment: Do you expect users will ever disable this feature? Or does this setting exist as an "escape" valve in case we hit a problem with the new behavior and want to go back? ########## benchmarks/queries/clickbench/extended.sql: ########## @@ -5,3 +5,4 @@ SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPric SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, MAX("ResponseStartTiming") tmax FROM hits WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tmed DESC LIMIT 10; SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, APPROX_PERCENTILE_CONT("ResponseStartTiming", 0.95) tp95, MAX("ResponseStartTiming") tmax FROM 'hits' WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tp95 DESC LIMIT 10; SELECT COUNT(*) AS ShareCount FROM hits WHERE "IsMobile" = 1 AND "MobilePhoneModel" LIKE 'iPhone%' AND "SocialAction" = 'share' AND "SocialSourceNetworkID" IN (5, 12) AND "ClientTimeZone" BETWEEN -5 AND 5 AND regexp_match("Referer", '\/campaign\/(spring|summer)_promo') IS NOT NULL AND CASE WHEN split_part(split_part("URL", 'resolution=', 2), '&', 1) ~ '^\d+$' THEN split_part(split_part("URL", 'resolution=', 2), '&', 1)::INT ELSE 0 END > 1920 AND levenshtein(CAST("UTMSource" AS STRING), CAST("UTMCampaign" AS STRING)) < 3; +SELECT "WatchID", MIN("ResolutionWidth"), MAX("ResolutionWidth"), SUM("IsRefresh") FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10; Review Comment: If we are going to add a new query to the extended benchmarks, can we please also document the query here? 
https://github.com/apache/datafusion/tree/main/benchmarks/queries/clickbench#extended-queries ? ########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs: ########## @@ -53,10 +56,20 @@ where starting_value: T::Native, /// Track nulls in the input / filters - null_state: NullState, + null_state: NullStateAdapter, /// Function that computes the primitive result prim_fn: F, + + /// Block size of current `GroupAccumulator` if exist: + /// - If `None`, it means block optimization is disabled, + /// all `group values`` will be stored in a single `Vec` + /// + /// - If `Some(blk_size)`, it means block optimization is enabled, Review Comment: I wonder what you think about not keeping the code around to support both paths, but instead always using the blocked approach? It seems to me like in the long term if we try to support both paths it will be hard to ensure they are all tested and thus will result in subtle bugs. Keeping as few combinations as possible seems more maintainable to me ########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs: ########## @@ -198,4 +232,28 @@ where fn size(&self) -> usize { self.values.capacity() * size_of::<T::Native>() + self.null_state.size() } + + fn supports_blocked_groups(&self) -> bool { + true + } + + fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> { + self.values.clear(); + self.null_state = NullStateAdapter::new(block_size); + self.block_size = block_size; + + Ok(()) + } +} + +impl<N: ArrowNativeTypeOp> Block for Vec<N> { Review Comment: this might make sense to put in the same module as the definition of `Block` so it is easier to find ########## datafusion/expr-common/src/groups_accumulator.rs: ########## @@ -17,29 +17,52 @@ //! 
Vectorized [`GroupsAccumulator`] +use std::collections::VecDeque; + use arrow::array::{ArrayRef, BooleanArray}; -use datafusion_common::{not_impl_err, Result}; +use datafusion_common::{not_impl_err, DataFusionError, Result}; /// Describes how many rows should be emitted during grouping. #[derive(Debug, Clone, Copy)] pub enum EmitTo { - /// Emit all groups + /// Emit all groups, will clear all existing group indexes All, /// Emit only the first `n` groups and shift all existing group /// indexes down by `n`. /// /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`. First(usize), + /// Emit next block in the blocked managed groups + /// + /// Similar as `Emit::All`, will also clear all existing group indexes + NextBlock, } impl EmitTo { + /// Remove and return `needed values` from `values`. + pub fn take_needed<T>( Review Comment: Can you document what the `is_blocked_groups` parameter means too? ########## datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs: ########## @@ -81,28 +85,49 @@ hash_float!(f16, f32, f64); pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> { /// The data type of the output array data_type: DataType, + /// Stores the group index based on the hash of its value /// /// We don't store the hashes as hashing fixed width primitives /// is fast enough for this not to benefit performance - map: HashTable<usize>, + map: HashTable<u64>, + /// The group index of the null value if any - null_group: Option<usize>, + null_group: Option<u64>, + /// The values for each group index - values: Vec<T::Native>, + values: VecDeque<Vec<T::Native>>, + /// The random state used to generate hashes random_state: RandomState, + + /// Block size of current `GroupValues` if exist: + /// - If `None`, it means block optimization is disabled, + /// all `group values`` will be stored in a single `Vec` + /// + /// - If `Some(blk_size)`, it means block optimization is 
enabled, + /// `group values` will be stored in multiple `Vec`s, and each + /// `Vec` if of `blk_size` len, and we call it a `block` + /// + block_size: Option<usize>, Review Comment: See my comment above about making a `Blocks` struct which I think would avoid some non-trivial duplication ########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs: ########## @@ -43,8 +46,8 @@ where T: ArrowPrimitiveType + Send, F: Fn(&mut T::Native, T::Native) + Send + Sync, { - /// values per group, stored as the native type - values: Vec<T::Native>, + /// Values per group, stored as the native type Review Comment: Something I noticed several times was `VecDeque<B: Block>` being used -- did you consider making a special struct for this? Something like ```rust struct Blocks<B: Block> { inner: VecDeque<B> } impl<B: Block> Blocks<B> { // resize filling all to default value fn resize(&mut self, num_values: usize, default_value: B::T) { .. } fn ensure_room_enough_for_blocks(...) } ``` That would make it easier to understand and document how lists of blocks are managed and would make it easier to read code that uses it. It would also make `ensure_room_enough_for_blocks` more discoverable. You might also be able to encapsulate the `block_size` parameter in `Blocks` which would make migrating other GroupsAccumulators easier ########## datafusion/common/src/config.rs: ########## @@ -405,6 +405,18 @@ config_namespace! { /// in joins can reduce memory usage when joining large /// tables with a highly-selective join filter, but is also slightly slower. pub enforce_batch_size_in_joins: bool, default = false + + /// Should DataFusion use the the blocked approach to manage the groups + /// values and their related states in accumulators. + /// By default, the blocked approach will be used. And the blocked approach + /// allocates capacity for the block based on a predefined block size firstly. 
+ /// When the block reaches its limit, we allocate a new block (also with + /// the same predefined block size based capacity) instead of expanding + /// the current one and copying the data. + /// If setting this flag to `false`, will fall-back to use the single approach, + /// values are managed within a single large block(can think of it as a Vec). + /// As this block grows, it often triggers numerous copies, resulting in poor performance. Review Comment: Here is a suggestion that focuses more on the external facing results rather than the internal implementation ```suggestion /// Should DataFusion use a blocked approach to manage grouping state. /// By default, the blocked approach is used which /// allocates capacity based on a predefined block size firstly. /// When the block reaches its limit, we allocate a new block (also with /// the same predefined block size based capacity) instead of expanding /// the current one and copying the data. /// If `false`, a single allocation approach is used, where /// values are managed within a single large memory block. /// As this block grows, it often triggers numerous copies, resulting in poor performance. 
``` ########## datafusion/functions-aggregate/src/correlation.rs: ########## @@ -448,6 +448,9 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { let n = match emit_to { EmitTo::All => self.count.len(), EmitTo::First(n) => n, + EmitTo::NextBlock => { + unreachable!("this accumulator still not support blocked groups") Review Comment: Instead of `unreachable!` which will panic, I think it would make more sense to return an internal error here to signal it is not expected: ```suggestion internal_err!("correlation does not support blocked groups") ``` Similarly below ########## datafusion/physical-plan/src/aggregates/row_hash.rs: ########## @@ -982,6 +1099,9 @@ impl GroupedHashAggregateStream { && self.update_memory_reservation().is_err() { assert_ne!(self.mode, AggregateMode::Partial); + // TODO: support spilling when blocked group optimization is on Review Comment: We may want to file a ticket to track this -- but I think in general figuring out how to handle spilling for hashing in a better way is worth considering so maybe this particular task would become irrelevant ########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs: ########## @@ -507,3 +509,157 @@ pub(crate) fn slice_and_maybe_filter( Ok(sliced_arrays) } } + +// =============================================== +// Useful tools for group index +// =============================================== + +/// Operations about group index parsing +/// +/// There are mainly 2 `group index` needing parsing: `flat` and `blocked`. +/// +/// # Flat group index +/// `flat group index` format is like: +/// +/// ```text +/// | block_offset(64bit) | +/// ``` +/// +/// It is used in `flat GroupValues/GroupAccumulator`, only a single block +/// exists, so its `block_id` is always 0, and use all 64 bits to store the +/// `block offset`. 
+/// +/// # Blocked group index +/// `blocked group index` format is like: +/// +/// ```text +/// | block_id(32bit) | block_offset(32bit) +/// ``` +/// +/// It is used in `blocked GroupValues/GroupAccumulator`, multiple blocks +/// exist, and we use high 32 bits to store `block_id`, and low 32 bit to +/// store `block_offset`. +/// +/// The `get_block_offset` method requires to return `block_offset` as u64, +/// that is for compatible for `flat group index`'s parsing. +/// +pub trait GroupIndexOperations: Debug { Review Comment: Maybe it would make sense to pull this into a new module - `datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/operations.rs` perhaps ########## datafusion/functions-aggregate/src/average.rs: ########## @@ -667,8 +668,8 @@ where partial_counts, opt_filter, total_num_groups, - |group_index, partial_count| { - self.counts[group_index] += partial_count; + |_, group_index, partial_count| { Review Comment: I think it would help to have a comment here explaining why the block index was ignored (because this accumulator only supports a single block) ########## datafusion/expr-common/src/groups_accumulator.rs: ########## @@ -250,4 +288,30 @@ pub trait GroupsAccumulator: Send { /// This function is called once per batch, so it should be `O(n)` to /// compute, not `O(num_groups)` fn size(&self) -> usize; + + /// Returns `true` if this accumulator supports blocked groups. + fn supports_blocked_groups(&self) -> bool { + false + } + + /// Alter the block size in the accumulator + /// + /// If the target block size is `None`, it will use a single big + /// block(can think it a `Vec`) to manage the state. + /// + /// If the target block size` is `Some(blk_size)`, it will try to + /// set the block size to `blk_size`, and the try will only success + /// when the accumulator has supported blocked mode. + /// + /// NOTICE: After altering block size, all data in previous will be cleared. 
Review Comment: Does this mean that all existing accumulators will be cleared? ########## datafusion/physical-plan/Cargo.toml: ########## @@ -51,6 +51,7 @@ datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } Review Comment: I think this new dependency is ok as it doesn't use any specific aggregate implementation which we are trying to avoid ########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs: ########## @@ -507,3 +509,157 @@ pub(crate) fn slice_and_maybe_filter( Ok(sliced_arrays) } } + +// =============================================== +// Useful tools for group index +// =============================================== + +/// Operations about group index parsing +/// +/// There are mainly 2 `group index` needing parsing: `flat` and `blocked`. +/// +/// # Flat group index +/// `flat group index` format is like: +/// +/// ```text +/// | block_offset(64bit) | +/// ``` +/// +/// It is used in `flat GroupValues/GroupAccumulator`, only a single block +/// exists, so its `block_id` is always 0, and use all 64 bits to store the +/// `block offset`. +/// +/// # Blocked group index +/// `blocked group index` format is like: +/// +/// ```text +/// | block_id(32bit) | block_offset(32bit) +/// ``` +/// +/// It is used in `blocked GroupValues/GroupAccumulator`, multiple blocks +/// exist, and we use high 32 bits to store `block_id`, and low 32 bit to +/// store `block_offset`. +/// +/// The `get_block_offset` method requires to return `block_offset` as u64, +/// that is for compatible for `flat group index`'s parsing. 
+/// +pub trait GroupIndexOperations: Debug { + fn pack_index(block_id: u32, block_offset: u64) -> u64; + + fn get_block_id(packed_index: u64) -> u32; + + fn get_block_offset(packed_index: u64) -> u64; +} + +#[derive(Debug)] +pub struct BlockedGroupIndexOperations; + +impl GroupIndexOperations for BlockedGroupIndexOperations { + fn pack_index(block_id: u32, block_offset: u64) -> u64 { + ((block_id as u64) << 32) | block_offset + } + + fn get_block_id(packed_index: u64) -> u32 { + (packed_index >> 32) as u32 + } + + fn get_block_offset(packed_index: u64) -> u64 { + (packed_index as u32) as u64 + } +} + +#[derive(Debug)] +pub struct FlatGroupIndexOperations; + +impl GroupIndexOperations for FlatGroupIndexOperations { + fn pack_index(_block_id: u32, block_offset: u64) -> u64 { + block_offset + } + + fn get_block_id(_packed_index: u64) -> u32 { + 0 + } + + fn get_block_offset(packed_index: u64) -> u64 { + packed_index + } +} + +// =============================================== +// Useful tools for block +// =============================================== +pub(crate) fn ensure_room_enough_for_blocks<B, F>( + blocks: &mut VecDeque<B>, + total_num_groups: usize, + block_size: usize, + new_block: F, + default_value: B::T, +) where + B: Block, + F: Fn(usize) -> B, +{ + // For resize, we need to: + // 1. Ensure the blks are enough first + // 2. and then ensure slots in blks are enough + let (mut cur_blk_idx, exist_slots) = if !blocks.is_empty() { + let cur_blk_idx = blocks.len() - 1; + let exist_slots = (blocks.len() - 1) * block_size + blocks.back().unwrap().len(); + + (cur_blk_idx, exist_slots) + } else { + (0, 0) + }; + + // No new groups, don't need to expand, just return + if exist_slots >= total_num_groups { + return; + } + + // 1. 
Ensure blks are enough + let exist_blks = blocks.len(); + let new_blks = total_num_groups.div_ceil(block_size) - exist_blks; + if new_blks > 0 { + for _ in 0..new_blks { + let block = new_block(block_size); + blocks.push_back(block); + } + } + + // 2. Ensure slots are enough + let mut new_slots = total_num_groups - exist_slots; + + // 2.1 Only fill current blk if it may be already enough + let cur_blk_rest_slots = block_size - blocks[cur_blk_idx].len(); + if cur_blk_rest_slots >= new_slots { + blocks[cur_blk_idx].fill_default_value(new_slots, default_value.clone()); + return; + } + + // 2.2 Fill current blk to full + blocks[cur_blk_idx].fill_default_value(cur_blk_rest_slots, default_value.clone()); + new_slots -= cur_blk_rest_slots; + + // 2.3 Fill complete blks + let complete_blks = new_slots / block_size; + for _ in 0..complete_blks { + cur_blk_idx += 1; + blocks[cur_blk_idx].fill_default_value(block_size, default_value.clone()); + } + + // 2.4 Fill last blk if needed + let rest_slots = new_slots % block_size; + if rest_slots > 0 { + blocks + .back_mut() + .unwrap() + .fill_default_value(rest_slots, default_value); + } +} + +pub(crate) trait Block { Review Comment: I think it would be good to add some comments here describing what the trait is for ########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs: ########## @@ -212,7 +229,66 @@ impl NullState { /// /// resets the internal state appropriately pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer { - let nulls: BooleanBuffer = self.seen_values.finish(); + self.seen_values.emit(emit_to) + } +} + +/// Structure marking if accumulating groups are seen at least one +pub trait SeenValues: Default + Debug + Send { + fn resize(&mut self, total_num_groups: usize, default_value: bool); + + fn set_bit(&mut self, block_id: u32, block_offset: u64, value: bool); + + fn emit(&mut self, emit_to: EmitTo) -> NullBuffer; + + fn capacity(&self) -> usize; +} + +/// [`SeenValues`] for 
`flat groups input` +/// +/// The `flat groups input` are organized like: +/// +/// ```text +/// row_0 group_index_0 +/// row_1 group_index_1 +/// row_2 group_index_2 +/// ... +/// row_n group_index_n +/// ``` +/// +/// If `row_x group_index_x` is not filtered(`group_index_x` is seen) +/// `seen_values[group_index_x]` will be set to `true`. +/// +/// For `set_bit(block_id, block_offset, value)`, `block_id` is unused, Review Comment: Something I didn't see documented anywhere was what `block_id` and `block_offset` meant -- maybe we could add something on the HashAggregateStream 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org