jayzhan211 commented on code in PR #11617: URL: https://github.com/apache/datafusion/pull/11617#discussion_r1689447097
########## datafusion/physical-expr-common/src/aggregate/mod.rs: ########## @@ -121,44 +101,192 @@ pub fn create_aggregate_expr_with_dfschema( is_distinct: bool, is_reversed: bool, ) -> Result<Arc<dyn AggregateExpr>> { - debug_assert_eq!(sort_exprs.len(), ordering_req.len()); - + let mut builder = + AggregateExprBuilder::new(Arc::new(fun.clone()), input_phy_exprs.to_vec()); + builder = builder.sort_exprs(sort_exprs.to_vec()); + builder = builder.order_by(ordering_req.to_vec()); + builder = builder.logical_exprs(input_exprs.to_vec()); + builder = builder.dfschema(dfschema.clone()); let schema: Schema = dfschema.into(); + builder = builder.schema(Arc::new(schema)); + builder = builder.name(name); + + if ignore_nulls { + builder = builder.ignore_nulls(); + } + if is_distinct { + builder = builder.distinct(); + } + if is_reversed { + builder = builder.reversed(); + } + + builder.build() +} + +#[derive(Debug, Clone)] +pub struct AggregateExprBuilder { + fun: Arc<AggregateUDF>, + /// Physical expressions of the aggregate function + args: Vec<Arc<dyn PhysicalExpr>>, + /// Logical expressions of the aggregate function, it will be deprecated in <https://github.com/apache/datafusion/issues/11359> + logical_args: Vec<Expr>, + name: String, + /// Arrow Schema for the aggregate function + schema: SchemaRef, + /// Datafusion Schema for the aggregate function + dfschema: DFSchema, + /// The logical order by expressions, it will be deprecated in <https://github.com/apache/datafusion/issues/11359> + sort_exprs: Vec<Expr>, + /// The physical order by expressions + ordering_req: LexOrdering, + /// Whether to ignore null values + ignore_nulls: bool, + /// Whether is distinct aggregate function + is_distinct: bool, + /// Whether the expression is reversed + is_reversed: bool, +} + +impl AggregateExprBuilder { + pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> Self { + Self { + fun, + args, + logical_args: vec![], + name: String::new(), + schema: 
Arc::new(Schema::empty()), + dfschema: DFSchema::empty(), + sort_exprs: vec![], + ordering_req: vec![], + ignore_nulls: false, + is_distinct: false, + is_reversed: false, + } + } + + pub fn build(self) -> Result<Arc<dyn AggregateExpr>> { + let Self { + fun, + args, + logical_args, + name, + schema, + dfschema, + sort_exprs, + ordering_req, + ignore_nulls, + is_distinct, + is_reversed, + } = self; + if args.is_empty() { + return internal_err!("args should not be empty"); + } + + let mut ordering_fields = vec![]; + + debug_assert_eq!(sort_exprs.len(), ordering_req.len()); + if !ordering_req.is_empty() { + let ordering_types = ordering_req + .iter() + .map(|e| e.expr.data_type(&schema)) + .collect::<Result<Vec<_>>>()?; + + ordering_fields = utils::ordering_fields(&ordering_req, &ordering_types); + } + + let input_exprs_types = args + .iter() + .map(|arg| arg.data_type(&schema)) + .collect::<Result<Vec<_>>>()?; + + check_arg_count( + fun.name(), + &input_exprs_types, + &fun.signature().type_signature, + )?; + + let data_type = fun.return_type(&input_exprs_types)?; - let input_exprs_types = input_phy_exprs - .iter() - .map(|arg| arg.data_type(&schema)) - .collect::<Result<Vec<_>>>()?; - - check_arg_count( - fun.name(), - &input_exprs_types, - &fun.signature().type_signature, - )?; - - let ordering_types = ordering_req - .iter() - .map(|e| e.expr.data_type(&schema)) - .collect::<Result<Vec<_>>>()?; - - let ordering_fields = ordering_fields(ordering_req, &ordering_types); - - Ok(Arc::new(AggregateFunctionExpr { - fun: fun.clone(), - args: input_phy_exprs.to_vec(), - logical_args: input_exprs.to_vec(), - data_type: fun.return_type(&input_exprs_types)?, - name: name.into(), - schema: schema.clone(), - dfschema: dfschema.clone(), - sort_exprs: sort_exprs.to_vec(), - ordering_req: ordering_req.to_vec(), - ignore_nulls, - ordering_fields, - is_distinct, - input_type: input_exprs_types[0].clone(), - is_reversed, - })) + Ok(Arc::new(AggregateFunctionExpr { + fun: 
Arc::unwrap_or_clone(fun), + args, + logical_args, + data_type, + name, + schema: Arc::unwrap_or_clone(schema), + dfschema, + sort_exprs, + ordering_req, + ignore_nulls, + ordering_fields, + is_distinct, + input_type: input_exprs_types[0].clone(), + is_reversed, + })) + } + + pub fn name(mut self, name: impl Into<String>) -> Self { Review Comment: I'm not sure if it is necessary; maybe we can build it from function+args, so keep it out of `new()` so that it is easier to deprecate -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org