jayzhan211 commented on code in PR #11617:
URL: https://github.com/apache/datafusion/pull/11617#discussion_r1689448351


##########
datafusion/physical-expr-common/src/aggregate/mod.rs:
##########
@@ -121,44 +101,192 @@ pub fn create_aggregate_expr_with_dfschema(
     is_distinct: bool,
     is_reversed: bool,
 ) -> Result<Arc<dyn AggregateExpr>> {
-    debug_assert_eq!(sort_exprs.len(), ordering_req.len());
-
+    let mut builder =
+        AggregateExprBuilder::new(Arc::new(fun.clone()), 
input_phy_exprs.to_vec());
+    builder = builder.sort_exprs(sort_exprs.to_vec());
+    builder = builder.order_by(ordering_req.to_vec());
+    builder = builder.logical_exprs(input_exprs.to_vec());
+    builder = builder.dfschema(dfschema.clone());
     let schema: Schema = dfschema.into();
+    builder = builder.schema(Arc::new(schema));
+    builder = builder.name(name);
+
+    if ignore_nulls {
+        builder = builder.ignore_nulls();
+    }
+    if is_distinct {
+        builder = builder.distinct();
+    }
+    if is_reversed {
+        builder = builder.reversed();
+    }
+
+    builder.build()
+}
+
+#[derive(Debug, Clone)]
+pub struct AggregateExprBuilder {
+    fun: Arc<AggregateUDF>,
+    /// Physical expressions of the aggregate function
+    args: Vec<Arc<dyn PhysicalExpr>>,
+    /// Logical expressions of the aggregate function, it will be deprecated in <https://github.com/apache/datafusion/issues/11359>
+    logical_args: Vec<Expr>,
+    name: String,
+    /// Arrow Schema for the aggregate function
+    schema: SchemaRef,
+    /// Datafusion Schema for the aggregate function
+    dfschema: DFSchema,
+    /// The logical order by expressions, it will be deprecated in <https://github.com/apache/datafusion/issues/11359>
+    sort_exprs: Vec<Expr>,
+    /// The physical order by expressions
+    ordering_req: LexOrdering,
+    /// Whether to ignore null values
+    ignore_nulls: bool,
+    /// Whether is distinct aggregate function
+    is_distinct: bool,
+    /// Whether the expression is reversed
+    is_reversed: bool,
+}
+
+impl AggregateExprBuilder {
+    pub fn new(fun: Arc<AggregateUDF>, args: Vec<Arc<dyn PhysicalExpr>>) -> 
Self {
+        Self {
+            fun,
+            args,
+            logical_args: vec![],
+            name: String::new(),
+            schema: Arc::new(Schema::empty()),
+            dfschema: DFSchema::empty(),
+            sort_exprs: vec![],
+            ordering_req: vec![],
+            ignore_nulls: false,
+            is_distinct: false,
+            is_reversed: false,
+        }
+    }
+
+    pub fn build(self) -> Result<Arc<dyn AggregateExpr>> {
+        let Self {
+            fun,
+            args,
+            logical_args,
+            name,
+            schema,
+            dfschema,
+            sort_exprs,
+            ordering_req,
+            ignore_nulls,
+            is_distinct,
+            is_reversed,
+        } = self;
+        if args.is_empty() {
+            return internal_err!("args should not be empty");
+        }
+
+        let mut ordering_fields = vec![];
+
+        debug_assert_eq!(sort_exprs.len(), ordering_req.len());
+        if !ordering_req.is_empty() {
+            let ordering_types = ordering_req
+                .iter()
+                .map(|e| e.expr.data_type(&schema))
+                .collect::<Result<Vec<_>>>()?;
+
+            ordering_fields = utils::ordering_fields(&ordering_req, 
&ordering_types);
+        }
+
+        let input_exprs_types = args
+            .iter()
+            .map(|arg| arg.data_type(&schema))
+            .collect::<Result<Vec<_>>>()?;
+
+        check_arg_count(
+            fun.name(),
+            &input_exprs_types,
+            &fun.signature().type_signature,
+        )?;
+
+        let data_type = fun.return_type(&input_exprs_types)?;
 
-    let input_exprs_types = input_phy_exprs
-        .iter()
-        .map(|arg| arg.data_type(&schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    check_arg_count(
-        fun.name(),
-        &input_exprs_types,
-        &fun.signature().type_signature,
-    )?;
-
-    let ordering_types = ordering_req
-        .iter()
-        .map(|e| e.expr.data_type(&schema))
-        .collect::<Result<Vec<_>>>()?;
-
-    let ordering_fields = ordering_fields(ordering_req, &ordering_types);
-
-    Ok(Arc::new(AggregateFunctionExpr {
-        fun: fun.clone(),
-        args: input_phy_exprs.to_vec(),
-        logical_args: input_exprs.to_vec(),
-        data_type: fun.return_type(&input_exprs_types)?,
-        name: name.into(),
-        schema: schema.clone(),
-        dfschema: dfschema.clone(),
-        sort_exprs: sort_exprs.to_vec(),
-        ordering_req: ordering_req.to_vec(),
-        ignore_nulls,
-        ordering_fields,
-        is_distinct,
-        input_type: input_exprs_types[0].clone(),
-        is_reversed,
-    }))
+        Ok(Arc::new(AggregateFunctionExpr {
+            fun: Arc::unwrap_or_clone(fun),
+            args,
+            logical_args,
+            data_type,
+            name,
+            schema: Arc::unwrap_or_clone(schema),
+            dfschema,
+            sort_exprs,
+            ordering_req,
+            ignore_nulls,
+            ordering_fields,
+            is_distinct,
+            input_type: input_exprs_types[0].clone(),
+            is_reversed,
+        }))
+    }
+
+    pub fn name(mut self, name: impl Into<String>) -> Self {
+        self.name = name.into();
+        self
+    }
+
+    pub fn schema(mut self, schema: SchemaRef) -> Self {

Review Comment:
   I'm not sure whether we will still need `schema` or `dfschema` after the
   refactor, so I kept them out of `new` to prepare for their possible deprecation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to