eejbyfeldt commented on code in PR #12565:
URL: https://github.com/apache/datafusion/pull/12565#discussion_r1769514218
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -578,7 +579,13 @@ impl GroupedHashAggregateStream {
/// [`GroupsAccumulatorAdapter`] if not.
pub(crate) fn create_group_accumulator(
agg_expr: &AggregateFunctionExpr,
+ group_by: &PhysicalGroupBy,
) -> Result<Box<dyn GroupsAccumulator>> {
+ // GROUPING is a special fxn that exposes info about group organization
+ if let Some(grouping) = agg_expr.fun().inner().as_any().downcast_ref::<Grouping>() {
+ let args = agg_expr.all_expressions().args;
+ return grouping.create_grouping_accumulator(&args, &group_by.expr);
+ }
Review Comment:
If we need special handling like this, it seems to me that we should consider
just making `Grouping` a built-in.
Alternatively, we should make it more generic so that it can be used to
implement other functions. But since the input is just the bitmasks and the
output is the same, I wonder whether there are any conceivable functions that
could not simply be implemented as a transformation on a built-in grouping
function.
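To make that point concrete: if each row's grouping set were available as a per-expression null mask, GROUPING itself reduces to packing selected bits, with the first argument as the most significant bit (standard SQL semantics). A minimal sketch, assuming that hypothetical mask representation; `grouping_value`, `null_mask`, and `arg_positions` are illustrative names, not DataFusion API, and the `i32` result type is an assumption:

```rust
/// Hypothetical helper (not DataFusion API): computes SQL `GROUPING(...)` for
/// one row from its grouping-set mask.
///
/// * `null_mask[i]` is `true` when GROUP BY expression `i` is aggregated away
///   (output as NULL) in the row's grouping set.
/// * `arg_positions[j]` is the position of the j-th GROUPING argument in the
///   GROUP BY list.
fn grouping_value(null_mask: &[bool], arg_positions: &[usize]) -> Result<i32, String> {
    // Same limit as the PR: the packed bits must fit in a 32-bit integer.
    if arg_positions.len() > 32 {
        return Err("GROUPING is supported for up to 32 columns".to_string());
    }
    let mut value: u32 = 0;
    for &pos in arg_positions {
        let is_null = *null_mask
            .get(pos)
            .ok_or_else(|| format!("GROUPING argument at position {pos} is not in GROUP BY"))?;
        // Shift earlier arguments left so the first argument ends up as the
        // most significant bit.
        value = (value << 1) | u32::from(is_null);
    }
    // Reinterpret the bits as i32 (only the 32-argument case can set the sign bit).
    Ok(value as i32)
}
```

For `GROUP BY GROUPING SETS ((a, b), (a), ())` over the GROUP BY list `[a, b]`, the masks are `[false, false]`, `[false, true]`, and `[true, true]`, so `GROUPING(a, b)` evaluates to 0, 1, and 3 respectively.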
##########
datafusion/functions-aggregate/src/grouping.rs:
##########
@@ -59,9 +73,55 @@ impl Grouping {
/// Create a new GROUPING aggregate function.
pub fn new() -> Self {
Self {
- signature: Signature::any(1, Volatility::Immutable),
+ signature: Signature::variadic_any(Volatility::Immutable),
}
}
+
+ /// Create an accumulator for GROUPING(grouping_args) in a GROUP BY over group_exprs
+ /// A special creation function is necessary because GROUPING has unusual input requirements.
+ pub fn create_grouping_accumulator(
+ &self,
+ grouping_args: &[Arc<dyn PhysicalExpr>],
+ group_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+ ) -> Result<Box<dyn GroupsAccumulator>> {
+ if grouping_args.len() > 32 {
+ return plan_err!(
+ "GROUPING is supported for up to 32 columns. Consider another \
+ GROUPING statement if you need to aggregate over more columns."
+ );
+ }
+ // The PhysicalExprs of grouping_exprs must be Column PhysicalExpr. Because if
+ // the group by PhysicalExpr in SQL is non-Column PhysicalExpr, then there is
+ // a ProjectionExec before AggregateExec to convert the non-column PhysicalExpr
+ // to Column PhysicalExpr.
+ let column_index =
+ |expr: &Arc<dyn PhysicalExpr>| match expr.as_any().downcast_ref::<Column>() {
+ Some(column) => Ok(column.index()),
+ None => internal_err!("Grouping doesn't support expr: {}", expr),
+ };
Review Comment:
This is only true when the optimizer rule `CommonSubexprEliminate` is
enabled. It does not seem acceptable to depend on optimizer rules for
correctness or basic support.
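One possible way to avoid that dependency would be to resolve GROUPING arguments by comparing them with the GROUP BY expressions themselves rather than downcasting to `Column`. A minimal sketch, assuming expressions can be compared structurally; `Expr` is a hypothetical simplified enum standing in for `Arc<dyn PhysicalExpr>`, and `resolve_grouping_args` is an illustrative helper, not DataFusion API:

```rust
/// Hypothetical, simplified stand-in for `Arc<dyn PhysicalExpr>`; real
/// DataFusion expressions are trait objects, not an enum.
#[allow(dead_code)] // fields are only inspected through the derived impls
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column { name: String, index: usize },
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// Hypothetical helper: maps each GROUPING argument to its position in the
/// GROUP BY list by comparing whole expressions, so an argument such as
/// `a + 1` resolves even if no earlier rewrite turned it into a column.
fn resolve_grouping_args(
    args: &[Expr],
    group_exprs: &[(Expr, String)],
) -> Result<Vec<usize>, String> {
    args.iter()
        .map(|arg| {
            group_exprs
                .iter()
                .position(|(expr, _alias)| expr == arg)
                .ok_or_else(|| format!("GROUPING argument {arg:?} is not a GROUP BY expression"))
        })
        // Stops at the first unresolved argument and returns its error.
        .collect()
}
```

The resulting positions identify which GROUP BY expressions GROUPING refers to without any `Column` downcast, so basic support would not hinge on a ProjectionExec having been inserted by an optimizer rule.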
--
This is an automated message from the Apache Git Service.