alamb commented on code in PR #23309:
URL: https://github.com/apache/datafusion/pull/23309#discussion_r3522932671


##########
datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs:
##########
@@ -60,19 +72,36 @@ pub(in crate::aggregates) struct FinalMarker;
 /// [`GroupsAccumulator`]. Both use columnar storage so aggregation can stay
 /// vectorized.
 ///
-/// # Marker Type
-/// `AggrMode` selects the aggregate semantics.
+/// # Mode
+///
+/// [`AggregateTableMode`] controls how input batches update accumulator state
+/// and how output batches are materialized.
 ///
-/// e.g. `AggregateHashTable::<PartialMarker>::new(...)` creates an aggregate 
hash table
-/// for the partial hash aggregate stage, the input schema is raw rows and 
output
-/// schema is intermediate states.
+/// ```text
+/// Example: `AVG(x) GROUP BY k`.
 ///
-/// It is a zero-sized compile-time marker, so each stage keeps its update 
logic
-/// in a separate impl block, to make the behavior difference explicit.
-pub(in crate::aggregates) struct AggregateHashTable<AggrMode> {
+/// In `Partial` mode, the table stores partial state:
+///     k, sum(x), count(x)
+/// The input batch contains raw values:
+///     k, x
+/// The output batch also contains partial state:
+///     k, sum(x), count(x)
+/// ```
+///
+/// So input uses [`GroupsAccumulator::update_batch`], and output uses
+/// [`GroupsAccumulator::state`].
+///
+/// Other modes use different input/output combinations:
+/// - `Final`: merge_batch + evaluate
+/// - `PartialReduce`: merge_batch + state
+/// - `Single`: update_batch + evaluate
+pub(in crate::aggregates) struct AggregateHashTable {

Review Comment:
   > My concern is that this would create a [shallow 
module](https://softengbook.org/articles/deep-modules). To fully understand the 
implementation, we would still need to reason about the same underlying mode 
differences, while the external AggregateHashTable variants would add 
complexity with almost no functional contribution.
   
   One idea could be to leave the structure that is already on main, and 
refactor the common behavior out as templated functions on the base class
   
   So for example, leave  `AggregateHashTable<AggrMode>`
   
   But then in the parts that are very similar like `aggregate_batch` instead of
   
   ```rust
   impl AggregateHashTable<PartialReduceMarker> {
   ...
      pub(in crate::aggregates) fn aggregate_batch(
           &mut self,
           batch: &RecordBatch,
       ) -> Result<()> {
           let evaluated_batch = self.evaluate_batch(batch)?;
           let state = self.state.building_mut();
   
           let timer = self.group_by_metrics.aggregation_time.timer();
           for group_values in &evaluated_batch.grouping_set_args {
               state
                   .group_values
                   .intern(group_values, &mut state.batch_group_indices)?;
               let group_indices = &state.batch_group_indices;
               let total_num_groups = state.group_values.len();
   
               for (acc, values) in state
                   .accumulators
                   .iter_mut()
                   .zip(evaluated_batch.accumulator_args.iter())
               {
                   acc.merge_batch(values, group_indices, total_num_groups)?;
               }
           }
           drop(timer);
   
           Ok(())
       }
   ..
   }
   ```
   
   It could be structured something so that the specialization was called
   
   ```rust
   impl AggregateHashTable<PartialReduceMarker> {
   ...
      pub(in crate::aggregates) fn aggregate_batch(
           &mut self,
           batch: &RecordBatch,
       ) -> Result<()> {
        // call a templated function with a closure with the hash table 
specific functionality
         self.aggregate_batch_inner(batch, |acc, values, group_indices, 
total_num_groups| {
                   acc.merge_batch(values, group_indices, total_num_groups)
          })?;
       }
   ...
   }
   ```
   
   And then move the common logic into the generic method
   
   ```rust
   /// Implement in "base" class (not the specialization)
   impl AggregateHashTable<AggrMode> {
   ...
       fn <F: Fn(...) -> Result<()>)aggregate_batch_inner(
           batch: &RecordBatch,
           agg_function: F,
       ) -> Result<()> {
         let evaluated_batch = self.evaluate_batch(batch)?;
           let state = self.state.building_mut();
   
           let timer = self.group_by_metrics.aggregation_time.timer();
           for group_values in &evaluated_batch.grouping_set_args {
               state
                   .group_values
                   .intern(group_values, &mut state.batch_group_indices)?;
               let group_indices = &state.batch_group_indices;
               let total_num_groups = state.group_values.len();
   
               for (acc, values) in state
                   .accumulators
                   .iter_mut()
                   .zip(evaluated_batch.accumulator_args.iter())
               {
                 // *** Note here call the generic function agg_function: F
                  agg_function(values, group_indices, total_num_groups)?; 
<---------
               }
           }
           drop(timer);
   
           Ok(())
       }
     }
   }
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to