alamb commented on code in PR #15022:
URL: https://github.com/apache/datafusion/pull/15022#discussion_r1982212016


##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs:
##########
@@ -299,6 +420,12 @@ impl GroupsAccumulatorAdapter {
 }
 
 impl GroupsAccumulator for GroupsAccumulatorAdapter {
+    fn register_metadata(&mut self, metadata: &GroupsAccumulatorMetadata) -> 
Result<()> {
+        self.contiguous_group_indices = metadata.contiguous_group_indices;

Review Comment:
   Maybe it would make sense to copy the metadata entirely into 
`GroupsAccumulatorAdapter`.



##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -106,6 +107,44 @@ impl EmitTo {
 /// [`Accumulator`]: crate::accumulator::Accumulator
 /// [Aggregating Millions of Groups Fast blog]: 
https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/
 pub trait GroupsAccumulator: Send {
+    /// Whether [`Self::with_group_indices_order_mode`] should be called.
+    ///
+    /// this is when the accumulator would benefit from knowing the order of 
the group indices.
+    ///
+    fn group_order_sensitivity(&self) -> bool {

Review Comment:
   Minor: I think a name like `supports_with_input_order_mode` is more 
consistent with other methods on this trait such as `supports_convert_to_state`



##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs:
##########
@@ -249,24 +303,92 @@ impl GroupsAccumulatorAdapter {
         let mut sizes_pre = 0;
         let mut sizes_post = 0;
         for (&group_idx, offsets) in iter {
-            let state = &mut self.states[group_idx];
-            sizes_pre += state.size();
-
-            let values_to_accumulate = slice_and_maybe_filter(
-                &values,
-                opt_filter.as_ref().map(|f| f.as_boolean()),
-                offsets,
-            )?;
-            f(state.accumulator.as_mut(), &values_to_accumulate)?;
+            sizes_pre += self.states[group_idx].size();
+            self.invoke_accumulator(group_idx, &values, offsets, 
opt_boolean_filter, f)?;
 
             // clear out the state so they are empty for next
             // iteration
-            state.indices.clear();
-            sizes_post += state.size();
+            self.states[group_idx].indices.clear();
+
+            sizes_post += self.states[group_idx].size();
         }
 
-        self.adjust_allocation(sizes_pre, sizes_post);
-        Ok(())
+        Ok((sizes_pre, sizes_post))
+    }
+
+    /// invokes f(accumulator, values) for each group that has values
+    /// in group_indices.
+    ///
+    /// This function is the same as 
[`Self::invoke_per_accumulator_on_non_ordered_group_indices`] but avoid 
reordering of the
+    /// input as we know that each group_index is contiguous
+    ///

Review Comment:
   As above, can you please document what the returned `(usize, usize)` tuple represents?



##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -106,6 +107,44 @@ impl EmitTo {
 /// [`Accumulator`]: crate::accumulator::Accumulator
 /// [Aggregating Millions of Groups Fast blog]: 
https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/
 pub trait GroupsAccumulator: Send {
+    /// Whether [`Self::with_group_indices_order_mode`] should be called.
+    ///
+    /// this is when the accumulator would benefit from knowing the order of 
the group indices.
+    ///
+    fn group_order_sensitivity(&self) -> bool {
+        false
+    }
+
+    /// Called with the order mode for the group indices.
+    ///
+    /// This will only be called if [`Self::group_order_sensitivity`] is true 
and can be called either right after initialization
+    /// or after [`Self::state`], [`Self::evaluate`] consumed all the groups.
+    ///
+    /// For example if `group_indices_order_mode` equals to 
[`InputOrderMode::Sorted`] it means that if you get the following group indices 
in [`Self::update_batch`]/[`Self::merge_batch`]
+    /// ```text
+    /// [1, 1, 1, 1, 1, 2, 2, 3]
+    /// ```
+    ///
+    /// You can be sure that you will never get another group with index 1 or 
2 (until call to [`Self::state`]/[`Self::evaluate`] which will shift the group 
indices).
+    /// However, you might get another group with index 3 in the future.
+    ///
+    /// Possible optimization you can do in your implementation when the input 
is sorted is:
+    /// 1. Only track the current group state
+    /// 2. Have a builder that is ready to be built by call to 
[`Self::state`]/[`Self::evaluate`]
+    ///
+    fn with_group_indices_order_mode(

Review Comment:
   As above, here is a naming suggestion
   
   ```suggestion
       fn with_input_order_mode(
   ```



##########
datafusion/physical-plan/src/ordering.rs:
##########
@@ -15,40 +15,4 @@
 // specific language governing permissions and limitations
 // under the License.
 
-/// Specifies how the input to an aggregation or window operator is ordered
-/// relative to their `GROUP BY` or  `PARTITION BY` expressions.
-///
-/// For example, if the existing ordering is `[a ASC, b ASC, c ASC]`
-///
-/// ## Window Functions
-/// - A `PARTITION BY b` clause can use `Linear` mode.
-/// - A `PARTITION BY a, c` or a `PARTITION BY c, a` can use
-///   `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
-///   (The vector stores the index of `a` in the respective PARTITION BY 
expression.)
-/// - A `PARTITION BY a, b` or a `PARTITION BY b, a` can use `Sorted` mode.
-///
-/// ## Aggregations
-/// - A `GROUP BY b` clause can use `Linear` mode, as the only one permutation 
`[b]`
-///   cannot satisfy the existing ordering.
-/// - A `GROUP BY a, c` or a `GROUP BY c, a` can use
-///   `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively, as
-///   the permutation `[a]` satisfies the existing ordering.
-///   (The vector stores the index of `a` in the respective PARTITION BY 
expression.)
-/// - A `GROUP BY a, b` or a `GROUP BY b, a` can use `Sorted` mode, as the
-///   full permutation `[a, b]` satisfies the existing ordering.
-///
-/// Note these are the same examples as above, but with `GROUP BY` instead of
-/// `PARTITION BY` to make the examples easier to read.
-#[derive(Debug, Clone, PartialEq)]
-pub enum InputOrderMode {
-    /// There is no partial permutation of the expressions satisfying the
-    /// existing ordering.
-    Linear,
-    /// There is a partial permutation of the expressions satisfying the
-    /// existing ordering. Indices describing the longest partial permutation
-    /// are stored in the vector.
-    PartiallySorted(Vec<usize>),
-    /// There is a (full) permutation of the expressions satisfying the
-    /// existing ordering.
-    Sorted,
-}
+pub use datafusion_expr_common::ordering::InputOrderMode;

Review Comment:
   👍 



##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -106,6 +107,44 @@ impl EmitTo {
 /// [`Accumulator`]: crate::accumulator::Accumulator
 /// [Aggregating Millions of Groups Fast blog]: 
https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/
 pub trait GroupsAccumulator: Send {
+    /// Whether [`Self::with_group_indices_order_mode`] should be called.
+    ///
+    /// this is when the accumulator would benefit from knowing the order of 
the group indices.
+    ///
+    fn group_order_sensitivity(&self) -> bool {
+        false
+    }
+
+    /// Called with the order mode for the group indices.
+    ///
+    /// This will only be called if [`Self::group_order_sensitivity`] is true 
and can be called either right after initialization
+    /// or after [`Self::state`], [`Self::evaluate`] consumed all the groups.
+    ///
+    /// For example if `group_indices_order_mode` equals to 
[`InputOrderMode::Sorted`] it means that if you get the following group indices 
in [`Self::update_batch`]/[`Self::merge_batch`]
+    /// ```text
+    /// [1, 1, 1, 1, 1, 2, 2, 3]
+    /// ```
+    ///
+    /// You can be sure that you will never get another group with index 1 or 
2 (until call to [`Self::state`]/[`Self::evaluate`] which will shift the group 
indices).
+    /// However, you might get another group with index 3 in the future.
+    ///
+    /// Possible optimization you can do in your implementation when the input 
is sorted is:
+    /// 1. Only track the current group state
+    /// 2. Have a builder that is ready to be built by call to 
[`Self::state`]/[`Self::evaluate`]
+    ///
+    fn with_group_indices_order_mode(
+        self: Box<Self>,

Review Comment:
   Another thing I wonder: since accumulators have mutable state in general, I 
don't understand the need for an API that consumes `self`.
   
   In other words I think this API would be less awkward:
   
   ```rust
       fn set_group_order_mode(&mut self,
           _group_indices_order_mode: &InputOrderMode
       ) -> Result<()> { 
   ```



##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs:
##########
@@ -163,6 +177,50 @@ impl GroupsAccumulatorAdapter {
 
     /// invokes f(accumulator, values) for each group that has values
     /// in group_indices.
+    fn invoke_per_accumulator<F>(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        f: &F,
+    ) -> Result<()>
+    where
+        F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
+    {
+        self.make_accumulators_if_needed(total_num_groups)?;
+
+        assert_eq!(values[0].len(), group_indices.len());
+
+        if group_indices.is_empty() {
+            return Ok(());
+        }
+
+        let (sizes_pre, sizes_post) = if self.contiguous_group_indices {
+            self.invoke_per_accumulator_on_contiguous_group_indices(
+                values,
+                group_indices,
+                opt_filter,
+                f,
+            )
+        } else {
+            self.invoke_per_accumulator_on_non_ordered_group_indices(
+                values,
+                group_indices,
+                opt_filter,
+                f,
+            )
+        }?;
+
+        self.adjust_allocation(sizes_pre, sizes_post);
+
+        Ok(())
+    }
+
+    /// invokes f(accumulator, values) for each group that has values
+    /// in group_indices.
+    ///
+    /// if the group indices are contiguous we avoiding

Review Comment:
   This seems like a prematurely cut off sentence.
   
   Can you also please add documentation about what the returned `(usize, usize)` 
means?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to