alamb commented on code in PR #9874:
URL: https://github.com/apache/arrow-datafusion/pull/9874#discussion_r1545631943


##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1667,20 +1666,28 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
                 )?),
                 None => None,
             };
-            let order_by = match order_by {
-                Some(e) => Some(create_physical_sort_exprs(
-                    e,
-                    logical_input_schema,
-                    execution_props,
-                )?),
-                None => None,
-            };
             let ignore_nulls = null_treatment
                 .unwrap_or(sqlparser::ast::NullTreatment::RespectNulls)
                 == NullTreatment::IgnoreNulls;
             let (agg_expr, filter, order_by) = match func_def {
                 AggregateFunctionDefinition::BuiltIn(fun) => {
-                    let ordering_reqs = order_by.clone().unwrap_or(vec![]);
+                    let physical_sort_exprs = match order_by {
+                        Some(e) => Some(
+                            e.iter()
+                                .map(|expr| {
+                                    create_physical_sort_expr(
+                                        expr,
+                                        logical_input_schema,
+                                        execution_props,
+                                    )
+                                })
+                                .collect::<Result<Vec<_>>>()?,
+                        ),
+                        None => None,
+                    };

Review Comment:
   FWIW you can write this without the match, like this:
   
   ```suggestion
                       let physical_sort_exprs = order_by
                           .as_ref()
                           .map(|e| {
                               e.iter()
                                   .map(|expr| {
                                       create_physical_sort_expr(
                                           expr,
                                           logical_input_schema,
                                           execution_props,
                                       )
                                   })
                                   .collect::<Result<Vec<_>>>()
                           })
                           .transpose()?;
   ```
   
   but given it is more verbose, I don't think it is needed here
   
   Maybe we could reduce the nesting (and repetition) by pulling the code into 
its own function 🤔 



##########
datafusion/expr/src/expr_fn.rs:
##########
@@ -796,15 +809,101 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
         Ok(self.return_type.clone())
     }
 
-    fn accumulator(&self, arg: &DataType) -> Result<Box<dyn 
crate::Accumulator>> {
-        (self.accumulator)(arg)
+    fn accumulator(
+        &self,
+        arg: &DataType,
+        sort_exprs: &[Expr],
+        schema: &Schema,
+        _ignore_nulls: bool,
+        _requirement_satisfied: bool,
+    ) -> Result<Box<dyn crate::Accumulator>> {
+        (self.accumulator)(arg, sort_exprs, schema)
     }
 
     fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
         Ok(self.state_type.clone())
     }
 }
 
+pub struct FirstValue {

Review Comment:
   Very nice!
   
   I think we should put this code in a new crate (maybe 
`datafusion-functions-aggregates`) to ensure the UDF API is the only way to run 
such functions. We could do that as a follow on PR though



##########
datafusion/expr/src/expr_fn.rs:
##########
@@ -796,15 +809,101 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
         Ok(self.return_type.clone())
     }
 
-    fn accumulator(&self, arg: &DataType) -> Result<Box<dyn 
crate::Accumulator>> {
-        (self.accumulator)(arg)
+    fn accumulator(
+        &self,
+        arg: &DataType,
+        sort_exprs: &[Expr],
+        schema: &Schema,
+        _ignore_nulls: bool,
+        _requirement_satisfied: bool,
+    ) -> Result<Box<dyn crate::Accumulator>> {
+        (self.accumulator)(arg, sort_exprs, schema)
     }
 
     fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
         Ok(self.state_type.clone())
     }
 }
 
+pub struct FirstValue {
+    name: String,
+    signature: Signature,
+    accumulator: AccumulatorFactoryFunctionForFirstValue,
+}
+
+impl Debug for FirstValue {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("AggregateUDF")

Review Comment:
   ```suggestion
           f.debug_struct("FirstValue")
   ```
   
   I think it would be good to have name/signature too



##########
datafusion/expr/src/udaf.rs:
##########
@@ -269,15 +293,42 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
 
     /// Return a new [`Accumulator`] that aggregates values for a specific
     /// group during query execution.
-    fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
+    ///
+    /// `arg`: the type of the argument to this accumulator
+    ///
+    /// `sort_exprs`: contains a list of `Expr::SortExpr`s if the

Review Comment:
   See suggestion above about API



##########
datafusion/expr/src/udaf.rs:
##########
@@ -269,15 +293,42 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
 
     /// Return a new [`Accumulator`] that aggregates values for a specific
     /// group during query execution.
-    fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
+    ///
+    /// `arg`: the type of the argument to this accumulator
+    ///
+    /// `sort_exprs`: contains a list of `Expr::SortExpr`s if the
+    /// aggregate is called with an explicit `ORDER BY`. For example,
+    /// `ARRAY_AGG(x ORDER BY y ASC)`. In this case, `sort_exprs` would 
contain `[y ASC]`
+    ///
+    /// `schema` is the input schema to the udaf
+    fn accumulator(
+        &self,
+        arg: &DataType,
+        sort_exprs: &[Expr],
+        schema: &Schema,
+        ignore_nulls: bool,
+        requirement_satisfied: bool,
+    ) -> Result<Box<dyn Accumulator>>;
 
     /// Return the type used to serialize the  [`Accumulator`]'s intermediate 
state.
     /// See [`Accumulator::state()`] for more details
     fn state_type(&self, return_type: &DataType) -> Result<Vec<DataType>>;
 
+    /// Return the fields of the intermediate state. It is mutually exclusive 
with [`Self::state_type`].

Review Comment:
   It would be great to figure out a way to avoid two mutually exclusive APIs 
... I don't have any suggestions now though



##########
datafusion/expr/src/expr_fn.rs:
##########
@@ -796,15 +809,101 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
         Ok(self.return_type.clone())
     }
 
-    fn accumulator(&self, arg: &DataType) -> Result<Box<dyn 
crate::Accumulator>> {
-        (self.accumulator)(arg)
+    fn accumulator(
+        &self,
+        arg: &DataType,
+        sort_exprs: &[Expr],
+        schema: &Schema,
+        _ignore_nulls: bool,
+        _requirement_satisfied: bool,
+    ) -> Result<Box<dyn crate::Accumulator>> {
+        (self.accumulator)(arg, sort_exprs, schema)
     }
 
     fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
         Ok(self.state_type.clone())
     }
 }
 
+pub struct FirstValue {
+    name: String,
+    signature: Signature,
+    accumulator: AccumulatorFactoryFunctionForFirstValue,

Review Comment:
   I wonder if there is any reason to have this function factory (rather than 
just instantiating the Accumulator directly)?



##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1457,6 +1460,13 @@ impl SessionState {
         datafusion_functions_array::register_all(&mut new_self)
             .expect("can not register array expressions");
 
+        let first_value = create_first_value(
+            "FIRST_VALUE",
+            Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
+            Arc::new(create_first_value_accumulator),
+        );
+        let _ = new_self.register_udaf(Arc::new(first_value));

Review Comment:
   Why is the return value ignored? If it returns a `Result` maybe it could be 
`new_self.register_udaf(Arc::new(first_value))?;`. If it returns an option, 
leaving it `new_self.register_udaf(Arc::new(first_value));` might make the 
intent clearer than `let _`



##########
datafusion/expr/src/function.rs:
##########
@@ -38,9 +39,20 @@ pub type ReturnTypeFunction =
     Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
 
 /// Factory that returns an accumulator for the given aggregate, given
-/// its return datatype.
-pub type AccumulatorFactoryFunction =
-    Arc<dyn Fn(&DataType) -> Result<Box<dyn Accumulator>> + Send + Sync>;
+/// its return datatype, the sorting expressions and the schema for ordering.
+pub type AccumulatorFactoryFunction = Arc<
+    dyn Fn(&DataType, &[Expr], &Schema) -> Result<Box<dyn Accumulator>> + Send 
+ Sync,
+>;
+
+/// Factory that returns an accumulator for the given aggregate, given
+/// its return datatype, the sorting expressions and the schema for ordering.
+/// FirstValue needs additional `ignore_nulls` and `requirement_satisfied` 
flags.
+// TODO: It would be nice if we can have flexible design for arbitrary 
arguments.

Review Comment:
   It might also be useful to add some way for the accumulator to specify what 
options it permits so DataFusion can do the error checking
   
   Maybe
   
   ```rust
   struct AccumulatorOptions {
     /// Aggregate supports `ORDER BY` to specify the order of arguments
     pub bool sort_exprs,
     /// Aggregate supports `IGNORE NULLS` to specify null treatment
     pub bool ignore_nulls,
   ...
   }
   ```
   
   That way the UDAFs could opt in to supporting this kind of syntax 



##########
datafusion/expr/src/function.rs:
##########
@@ -38,9 +39,20 @@ pub type ReturnTypeFunction =
     Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
 
 /// Factory that returns an accumulator for the given aggregate, given
-/// its return datatype.
-pub type AccumulatorFactoryFunction =
-    Arc<dyn Fn(&DataType) -> Result<Box<dyn Accumulator>> + Send + Sync>;
+/// its return datatype, the sorting expressions and the schema for ordering.
+pub type AccumulatorFactoryFunction = Arc<
+    dyn Fn(&DataType, &[Expr], &Schema) -> Result<Box<dyn Accumulator>> + Send 
+ Sync,
+>;
+
+/// Factory that returns an accumulator for the given aggregate, given
+/// its return datatype, the sorting expressions and the schema for ordering.
+/// FirstValue needs additional `ignore_nulls` and `requirement_satisfied` 
flags.
+// TODO: It would be nice if we can have flexible design for arbitrary 
arguments.

Review Comment:
   It might be hard to remember what these arguments are for
   
   ```rust
   &DataType, &[Expr], &Schema, bool, bool
   ```
   
   What if we made a struct like the following? In this way we wouldn't have to 
change the signatures if/when we added new fields.
   
   ```rust
   /// Arguments passed to create an accumulator
   pub struct AccumulatorArgs<'a> {
     /// Sort expressions, if any, passed as `ORDER BY` expressiosn
     sort_exprs: &'a[Expr], 
     schema: &'a Schema, 
     /// Was `IGNORE NULLS` passed in the arguments to the accumulator
     ignore_nulls: bool,
     /// TBD what is this?
     requirement_satisfied: bool, 
   }
   
   impl <'a> AccumulatorArgs<'a> {
     /// Return a not yet implemented error if there are any sort expressions
     fn error_if_sort(&self, name: &str) -> Result<()> { ... }
   ...
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to