alamb commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1727162759


##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -143,6 +145,25 @@ pub trait GroupsAccumulator: Send {
     /// [`Accumulator::state`]: crate::accumulator::Accumulator::state
     fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
 
+    /// Returns `true` if this accumulator supports blocked mode.
+    fn supports_blocked_mode(&self) -> bool {
+        false
+    }
+
+    /// Switch the accumulator to flat or blocked mode.
+    /// See [`GroupStatesMode`] for details about the modes.
+    ///
+    /// After switching modes, all data from the previous mode will be cleared.
+    fn switch_to_mode(&mut self, mode: GroupStatesMode) -> Result<()> {

Review Comment:
   
   I understand why you introduced this API, but I think it makes the 
accumulators harder to reason about: each accumulator now has two potential 
modes, so there are two similar, but not identical, parallel implementations 
that we have to ensure are tested. I had an idea below for avoiding this 
`switch_to_mode` API.



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -529,10 +552,53 @@ impl GroupedHashAggregateStream {
             spill_state,
             group_values_soft_limit: agg.limit,
             skip_aggregation_probe,
+            enable_blocked_group_states,
         })
     }
 }
 
+/// Check if we can enable the blocked optimization for `GroupValues` and
+/// `GroupsAccumulator`s.
+/// The blocked optimization will be enabled when:
+///   - It is not a streaming aggregation (blocked mode can't support
+///     `EmitTo::First(n)`)
+///   - Spilling is disabled (we still need to think more about how to
+///     support it efficiently)
+///   - The accumulators are not empty (I am still not sure about the logic
+///     in this case)
+///   - `GroupValues` and all `GroupsAccumulator`s support blocked mode
+// TODO: support the blocked optimization for streaming, spilling, and maybe
+// the empty accumulators case?
+fn maybe_enable_blocked_group_states(
+    context: &TaskContext,
+    group_values: &mut dyn GroupValues,
+    accumulators: &mut [Box<dyn GroupsAccumulator>],
+    block_size: usize,
+    group_ordering: &GroupOrdering,
+) -> Result<bool> {
+    if !matches!(group_ordering, GroupOrdering::None)
+        || accumulators.is_empty()
+        || enable_spilling(context.memory_pool().as_ref())
+    {
+        return Ok(false);
+    }
+
+    let group_supports_blocked = group_values.supports_blocked_mode();
+    let accumulators_support_blocked =
+        accumulators.iter().all(|acc| acc.supports_blocked_mode());
+
+    match (group_supports_blocked, accumulators_support_blocked) {
+        (true, true) => {
+            group_values.switch_to_mode(GroupStatesMode::Blocked(block_size))?;
+            accumulators.iter_mut().try_for_each(|acc| {
+                acc.switch_to_mode(GroupStatesMode::Blocked(block_size))
+            })?;
+            Ok(true)
+        }
+        _ => Ok(false),
+    }
+}
+
+// TODO: we should add a function (like `name`) to distinguish different
+// memory pools.
+fn enable_spilling(memory_pool: &dyn MemoryPool) -> bool {
+    !format!("{memory_pool:?}").contains("UnboundedMemoryPool")

Review Comment:
   I think using this check 
https://docs.rs/datafusion/latest/datafusion/execution/struct.DiskManager.html#method.tmp_files_enabled
 is likely the more correct way.
   
   Also, the fact that many systems won't use an unbounded pool during 
execution means that this check would make this optimization supported only 
in very specialized cases.
   
   However, I see that the issue is that when chunked emission is enabled, we 
haven't figured out spilling yet.
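   To make the suggestion concrete, here is a minimal self-contained sketch 
(`DiskManager` is a hand-rolled stub standing in for the real one in 
`datafusion-execution`, and `spilling_possible` is a hypothetical name) of 
checking spillability via `tmp_files_enabled()` rather than matching on the 
memory pool's `Debug` output:

```rust
// Hedged sketch: `DiskManager` here is a stub so the example is
// self-contained; the real type lives in datafusion-execution.
struct DiskManager {
    tmp_dirs: Vec<String>,
}

impl DiskManager {
    /// Mirrors the idea behind the real `DiskManager::tmp_files_enabled`:
    /// spilling is only possible when temporary files can be created.
    fn tmp_files_enabled(&self) -> bool {
        !self.tmp_dirs.is_empty()
    }
}

/// The suggested check (hypothetical helper name): ask the disk manager
/// whether spilling is possible at all, instead of inspecting the memory
/// pool's Debug representation.
fn spilling_possible(disk_manager: &DiskManager) -> bool {
    disk_manager.tmp_files_enabled()
}

fn main() {
    let no_spill = DiskManager { tmp_dirs: vec![] };
    let can_spill = DiskManager { tmp_dirs: vec!["/tmp".to_string()] };
    assert!(!spilling_possible(&no_spill));
    assert!(spilling_possible(&can_spill));
    println!("ok");
}
```

   This also sidesteps the fragility of string-matching on a `Debug` 
representation, which can silently break if the type's formatting changes.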



##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##########
@@ -92,32 +101,69 @@ where
         opt_filter: Option<&BooleanArray>,
         total_num_groups: usize,
     ) -> Result<()> {
+        if total_num_groups == 0 {
+            return Ok(());
+        }
+
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values[0].as_primitive::<T>();
 
-        // update values
-        self.values.resize(total_num_groups, self.starting_value);
-
         // NullState dispatches / handles tracking nulls and groups that saw no values
-        self.null_state.accumulate(
-            group_indices,
-            values,
-            opt_filter,
-            total_num_groups,
-            |group_index, new_value| {
-                let value = &mut self.values[group_index];
-                (self.prim_fn)(value, new_value);
-            },
-        );
+        match self.mode {
+            GroupStatesMode::Flat => {
+                // Ensure enough room in values
+                ensure_enough_room_for_flat_values(
+                    &mut self.values_blocks,
+                    total_num_groups,
+                    self.starting_value,
+                );
+
+                let block = self.values_blocks.current_mut().unwrap();
+                self.null_state.accumulate_for_flat(
+                    group_indices,
+                    values,
+                    opt_filter,
+                    total_num_groups,
+                    |group_index, new_value| {
+                        let value = &mut block[group_index];
+                        (self.prim_fn)(value, new_value);
+                    },
+                );
+            }
+            GroupStatesMode::Blocked(blk_size) => {

Review Comment:
   Would it be possible to change `prim_op` so that it *always* used blocked 
state? I am concerned (as I mentioned above) about the fact that we now have 
two parallel implementations in *all* the accumulators that support this 
chunked state.
   
   Not only is this a bit more code, we now have a second path that must be 
tested in all of them, which I think is a substantial undertaking (@2010YOUY01 
referred to this as well).
   
   What if we changed the group by hash operator so it always got blocked 
output (`Vec<RecordBatch>`) from the accumulators that supported it? It could 
then slice the output from accumulators that could only output a single record 
batch, as it does today.
   
   This would mean that if an accumulator supported blocked output, it could 
always create blocked output.
   
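   A toy, self-contained sketch of that shape (the `BlockedEmit` trait, 
`FlatSum`, and the use of `Vec<i64>` in place of `ArrayRef`/`RecordBatch` are 
all hypothetical stand-ins, not the actual DataFusion API): accumulators that 
only hold a single flat buffer can still satisfy a blocked-emission interface 
by slicing at emit time, so callers always see blocked output and there is only 
one code path to test.

```rust
// Hypothetical model of the suggestion: every accumulator exposes blocked
// emission; flat-only accumulators implement it by slicing their one buffer.
trait BlockedEmit {
    /// Emit all group state as blocks of at most `block_size` rows.
    fn emit_blocks(&mut self, block_size: usize) -> Vec<Vec<i64>>;
}

/// Stand-in for an accumulator that internally keeps one flat buffer,
/// like today's single-RecordBatch accumulators.
struct FlatSum {
    values: Vec<i64>,
}

impl BlockedEmit for FlatSum {
    fn emit_blocks(&mut self, block_size: usize) -> Vec<Vec<i64>> {
        // Slice the flat buffer into block-sized chunks; the operator could
        // do this on behalf of accumulators that only produce flat output.
        std::mem::take(&mut self.values)
            .chunks(block_size)
            .map(|chunk| chunk.to_vec())
            .collect()
    }
}

fn main() {
    let mut acc = FlatSum { values: (0..10).collect() };
    let blocks = acc.emit_blocks(4);
    // 10 rows in blocks of 4 -> chunks of 4, 4, and 2 rows
    assert_eq!(blocks.len(), 3);
    assert_eq!(blocks[0], vec![0, 1, 2, 3]);
    assert_eq!(blocks[2], vec![8, 9]);
    println!("ok");
}
```

   The point of the sketch is that the slicing lives in one place (the 
operator or a default trait method) rather than being duplicated as a second 
mode inside every accumulator.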



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

