Rachelint commented on code in PR #15591: URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072859602
########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs: ########## @@ -507,3 +509,157 @@ pub(crate) fn slice_and_maybe_filter( Ok(sliced_arrays) } } + +// =============================================== +// Useful tools for group index +// =============================================== + +/// Operations for parsing a group index +/// +/// There are two main `group index` formats that need parsing: `flat` and `blocked`. +/// +/// # Flat group index +/// The `flat group index` format is: +/// +/// ```text +/// | block_offset(64bit) | +/// ``` +/// +/// It is used in `flat GroupValues/GroupAccumulator`, where only a single block +/// exists, so its `block_id` is always 0 and all 64 bits are used to store the +/// `block_offset`. +/// +/// # Blocked group index +/// The `blocked group index` format is: +/// +/// ```text +/// | block_id(32bit) | block_offset(32bit) | +/// ``` +/// +/// It is used in `blocked GroupValues/GroupAccumulator`, where multiple blocks +/// exist; the high 32 bits store `block_id` and the low 32 bits +/// store `block_offset`. +/// +/// The `get_block_offset` method returns `block_offset` as u64 +/// for compatibility with `flat group index` parsing. 
+/// +pub trait GroupIndexOperations: Debug { + fn pack_index(block_id: u32, block_offset: u64) -> u64; + + fn get_block_id(packed_index: u64) -> u32; + + fn get_block_offset(packed_index: u64) -> u64; +} + +#[derive(Debug)] +pub struct BlockedGroupIndexOperations; + +impl GroupIndexOperations for BlockedGroupIndexOperations { + fn pack_index(block_id: u32, block_offset: u64) -> u64 { + ((block_id as u64) << 32) | block_offset + } + + fn get_block_id(packed_index: u64) -> u32 { + (packed_index >> 32) as u32 + } + + fn get_block_offset(packed_index: u64) -> u64 { + (packed_index as u32) as u64 + } +} + +#[derive(Debug)] +pub struct FlatGroupIndexOperations; + +impl GroupIndexOperations for FlatGroupIndexOperations { + fn pack_index(_block_id: u32, block_offset: u64) -> u64 { + block_offset + } + + fn get_block_id(_packed_index: u64) -> u32 { + 0 + } + + fn get_block_offset(packed_index: u64) -> u64 { + packed_index + } +} + +// =============================================== +// Useful tools for block +// =============================================== +pub(crate) fn ensure_room_enough_for_blocks<B, F>( + blocks: &mut VecDeque<B>, + total_num_groups: usize, + block_size: usize, + new_block: F, + default_value: B::T, +) where + B: Block, + F: Fn(usize) -> B, +{ + // For resize, we need to: + // 1. Ensure the blks are enough first + // 2. and then ensure slots in blks are enough + let (mut cur_blk_idx, exist_slots) = if !blocks.is_empty() { + let cur_blk_idx = blocks.len() - 1; + let exist_slots = (blocks.len() - 1) * block_size + blocks.back().unwrap().len(); + + (cur_blk_idx, exist_slots) + } else { + (0, 0) + }; + + // No new groups, don't need to expand, just return + if exist_slots >= total_num_groups { + return; + } + + // 1. 
Ensure blks are enough + let exist_blks = blocks.len(); + let new_blks = total_num_groups.div_ceil(block_size) - exist_blks; + if new_blks > 0 { + for _ in 0..new_blks { + let block = new_block(block_size); + blocks.push_back(block); + } + } + + // 2. Ensure slots are enough + let mut new_slots = total_num_groups - exist_slots; + + // 2.1 Only fill current blk if it may be already enough + let cur_blk_rest_slots = block_size - blocks[cur_blk_idx].len(); + if cur_blk_rest_slots >= new_slots { + blocks[cur_blk_idx].fill_default_value(new_slots, default_value.clone()); + return; + } + + // 2.2 Fill current blk to full + blocks[cur_blk_idx].fill_default_value(cur_blk_rest_slots, default_value.clone()); + new_slots -= cur_blk_rest_slots; + + // 2.3 Fill complete blks + let complete_blks = new_slots / block_size; + for _ in 0..complete_blks { + cur_blk_idx += 1; + blocks[cur_blk_idx].fill_default_value(block_size, default_value.clone()); + } + + // 2.4 Fill last blk if needed + let rest_slots = new_slots % block_size; + if rest_slots > 0 { + blocks + .back_mut() + .unwrap() + .fill_default_value(rest_slots, default_value); + } +} + +pub(crate) trait Block { Review Comment: Addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org