Rachelint commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1724580071
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -52,8 +71,248 @@ impl EmitTo {
                 std::mem::swap(v, &mut t);
                 t
             }
+            Self::NextBlock(_) => unreachable!(
+                "can not support blocked emission in take_needed, you should use take_needed_from_blocks"
+            ),
+        }
+    }
+
+    /// Removes the number of rows from `blocks` required to emit,
+    /// returning a `Vec` with elements taken.
+    ///
+    /// The detailed behavior in different emissions:
+    /// - For Emit::CurrentBlock, the first block will be taken and return.
+    /// - For Emit::All and Emit::First, it will be only supported in `GroupStatesMode::Flat`,
+    ///   similar as `take_needed`.
+    pub fn take_needed_from_blocks<T>(
+        &self,
+        blocks: &mut VecBlocks<T>,
+        mode: GroupStatesMode,
+    ) -> Vec<T> {
+        match self {
+            Self::All => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+                blocks.pop_first_block().unwrap()
+            }
+            Self::First(n) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+
+                let block = blocks.current_mut().unwrap();
+                let split_at = min(block.len(), *n);
+
+                // get end n+1,.. values into t
+                let mut t = block.split_off(split_at);
+                // leave n+1,.. in v
+                std::mem::swap(block, &mut t);
+                t
+            }
+            Self::NextBlock(_) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Blocked(_)));
+                blocks.pop_first_block().unwrap()
+            }
+        }
+    }
+}
+
+/// Mode for `accumulators` and `group values`
+///
+/// Their meanings:
+/// - Flat, the values in them will be managed with a single `Vec`.
+///   It will grow constantly when more and more values are inserted,
+///   that leads to a considerable amount of copying, and finally a bad performance.
+///
+/// - Blocked(block_size), the values in them will be managed with multiple `Vec`s.
+///   When the block is large enough(reach block_size), a new block will be allocated
+///   and used for inserting.
+///   Obviously, this strategy can avoid copying and get a good performance.
+#[derive(Debug, Clone, Copy)]
+pub enum GroupStatesMode {
+    Flat,
+    Blocked(usize),
+}
+
+/// Blocked style group index used in blocked mode group values and accumulators
+///
+/// Parts in index:
+/// - High 32 bits represent `block_id`
+/// - Low 32 bits represent `block_offset`
+#[derive(Debug, Clone, Copy)]
+pub struct BlockedGroupIndex {
+    pub block_id: usize,
+    pub block_offset: usize,
+}
+
+impl BlockedGroupIndex {
+    pub fn new(group_index: usize) -> Self {
+        let block_id =
+            ((group_index as u64 >> 32) & BLOCKED_INDEX_LOW_32_BITS_MASK) as usize;
+        let block_offset =
+            ((group_index as u64) & BLOCKED_INDEX_LOW_32_BITS_MASK) as usize;
+
+        Self {
+            block_id,
+            block_offset,
         }
     }
+
+    pub fn new_from_parts(block_id: usize, block_offset: usize) -> Self {
+        Self {
+            block_id,
+            block_offset,
+        }
+    }
+
+    pub fn as_packed_index(&self) -> usize {
+        ((((self.block_id as u64) << 32) & BLOCKED_INDEX_HIGH_32_BITS_MASK)
+            | (self.block_offset as u64 & BLOCKED_INDEX_LOW_32_BITS_MASK))
+            as usize
+    }
+}
+
+/// The basic data structure for blocked aggregation intermediate results
+pub struct Blocks<T> {
+    /// The current block, it should be pushed into `previous`
+    /// when next block is pushed
+    current: Option<T>,
+
+    /// Previous blocks pushed before `current`
+    previous: VecDeque<T>,

Review Comment:
> What is the reason there is `current` and `previous` instead of one `VecDeque<T>`?

It is a small optimization, mainly for the single-block case. After implementing the blocked-mode accumulators and group values, I noticed some related queries that run with only a single block (perhaps because the data volume is small, or because blocked mode is disabled), and I eventually found that their extra cost may be caused by the `back` and `back_mut` calls.

For example, when the blocked optimization is disabled (e.g. in streaming aggregation):
- The `Blocks<Vec<T>>` in the accumulator only ever uses `current`, and `previous` always stays empty.
- We only call `current()` and `current_mut()` on `Blocks<Vec<T>>`, which simply return a reference to the inner `current` block (a `Vec<T>`).
- We avoid `back` and `back_mut`, which are a bit more expensive than returning the `Vec<T>` directly.
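A minimal, self-contained sketch of the `current` + `previous` layout discussed in the reply may make the trade-off concrete. It is not the PR's implementation: `push_block`, `current`, `current_mut` and `pop_first_block` reuse names that appear in the diff or the reply, but their bodies here are assumptions written for illustration, `num_blocks` is a hypothetical helper, and `VecBlocks<T>` is assumed to be an alias for `Blocks<Vec<T>>`.

```rust
use std::collections::VecDeque;

/// Illustrative sketch (not the PR's code): the newest block lives in
/// `current`, older blocks live in `previous` with the oldest at the front.
pub struct Blocks<T> {
    current: Option<T>,
    previous: VecDeque<T>,
}

impl<T> Blocks<T> {
    pub fn new() -> Self {
        Self {
            current: None,
            previous: VecDeque::new(),
        }
    }

    /// Makes `block` the new `current`; the old `current`, if any,
    /// is appended to the back of `previous`.
    pub fn push_block(&mut self, block: T) {
        if let Some(old) = self.current.replace(block) {
            self.previous.push_back(old);
        }
    }

    /// Newest block via a plain field access (no `VecDeque::back`).
    pub fn current(&self) -> Option<&T> {
        self.current.as_ref()
    }

    /// Mutable access to the newest block (no `VecDeque::back_mut`).
    pub fn current_mut(&mut self) -> Option<&mut T> {
        self.current.as_mut()
    }

    /// Removes and returns the oldest block; `current` is only taken
    /// once `previous` has been drained.
    pub fn pop_first_block(&mut self) -> Option<T> {
        self.previous.pop_front().or_else(|| self.current.take())
    }

    /// Hypothetical helper: total number of blocks currently held.
    pub fn num_blocks(&self) -> usize {
        self.previous.len() + usize::from(self.current.is_some())
    }
}

/// Assumed shape of the diff's `VecBlocks<T>`: blocks of plain `Vec`s.
pub type VecBlocks<T> = Blocks<Vec<T>>;
```

In this sketch the single-block path (blocked mode disabled) reads and writes only `current` through `Option::as_ref`/`as_mut`, and `previous` is never touched. A single `VecDeque<T>` would instead reach the newest block through `back()`/`back_mut()`, which must first compute where the last element sits in the ring buffer.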
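The 32/32-bit index packing quoted in the diff can also be checked in isolation. The sketch below copies the shape of `BlockedGroupIndex::new` and `as_packed_index` from the diff; the two mask constants are referenced there but not defined in the quoted hunk, so the values used here are assumptions, as is the added `PartialEq`/`Eq` derive used for the round-trip check. It also assumes a 64-bit `usize`: on a 32-bit target the final `as usize` casts would drop the `block_id` half.

```rust
// Assumed values: the diff uses these constants but the hunk does not define them.
const BLOCKED_INDEX_LOW_32_BITS_MASK: u64 = 0x0000_0000_ffff_ffff;
const BLOCKED_INDEX_HIGH_32_BITS_MASK: u64 = 0xffff_ffff_0000_0000;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockedGroupIndex {
    pub block_id: usize,
    pub block_offset: usize,
}

impl BlockedGroupIndex {
    /// Unpacks: high 32 bits -> `block_id`, low 32 bits -> `block_offset`.
    pub fn new(group_index: usize) -> Self {
        let packed = group_index as u64;
        Self {
            block_id: ((packed >> 32) & BLOCKED_INDEX_LOW_32_BITS_MASK) as usize,
            block_offset: (packed & BLOCKED_INDEX_LOW_32_BITS_MASK) as usize,
        }
    }

    /// Packs `block_id` into the high 32 bits and `block_offset` into the low 32 bits.
    /// Assumes a 64-bit `usize`.
    pub fn as_packed_index(&self) -> usize {
        let high = ((self.block_id as u64) << 32) & BLOCKED_INDEX_HIGH_32_BITS_MASK;
        let low = (self.block_offset as u64) & BLOCKED_INDEX_LOW_32_BITS_MASK;
        (high | low) as usize
    }
}
```

With `block_id == 0` the packed value in this sketch equals `block_offset`, so an index into a single block has the same numeric value as a flat index.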