alamb commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1727162759
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -143,6 +145,25 @@ pub trait GroupsAccumulator: Send {
     /// [`Accumulator::state`]: crate::accumulator::Accumulator::state
     fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
 
+    /// Returns `true` if this accumulator supports blocked mode.
+    fn supports_blocked_mode(&self) -> bool {
+        false
+    }
+
+    /// Switch the accumulator to flat or blocked mode.
+    /// You can see detail about the mode on [GroupStatesMode].
+    ///
+    /// After switching mode, all data in previous mode will be cleared.
+    fn switch_to_mode(&mut self, mode: GroupStatesMode) -> Result<()> {

Review Comment:
   I understand why you introduced this API, but I think it makes the accumulators harder to reason about: each one now has two potential modes, so there are two similar, but not identical, parallel implementations that we have to ensure are tested. I had an idea to avoid this `switch_to_mode` API; see below.



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -529,10 +552,53 @@ impl GroupedHashAggregateStream {
             spill_state,
             group_values_soft_limit: agg.limit,
             skip_aggregation_probe,
+            enable_blocked_group_states,
         })
     }
 }
 
+/// Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+/// The blocked optimization will be enabled when:
+///   - It is not a streaming aggregation (because blocked mode can't support `Emit::first(exact n)`)
+///   - Spilling is disabled (still need to consider more to support it efficiently)
+///   - The accumulator is not empty (I am still not sure about the logic in this case)
+///   - `GroupValues` and all `GroupsAccumulator`s support blocked mode
+// TODO: support blocked optimization in streaming, spilling, and maybe empty accumulators case?
+fn maybe_enable_blocked_group_states(
+    context: &TaskContext,
+    group_values: &mut dyn GroupValues,
+    accumulators: &mut [Box<dyn GroupsAccumulator>],
+    block_size: usize,
+    group_ordering: &GroupOrdering,
+) -> Result<bool> {
+    if !matches!(group_ordering, GroupOrdering::None)
+        || accumulators.is_empty()
+        || enable_spilling(context.memory_pool().as_ref())
+    {
+        return Ok(false);
+    }
+
+    let group_supports_blocked = group_values.supports_blocked_mode();
+    let accumulators_support_blocked =
+        accumulators.iter().all(|acc| acc.supports_blocked_mode());
+
+    match (group_supports_blocked, accumulators_support_blocked) {
+        (true, true) => {
+            group_values.switch_to_mode(GroupStatesMode::Blocked(block_size))?;
+            accumulators.iter_mut().try_for_each(|acc| {
+                acc.switch_to_mode(GroupStatesMode::Blocked(block_size))
+            })?;
+            Ok(true)
+        }
+        _ => Ok(false),
+    }
+}
+
+// TODO: we should add a function (like `name`) to distinguish different memory pools.
+fn enable_spilling(memory_pool: &dyn MemoryPool) -> bool {
+    !format!("{memory_pool:?}").contains("UnboundedMemoryPool")

Review Comment:
   I think using this check https://docs.rs/datafusion/latest/datafusion/execution/struct.DiskManager.html#method.tmp_files_enabled is likely the more correct way. Also, the fact that many systems won't use an unbounded pool during execution means this check would leave the optimization supported only in very specialized cases. However, I see the underlying issue: when chunked emission is enabled, we haven't figured out spilling yet.
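   Something like this (untested) sketch is what I had in mind -- it asks the `DiskManager` whether spilling is even possible, rather than string-matching the memory pool's `Debug` output (the helper name `spilling_enabled` is just illustrative):
   
   ```rust
   use datafusion_execution::TaskContext;
   
   /// Sketch: gate the blocked optimization on whether spilling is actually
   /// possible, by asking the DiskManager instead of the memory pool.
   fn spilling_enabled(context: &TaskContext) -> bool {
       // `tmp_files_enabled` is false when the DiskManager was built with
       // `DiskManagerConfig::Disabled`, i.e. no spill files can ever be created
       context.runtime_env().disk_manager.tmp_files_enabled()
   }
   ```
   
   The guard in `maybe_enable_blocked_group_states` could then call `spilling_enabled(context)` instead of `enable_spilling(context.memory_pool().as_ref())`, and the `Debug`-based `enable_spilling` (and its TODO) could be removed entirely.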
##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##########
@@ -92,32 +101,69 @@ where
         opt_filter: Option<&BooleanArray>,
         total_num_groups: usize,
     ) -> Result<()> {
+        if total_num_groups == 0 {
+            return Ok(());
+        }
+
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values[0].as_primitive::<T>();
 
-        // update values
-        self.values.resize(total_num_groups, self.starting_value);
-
-        // NullState dispatches / handles tracking nulls and groups that saw no values
-        self.null_state.accumulate(
-            group_indices,
-            values,
-            opt_filter,
-            total_num_groups,
-            |group_index, new_value| {
-                let value = &mut self.values[group_index];
-                (self.prim_fn)(value, new_value);
-            },
-        );
+        match self.mode {
+            GroupStatesMode::Flat => {
+                // Ensure enough room in values
+                ensure_enough_room_for_flat_values(
+                    &mut self.values_blocks,
+                    total_num_groups,
+                    self.starting_value,
+                );
+
+                let block = self.values_blocks.current_mut().unwrap();
+                self.null_state.accumulate_for_flat(
+                    group_indices,
+                    values,
+                    opt_filter,
+                    total_num_groups,
+                    |group_index, new_value| {
+                        let value = &mut block[group_index];
+                        (self.prim_fn)(value, new_value);
+                    },
+                );
+            }
+            GroupStatesMode::Blocked(blk_size) => {

Review Comment:
   Would it be possible to change `prim_op` so that it *always* used blocked state? I am concerned (as I mentioned above) that we now have two parallel implementations in *all* the accumulators that support this chunked state.
   
   Not only is this more code, we now have a second path that must be tested in all of them, which I think is a substantial undertaking (@2010YOUY01 referred to this as well).
   
   What if we changed the group by hash operator so it always got blocked output (`Vec<RecordBatch>`) from the accumulators that supported it? It could then slice the output from accumulators that can only produce a single record batch, as it does today. This would mean that if an accumulator supported blocked output, it could always create blocked output.
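   To illustrate the slicing half of that idea, here is a rough, untested sketch (`slice_into_blocks` is a made-up helper name):
   
   ```rust
   use arrow::record_batch::RecordBatch;
   
   /// Sketch: turn the single batch emitted by a "flat only" accumulator into
   /// blocked output, so the operator can treat every accumulator as if it
   /// produced `Vec<RecordBatch>`.
   fn slice_into_blocks(batch: RecordBatch, block_size: usize) -> Vec<RecordBatch> {
       debug_assert!(block_size > 0);
       let num_rows = batch.num_rows();
       let mut blocks = Vec::with_capacity(num_rows.div_ceil(block_size));
       let mut offset = 0;
       while offset < num_rows {
           let len = block_size.min(num_rows - offset);
           // `RecordBatch::slice` is zero-copy, so blocking here is cheap
           blocks.push(batch.slice(offset, len));
           offset += len;
       }
       blocks
   }
   ```
   
   Accumulators that support blocked emission natively would return their blocks directly; the rest would keep their single, already-tested flat path and get sliced by the operator.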