Rachelint commented on code in PR #23309:
URL: https://github.com/apache/datafusion/pull/23309#discussion_r3522407373


##########
datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs:
##########
@@ -60,19 +72,36 @@ pub(in crate::aggregates) struct FinalMarker;
 /// [`GroupsAccumulator`]. Both use columnar storage so aggregation can stay
 /// vectorized.
 ///
-/// # Marker Type
-/// `AggrMode` selects the aggregate semantics.
+/// # Mode
+///
+/// [`AggregateTableMode`] controls how input batches update accumulator state
+/// and how output batches are materialized.
 ///
-/// e.g. `AggregateHashTable::<PartialMarker>::new(...)` creates an aggregate 
hash table
-/// for the partial hash aggregate stage, the input schema is raw rows and 
output
-/// schema is intermediate states.
+/// ```text
+/// Example: `AVG(x) GROUP BY k`.
 ///
-/// It is a zero-sized compile-time marker, so each stage keeps its update 
logic
-/// in a separate impl block, to make the behavior difference explicit.
-pub(in crate::aggregates) struct AggregateHashTable<AggrMode> {
+/// In `Partial` mode, the table stores partial state:
+///     k, sum(x), count(x)
+/// The input batch contains raw values:
+///     k, x
+/// The output batch also contains partial state:
+///     k, sum(x), count(x)
+/// ```
+///
+/// So input uses [`GroupsAccumulator::update_batch`], and output uses
+/// [`GroupsAccumulator::state`].
+///
+/// Other modes use different input/output combinations:
+/// - `Final`: merge_batch + evaluate
+/// - `PartialReduce`: merge_batch + state
+/// - `Single`: update_batch + evaluate
+pub(in crate::aggregates) struct AggregateHashTable {

Review Comment:
   I think maybe we should keep different `AggregateHashTable`s for different 
usage, and remove duplicated codes based on this?
   Due to :
   - Concerns in `aggregate_batch` and other methods.
   - Some method like `partial_skip_table`, it is logic for partial aggr and 
seems not common.
   



##########
datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs:
##########
@@ -214,6 +321,239 @@ impl<AggrMode> AggregateHashTable<AggrMode> {
         state.batch_group_indices = Vec::new();
         self.state = AggregateHashTableState::Outputting(state);
     }
+
+    /// Aggregates one input batch according to this table's input semantics.
+    ///
+    /// `Partial` and `Single` update accumulator state from raw input rows.
+    /// `Final` and `PartialReduce` merge accumulator state emitted by an
+    /// earlier aggregate stage.
+    pub(in crate::aggregates) fn aggregate_batch(
+        &mut self,
+        batch: &RecordBatch,
+    ) -> Result<()> {
+        let evaluated_batch = self.evaluate_batch(batch)?;
+        let mode = self.mode;
+        let state = self.state.building_mut();
+
+        let timer = self.group_by_metrics.aggregation_time.timer();
+        for group_values in &evaluated_batch.grouping_set_args {
+            state
+                .group_values
+                .intern(group_values, &mut state.batch_group_indices)?;
+            let group_indices = &state.batch_group_indices;
+            let total_num_groups = state.group_values.len();
+
+            for (acc, values) in state
+                .accumulators
+                .iter_mut()
+                .zip(evaluated_batch.accumulator_args.iter())
+            {
+                match mode {
+                    AggregateTableMode::Partial | AggregateTableMode::Single 
=> {

Review Comment:
   I am concerned codes here seem switching to the way like `row_hash.rs`: all 
logic in one place, and using `if else` to decide where we go.



##########
datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs:
##########
@@ -85,29 +114,63 @@ pub(in crate::aggregates) struct 
AggregateHashTable<AggrMode> {
 
     /// Lifecycle-specific state: building stage / outputting stage.
     pub(super) state: AggregateHashTableState,
-
-    pub(super) _mode: PhantomData<AggrMode>,
 }
 
 /// Methods shared by all aggregate hash table modes.
-impl<AggrMode> AggregateHashTable<AggrMode> {
-    pub(super) fn new_with_filters(
+impl AggregateHashTable {
+    pub(in crate::aggregates) fn new(
         agg: &AggregateExec,
         partition: usize,
         output_schema: SchemaRef,
         batch_size: usize,
-        filters: Vec<Option<Arc<dyn PhysicalExpr>>>,
     ) -> Result<Self> {
         if batch_size == 0 {
             return internal_err!("AggregateHashTable requires config 
batch_size >= 1");
         }
 
+        // Infer the internal `AggregateTableMode` based on `AggregateExec`'s 
mode
+        //
+        // TODO(simplification): `AggregateMode` seems bloated for aggregate 
hash
+        // table semantics. Consider remove `AggregateMode` and only use 
`AggregateTableMode`
+        // after the refactor has finished.
+        //
+        // Issue: <https://github.com/apache/datafusion/pull/22729>
+        let mode = match agg.mode {
+            AggregateMode::Partial => AggregateTableMode::Partial,
+            AggregateMode::Final | AggregateMode::FinalPartitioned => {
+                AggregateTableMode::Final
+            }
+            AggregateMode::PartialReduce => AggregateTableMode::PartialReduce,
+            AggregateMode::Single | AggregateMode::SinglePartitioned => {
+                AggregateTableMode::Single
+            }
+        };
+
         let input_schema = agg.input().schema();
+        let aggregate_mode = match mode {
+            AggregateTableMode::Partial => AggregateMode::Partial,
+            AggregateTableMode::Final => AggregateMode::Final,
+            AggregateTableMode::PartialReduce => AggregateMode::PartialReduce,
+            AggregateTableMode::Single => AggregateMode::Single,
+        };
         let aggregate_arguments = aggregate_expressions(
             &agg.aggr_expr,
-            &agg.mode,
+            &aggregate_mode,
             agg.group_by.num_group_exprs(),
         )?;
+
+        // Filters apply only when the table consumes raw input rows. Final and
+        // partial-reduce modes consume partial states, so their filters are 
not
+        // applicable.
+        let filters = match mode {

Review Comment:
   Same concern like what in `aggregate_batch`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to