alamb commented on code in PR #11617:
URL: https://github.com/apache/datafusion/pull/11617#discussion_r1689647167


##########
datafusion/physical-plan/src/windows/mod.rs:
##########
@@ -129,43 +130,35 @@ pub fn create_window_expr(
             ))
         }
         WindowFunctionDefinition::AggregateUDF(fun) => {
-            // TODO: Ordering not supported for Window UDFs yet
             // Convert `Vec<PhysicalSortExpr>` into `Vec<Expr::Sort>`
             let sort_exprs = order_by
                 .iter()
                 .map(|PhysicalSortExpr { expr, options }| {
                     let field_name = expr.to_string();
                     let field_name = 
field_name.split('@').next().unwrap_or(&field_name);
                     Expr::Sort(SortExpr {
-                        expr: Box::new(Expr::Column(Column::new(
-                            None::<String>,
-                            field_name,
-                        ))),
+                        expr: Box::new(col(field_name)),

Review Comment:
   🧹



##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -419,19 +419,11 @@ pub(crate) mod tests {
 
         // Return appropriate expr depending if COUNT is for col or table (*)
         pub(crate) fn count_expr(&self, schema: &Schema) -> Arc<dyn 
AggregateExpr> {
-            create_aggregate_expr(
-                &count_udaf(),
-                &[self.column()],
-                &[],
-                &[],
-                &[],
-                schema,
-                self.column_name(),
-                false,
-                false,
-                false,
-            )
-            .unwrap()
+            AggregateExprBuilder::new(count_udaf(), vec![self.column()])

Review Comment:
   I think that is much easier to understand 💯



##########
datafusion/physical-expr-common/src/aggregate/mod.rs:
##########
@@ -121,44 +101,192 @@ pub fn create_aggregate_expr_with_dfschema(
     is_distinct: bool,
     is_reversed: bool,
 ) -> Result<Arc<dyn AggregateExpr>> {
-    debug_assert_eq!(sort_exprs.len(), ordering_req.len());
-
+    let mut builder =
+        AggregateExprBuilder::new(Arc::new(fun.clone()), 
input_phy_exprs.to_vec());
+    builder = builder.sort_exprs(sort_exprs.to_vec());
+    builder = builder.order_by(ordering_req.to_vec());
+    builder = builder.logical_exprs(input_exprs.to_vec());
+    builder = builder.dfschema(dfschema.clone());
     let schema: Schema = dfschema.into();
+    builder = builder.schema(Arc::new(schema));
+    builder = builder.name(name);
+
+    if ignore_nulls {
+        builder = builder.ignore_nulls();
+    }
+    if is_distinct {
+        builder = builder.distinct();
+    }
+    if is_reversed {
+        builder = builder.reversed();
+    }
+
+    builder.build()
+}
+
+#[derive(Debug, Clone)]
+pub struct AggregateExprBuilder {
+    fun: Arc<AggregateUDF>,
+    /// Physical expressions of the aggregate function
+    args: Vec<Arc<dyn PhysicalExpr>>,
+    /// Logical expressions of the aggregate function, it will be deprecated 
in <https://github.com/apache/datafusion/issues/11359>
+    logical_args: Vec<Expr>,
+    name: String,
+    /// Arrow Schema for the aggregate function
+    schema: SchemaRef,
+    /// Datafusion Schema for the aggregate function
+    dfschema: DFSchema,
+    /// The logical order by expressions, it will be deprecated in 
<https://github.com/apache/datafusion/issues/11359>
+    sort_exprs: Vec<Expr>,
+    /// The physical order by expressions
+    ordering_req: LexOrdering,
+    /// Whether to ignore null values
+    ignore_nulls: bool,
+    /// Whether is distinct aggregate function
+    is_distinct: bool,
+    /// Whether the expression is reversed
+    is_reversed: bool,
+}
+
+impl AggregateExprBuilder {
+    pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> 
Self {
+        Self {
+            fun,
+            args,
+            logical_args: vec![],
+            name: String::new(),
+            schema: Arc::new(Schema::empty()),
+            dfschema: DFSchema::empty(),
+            sort_exprs: vec![],
+            ordering_req: vec![],
+            ignore_nulls: false,
+            is_distinct: false,
+            is_reversed: false,
+        }
+    }
+
+    pub fn build(self) -> Result<Arc<dyn AggregateExpr>> {
+        let Self {
+            fun,
+            args,
+            logical_args,
+            name,
+            schema,
+            dfschema,
+            sort_exprs,
+            ordering_req,
+            ignore_nulls,
+            is_distinct,
+            is_reversed,
+        } = self;
+        if args.is_empty() {
+            return internal_err!("args should not be empty");
+        }
+
+        let mut ordering_fields = vec![];
+
+        debug_assert_eq!(sort_exprs.len(), ordering_req.len());
+        if !ordering_req.is_empty() {
+            let ordering_types = ordering_req
+                .iter()
+                .map(|e| e.expr.data_type(&schema))
+                .collect::<Result<Vec<_>>>()?;
+
+            ordering_fields = utils::ordering_fields(&ordering_req, 
&ordering_types);
+        }
+
+        let input_exprs_types = args
+            .iter()
+            .map(|arg| arg.data_type(&schema))
+            .collect::<Result<Vec<_>>>()?;
+
+        check_arg_count(
+            fun.name(),
+            &input_exprs_types,
+            &fun.signature().type_signature,
+        )?;
+
+        let data_type = fun.return_type(&input_exprs_types)?;
 
-    let input_exprs_types = input_phy_exprs
-        .iter()
-        .map(|arg| arg.data_type(&schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    check_arg_count(
-        fun.name(),
-        &input_exprs_types,
-        &fun.signature().type_signature,
-    )?;
-
-    let ordering_types = ordering_req
-        .iter()
-        .map(|e| e.expr.data_type(&schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    let ordering_fields = ordering_fields(ordering_req, &ordering_types);
-
-    Ok(Arc::new(AggregateFunctionExpr {
-        fun: fun.clone(),
-        args: input_phy_exprs.to_vec(),
-        logical_args: input_exprs.to_vec(),
-        data_type: fun.return_type(&input_exprs_types)?,
-        name: name.into(),
-        schema: schema.clone(),
-        dfschema: dfschema.clone(),
-        sort_exprs: sort_exprs.to_vec(),
-        ordering_req: ordering_req.to_vec(),
-        ignore_nulls,
-        ordering_fields,
-        is_distinct,
-        input_type: input_exprs_types[0].clone(),
-        is_reversed,
-    }))
+        Ok(Arc::new(AggregateFunctionExpr {
+            fun: Arc::unwrap_or_clone(fun),
+            args,
+            logical_args,
+            data_type,
+            name,
+            schema: Arc::unwrap_or_clone(schema),
+            dfschema,
+            sort_exprs,
+            ordering_req,
+            ignore_nulls,
+            ordering_fields,
+            is_distinct,
+            input_type: input_exprs_types[0].clone(),
+            is_reversed,
+        }))
+    }
+
+    pub fn name(mut self, name: impl Into<String>) -> Self {

Review Comment:
   I agree that creating the name automatically from existing information
   would be better. Perhaps we can do that as a follow-on PR.



##########
datafusion/proto/Cargo.toml:
##########
@@ -50,6 +50,7 @@ chrono = { workspace = true }
 datafusion = { workspace = true, default-features = true }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }

Review Comment:
   Since proto already relies on the core datafusion crate, this is not a new
   (direct) dependency.
   
   Though maybe it highlights that it would be good to `pub use` the
   `AggregateExprBuilder` in `datafusion` so using it doesn't need a new explicit
   dependency 🤔



##########
datafusion/physical-expr-common/src/aggregate/mod.rs:
##########
@@ -121,44 +101,192 @@ pub fn create_aggregate_expr_with_dfschema(
     is_distinct: bool,
     is_reversed: bool,
 ) -> Result<Arc<dyn AggregateExpr>> {
-    debug_assert_eq!(sort_exprs.len(), ordering_req.len());
-
+    let mut builder =
+        AggregateExprBuilder::new(Arc::new(fun.clone()), 
input_phy_exprs.to_vec());
+    builder = builder.sort_exprs(sort_exprs.to_vec());
+    builder = builder.order_by(ordering_req.to_vec());
+    builder = builder.logical_exprs(input_exprs.to_vec());
+    builder = builder.dfschema(dfschema.clone());
     let schema: Schema = dfschema.into();
+    builder = builder.schema(Arc::new(schema));
+    builder = builder.name(name);
+
+    if ignore_nulls {
+        builder = builder.ignore_nulls();
+    }
+    if is_distinct {
+        builder = builder.distinct();
+    }
+    if is_reversed {
+        builder = builder.reversed();
+    }
+
+    builder.build()
+}
+
+#[derive(Debug, Clone)]
+pub struct AggregateExprBuilder {

Review Comment:
   ```suggestion
   /// Builder for physical [`AggregateExpr`]
   ///
   /// `AggregateExpr` contains the information necessary to call
   /// an aggregate expression. 
   pub struct AggregateExprBuilder {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to