alamb commented on code in PR #11943: URL: https://github.com/apache/datafusion/pull/11943#discussion_r1723125203
########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs: ########## @@ -458,6 +717,91 @@ fn initialize_builder( builder } +/// Similar as the [initialize_builder] but designed for the blocked version accumulator +fn ensure_enough_room_for_nulls( + builder_blocks: &mut Blocks<BooleanBufferBuilder>, + mode: GroupStatesMode, + total_num_groups: usize, + default_value: bool, +) { + if total_num_groups == 0 { + return; + } + + match mode { + // It flat mode, we just a single builder, and grow it constantly. + GroupStatesMode::Flat => { + if builder_blocks.num_blocks() == 0 { + builder_blocks.push_block(BooleanBufferBuilder::new(0)); + } + + let builder = builder_blocks.current_mut().unwrap(); + if builder.len() < total_num_groups { + let new_groups = total_num_groups - builder.len(); + builder.append_n(new_groups, default_value); + } + } + // In blocked mode, we ensure the blks are enough first, + // and then ensure slots in blks are enough. + GroupStatesMode::Blocked(blk_size) => { + let (mut cur_blk_idx, exist_slots) = if builder_blocks.num_blocks() > 0 { + let cur_blk_idx = builder_blocks.num_blocks() - 1; + let exist_slots = (builder_blocks.num_blocks() - 1) * blk_size + + builder_blocks.current().unwrap().len(); + + (cur_blk_idx, exist_slots) + } else { + (0, 0) + }; + + // No new groups, don't need to expand, just return. + if exist_slots >= total_num_groups { + return; + } + + // Ensure blks are enough + let exist_blks = builder_blocks.num_blocks(); + let new_blks = (total_num_groups + blk_size - 1) / blk_size - exist_blks; + if new_blks > 0 { + for _ in 0..new_blks { + builder_blocks.push_block(BooleanBufferBuilder::new(blk_size)); + } + } + + // Ensure slots are enough. + let mut new_slots = total_num_groups - exist_slots; + + // Expand current blk. 
+ let cur_blk_rest_slots = blk_size - builder_blocks[cur_blk_idx].len(); + if cur_blk_rest_slots >= new_slots { + builder_blocks[cur_blk_idx].append_n(new_slots, default_value); + return; + } + + // Expand current blk to full, and expand next blks + builder_blocks[cur_blk_idx].append_n(cur_blk_rest_slots, default_value); + new_slots -= cur_blk_rest_slots; + cur_blk_idx += 1; + + // Expand blks + let expand_blks = new_slots / blk_size; + for _ in 0..expand_blks { + builder_blocks[cur_blk_idx].append_n(blk_size, default_value); + cur_blk_idx += 1; + } + + // Expand the last blk. + let last_expand_slots = new_slots % blk_size; + if last_expand_slots > 0 { + builder_blocks + .current_mut() + .unwrap() + .append_n(last_expand_slots, default_value); + } + } + } +} + Review Comment: given the amount of code here, I think this should have unit tests that cover the basic operation ########## datafusion/expr-common/src/groups_accumulator.rs: ########## @@ -52,8 +71,248 @@ impl EmitTo { std::mem::swap(v, &mut t); t } + Self::NextBlock(_) => unreachable!( + "can not support blocked emission in take_needed, you should use take_needed_from_blocks" + ), + } + } + + /// Removes the number of rows from `blocks` required to emit, + /// returning a `Vec` with elements taken. + /// + /// The detailed behavior in different emissions: + /// - For Emit::CurrentBlock, the first block will be taken and return. + /// - For Emit::All and Emit::First, it will be only supported in `GroupStatesMode::Flat`, + /// similar as `take_needed`. + pub fn take_needed_from_blocks<T>( + &self, + blocks: &mut VecBlocks<T>, + mode: GroupStatesMode, + ) -> Vec<T> { + match self { + Self::All => { + debug_assert!(matches!(mode, GroupStatesMode::Flat)); + blocks.pop_first_block().unwrap() + } + Self::First(n) => { + debug_assert!(matches!(mode, GroupStatesMode::Flat)); + + let block = blocks.current_mut().unwrap(); + let split_at = min(block.len(), *n); + + // get end n+1,.. 
values into t + let mut t = block.split_off(split_at); + // leave n+1,.. in v + std::mem::swap(block, &mut t); + t + } + Self::NextBlock(_) => { + debug_assert!(matches!(mode, GroupStatesMode::Blocked(_))); + blocks.pop_first_block().unwrap() + } + } + } +} + +/// Mode for `accumulators` and `group values` Review Comment: 👍 ########## datafusion/expr-common/src/groups_accumulator.rs: ########## @@ -52,8 +71,248 @@ impl EmitTo { std::mem::swap(v, &mut t); t } + Self::NextBlock(_) => unreachable!( + "can not support blocked emission in take_needed, you should use take_needed_from_blocks" + ), + } + } + + /// Removes the number of rows from `blocks` required to emit, + /// returning a `Vec` with elements taken. + /// + /// The detailed behavior in different emissions: + /// - For Emit::CurrentBlock, the first block will be taken and return. + /// - For Emit::All and Emit::First, it will be only supported in `GroupStatesMode::Flat`, + /// similar as `take_needed`. + pub fn take_needed_from_blocks<T>( + &self, + blocks: &mut VecBlocks<T>, + mode: GroupStatesMode, + ) -> Vec<T> { + match self { + Self::All => { + debug_assert!(matches!(mode, GroupStatesMode::Flat)); + blocks.pop_first_block().unwrap() + } + Self::First(n) => { + debug_assert!(matches!(mode, GroupStatesMode::Flat)); + + let block = blocks.current_mut().unwrap(); + let split_at = min(block.len(), *n); + + // get end n+1,.. values into t + let mut t = block.split_off(split_at); + // leave n+1,.. in v + std::mem::swap(block, &mut t); + t + } + Self::NextBlock(_) => { + debug_assert!(matches!(mode, GroupStatesMode::Blocked(_))); + blocks.pop_first_block().unwrap() + } + } + } +} + +/// Mode for `accumulators` and `group values` +/// +/// Their meanings: +/// - Flat, the values in them will be managed with a single `Vec`. +/// It will grow constantly when more and more values are inserted, +/// that leads to a considerable amount of copying, and finally a bad performance. 
+/// +/// - Blocked(block_size), the values in them will be managed with multiple `Vec`s. +/// When the block is large enough(reach block_size), a new block will be allocated +/// and used for inserting. +/// Obviously, this strategy can avoid copying and get a good performance. +#[derive(Debug, Clone, Copy)] +pub enum GroupStatesMode { + Flat, + Blocked(usize), +} + +/// Blocked style group index used in blocked mode group values and accumulators Review Comment: A minor comment here: it would be great to try and keep the logic (`BlockedGroupIndex`, `Blocks`, etc) in another crate (as datafusion-expr currently mostly contains logical `Expr`s and interfaces). Perhaps we could move them to datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/ (obviously we can't move some things like `GroupStatesMode`, as that is needed for `GroupsAccumulator::switch_to_mode`) ########## datafusion/physical-plan/src/aggregates/group_values/bytes.rs: ########## @@ -115,6 +116,11 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> { emit_group_values } + EmitTo::NextBlock(_) => { Review Comment: I think supporting chunked emission in groups would also likely help ClickBench performance, but it is a natural follow-on project. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org