2010YOUY01 commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1724605120
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -143,6 +402,25 @@ pub trait GroupsAccumulator: Send {
     /// [`Accumulator::state`]: crate::accumulator::Accumulator::state
     fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
+    /// Returns `true` if this accumulator supports blocked mode.
+    fn supports_blocked_mode(&self) -> bool {

Review Comment:
   Would it be possible to support blocked mode in all accumulators in the future, so that this method can be removed? 🤔

##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -678,6 +744,48 @@ impl Stream for GroupedHashAggregateStream {
                     )));
                 }
+                ExecutionState::ProducingBlocks(blocks) => {

Review Comment:
   (As a future task) `ExecutionState::ProducingBlocks`, `ExecutionState::ProducingOutput` and `EmitTo` appear to be very similar concepts. Maybe we can consolidate them into a single `ExecutionState::ProducingOutput(emit: OutputState)`. This could be done as preparation before supporting early emission for the blocked optimization.

##########
datafusion/physical-plan/src/aggregates/group_values/row.rs:
##########
@@ -121,16 +135,31 @@ impl GroupValues for GroupValuesRows {
         create_hashes(cols, &self.random_state, batch_hashes)?;
         for (row, &target_hash) in batch_hashes.iter().enumerate() {
+            let mode = self.mode;
             let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| {
                 // Somewhat surprisingly, this closure can be called even if the
                 // hash doesn't match, so check the hash first with an integer
                 // comparison first avoid the more expensive comparison with
                 // group value. https://github.com/apache/datafusion/pull/11718
-                target_hash == *exist_hash
-                // verify that the group that we are inserting with hash is
-                // actually the same key value as the group in
-                // existing_idx (aka group_values @ row)
-                && group_rows.row(row) == group_values.row(*group_idx)
+                if target_hash != *exist_hash {
+                    return false;
+                }
+
+                // verify that the group that we are inserting with hash is
+                // actually the same key value as the group in
+                // existing_idx (aka group_values @ row)
+                match mode {
+                    GroupStatesMode::Flat => {
+                        group_rows.row(row)
+                            == group_values.current().unwrap().row(*group_idx)
+                    }
+                    GroupStatesMode::Blocked(_) => {
+                        let blocked_index = BlockedGroupIndex::new(*group_idx);
+                        group_rows.row(row)
+                            == group_values[blocked_index.block_id]

Review Comment:
   (As a future task) Hash table access is on the critical path for aggregation performance, and blocked storage adds one extra memory access to the hash table's Get() operation: previously it only indexed into a single vector, whereas now it first has to locate the block and then use the offset to index within that block.

   So I suspect that eliminating this extra memory access (by storing a row reference directly in the hash table instead of the `blocked_index`) could further improve performance.

   That is, unless we can directly pursue the vectorized hash table in https://github.com/apache/datafusion/issues/7095. Its POC beats the heavily optimized `HashBrown` using a raw vector, which is really impressive.

##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -509,6 +523,15 @@ impl GroupedHashAggregateStream {
             None
         };
+        // Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+        let enable_blocked_group_states = maybe_enable_blocked_group_states(
+            &context,
+            group_values.as_mut(),
+            &mut accumulators,
+            batch_size,

Review Comment:
   One thing to do is expose the block size as a separate configuration option and find a good default value for it (currently it reuses the batch size).

##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -52,8 +71,248 @@ impl EmitTo {
                 std::mem::swap(v, &mut t);
                 t
             }
+            Self::NextBlock(_) => unreachable!(
+                "can not support blocked emission in take_needed, you should use take_needed_from_blocks"
+            ),
+        }
+    }
+
+    /// Removes the number of rows from `blocks` required to emit,
+    /// returning a `Vec` with elements taken.
+    ///
+    /// The detailed behavior in different emissions:
+    /// - For Emit::CurrentBlock, the first block will be taken and return.
+    /// - For Emit::All and Emit::First, it will be only supported in `GroupStatesMode::Flat`,
+    ///   similar as `take_needed`.
+    pub fn take_needed_from_blocks<T>(
+        &self,
+        blocks: &mut VecBlocks<T>,
+        mode: GroupStatesMode,
+    ) -> Vec<T> {
+        match self {
+            Self::All => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+                blocks.pop_first_block().unwrap()
+            }
+            Self::First(n) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+
+                let block = blocks.current_mut().unwrap();
+                let split_at = min(block.len(), *n);
+
+                // get end n+1,.. values into t
+                let mut t = block.split_off(split_at);
+                // leave n+1,.. in v
+                std::mem::swap(block, &mut t);
+                t
+            }
+            Self::NextBlock(_) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Blocked(_)));
+                blocks.pop_first_block().unwrap()
+            }
+        }
+    }
+}
+
+/// Mode for `accumulators` and `group values`
+///
+/// Their meanings:
+/// - Flat, the values in them will be managed with a single `Vec`.
+///   It will grow constantly when more and more values are inserted,
+///   that leads to a considerable amount of copying, and finally a bad performance.
+///
+/// - Blocked(block_size), the values in them will be managed with multiple `Vec`s.
+///   When the block is large enough(reach block_size), a new block will be allocated
+///   and used for inserting.
+///   Obviously, this strategy can avoid copying and get a good performance.
+#[derive(Debug, Clone, Copy)]
+pub enum GroupStatesMode {
+    Flat,
+    Blocked(usize),
+}
+
+/// Blocked style group index used in blocked mode group values and accumulators
+///
+/// Parts in index:
+/// - High 32 bits represent `block_id`
+/// - Low 32 bits represent `block_offset`
+#[derive(Debug, Clone, Copy)]
+pub struct BlockedGroupIndex {
+    pub block_id: usize,
+    pub block_offset: usize,

Review Comment:
   (As a future task) If the block size is 4096, then a `block_offset` of 5000 is invalid, so a sanity check inside `BlockedGroupIndex` would make it safer.

   One way to do this is to put the constructors in `impl GroupStatesMode` instead (so the block size is available when an index is built), but we can also leave it as a follow-up task: since group values and accumulators should also be aligned by group, there may be better ways to organize these related blocks and enforce sanity checks. I will keep thinking about this.
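As an illustration of the sanity check suggested above, here is a minimal sketch (not code from the PR) of checked constructors living on `GroupStatesMode`, so the block size is available whenever an index is encoded or decoded. It assumes the 32/32-bit packing described in the `BlockedGroupIndex` doc comment; the method names `pack_index` and `blocked_index`, and the use of `u64` for the packed value, are hypothetical.

```rust
// Hypothetical sketch, not code from the PR: encode/decode a blocked group
// index with a bounds check against the block size carried by the mode.

/// Decoded blocked group index (high 32 bits: block_id, low 32 bits: block_offset).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockedGroupIndex {
    pub block_id: usize,
    pub block_offset: usize,
}

/// Mirrors the PR's `GroupStatesMode`; `Blocked` carries the block size.
#[derive(Debug, Clone, Copy)]
pub enum GroupStatesMode {
    Flat,
    Blocked(usize),
}

impl GroupStatesMode {
    /// Hypothetical checked encoder: packs `(block_id, block_offset)` into a u64,
    /// returning `None` in flat mode or if the offset does not fit in one block.
    pub fn pack_index(&self, block_id: u32, block_offset: u32) -> Option<u64> {
        match *self {
            GroupStatesMode::Flat => None,
            GroupStatesMode::Blocked(block_size) => {
                if (block_offset as usize) < block_size {
                    Some(((block_id as u64) << 32) | block_offset as u64)
                } else {
                    None
                }
            }
        }
    }

    /// Hypothetical checked decoder: splits a packed index and rejects
    /// offsets that are out of range for this block size.
    pub fn blocked_index(&self, packed: u64) -> Option<BlockedGroupIndex> {
        match *self {
            GroupStatesMode::Flat => None,
            GroupStatesMode::Blocked(block_size) => {
                let block_id = (packed >> 32) as usize;
                let block_offset = (packed & 0xFFFF_FFFF) as usize;
                (block_offset < block_size).then_some(BlockedGroupIndex {
                    block_id,
                    block_offset,
                })
            }
        }
    }
}

fn main() {
    let mode = GroupStatesMode::Blocked(4096);

    // A valid index round-trips through encode/decode.
    let packed = mode
        .pack_index(2, 100)
        .expect("offset 100 fits in a 4096-row block");
    assert_eq!(
        mode.blocked_index(packed),
        Some(BlockedGroupIndex { block_id: 2, block_offset: 100 })
    );

    // The example from the review: offset 5000 is invalid for block size 4096.
    assert_eq!(mode.pack_index(1, 5000), None);
    assert_eq!(mode.blocked_index((1u64 << 32) | 5000), None);

    // Flat mode has no blocks, so no blocked index is produced.
    assert_eq!(GroupStatesMode::Flat.blocked_index(packed), None);
    println!("all checks passed");
}
```

The sketch packs into `u64` rather than `usize` so that the 32-bit shift stays valid on 32-bit targets; the PR's `BlockedGroupIndex::new` instead takes the hash table's `usize` group index directly, so an actual follow-up would need to reconcile the two.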