alamb commented on code in PR #9874:
URL: https://github.com/apache/arrow-datafusion/pull/9874#discussion_r1546833856
##########
datafusion/expr/src/udaf.rs:
##########
@@ -269,15 +293,42 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// Return a new [`Accumulator`] that aggregates values for a specific
/// group during query execution.
- fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
+ ///
+ /// `arg`: the type of the argument to this accumulator
+ ///
+ /// `sort_exprs`: contains a list of `Expr::SortExpr`s if the
Review Comment:
I think these comments are now out of date
##########
datafusion/expr/src/function.rs:
##########
@@ -37,10 +38,36 @@ pub type ScalarFunctionImplementation =
pub type ReturnTypeFunction =
Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-/// Factory that returns an accumulator for the given aggregate, given
-/// its return datatype.
+/// Arguments passed to create an accumulator
+pub struct AccumulatorArgs<'a> {
+ // default arguments
+ pub data_type: &'a DataType, // the return type of the function
+ pub schema: &'a Schema, // the schema of the input arguments
+ pub ignore_nulls: bool, // whether to ignore nulls
Review Comment:
Since these fields are `pub`, I think they show up in the rustdocs. Can you
please write them as `///` doc comments placed above each field so that they
show up in the docs?
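
A minimal sketch of the requested form, in case it helps. The field names and
descriptions come from the diff above; `DataType` and `Schema` are hypothetical
stand-ins for the arrow types so that the snippet compiles on its own, and the
`describe` helper is purely illustrative.

```rust
/// Hypothetical stand-in for arrow's `DataType` (assumption, not the real type).
#[derive(Debug, PartialEq)]
pub enum DataType {
    Int64,
}

/// Hypothetical stand-in for arrow's `Schema` (assumption, not the real type).
#[derive(Debug, Default)]
pub struct Schema {
    pub field_names: Vec<String>,
}

/// Arguments passed to create an accumulator.
pub struct AccumulatorArgs<'a> {
    /// The return type of the function.
    pub data_type: &'a DataType,
    /// The schema of the input arguments.
    pub schema: &'a Schema,
    /// Whether to ignore nulls.
    pub ignore_nulls: bool,
}

/// Illustrative helper that reads every documented field.
pub fn describe(args: &AccumulatorArgs<'_>) -> String {
    format!(
        "{:?}, {} input fields, ignore_nulls={}",
        args.data_type,
        args.schema.field_names.len(),
        args.ignore_nulls
    )
}
```

With `///` placed above each field, rustdoc attaches the text to that field,
whereas trailing `//` comments are dropped from the generated docs.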
##########
datafusion/expr/src/expr_fn.rs:
##########
@@ -710,6 +712,16 @@ pub fn create_udaf(
))
}
+/// Creates a new UDAF with a specific signature, state type and return type.
+/// The signature and state type must match the `Accumulator's implementation`.
Review Comment:
It might make sense to add a comment explaining that this is a temporary
solution (i.e. the idea is to pull the function out into its own crate, but for
now we need to keep the physical implementation separate).
##########
datafusion/expr/src/expr_fn.rs:
##########
@@ -796,15 +809,101 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
Ok(self.return_type.clone())
}
- fn accumulator(&self, arg: &DataType) -> Result<Box<dyn crate::Accumulator>> {
- (self.accumulator)(arg)
+ fn accumulator(
+ &self,
+ arg: &DataType,
+ sort_exprs: &[Expr],
+ schema: &Schema,
+ _ignore_nulls: bool,
+ _requirement_satisfied: bool,
+ ) -> Result<Box<dyn crate::Accumulator>> {
+ (self.accumulator)(arg, sort_exprs, schema)
}
fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
Ok(self.state_type.clone())
}
}
+pub struct FirstValue {
Review Comment:
I see -- it seems like the issue is that the accumulator implementation
requires `PhysicalSortExpr`.
To pull the code into its own crate, maybe we could pull out the relevant
pieces of `datafusion-physical-expr` into `datafusion-physical-core` or
something (as a follow-on PR).
##########
datafusion/expr/src/function.rs:
##########
@@ -37,10 +38,36 @@ pub type ScalarFunctionImplementation =
pub type ReturnTypeFunction =
Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-/// Factory that returns an accumulator for the given aggregate, given
-/// its return datatype.
+/// Arguments passed to create an accumulator
+pub struct AccumulatorArgs<'a> {
+ // default arguments
+ pub data_type: &'a DataType, // the return type of the function
+ pub schema: &'a Schema, // the schema of the input arguments
+ pub ignore_nulls: bool, // whether to ignore nulls
+
+ // ordering arguments
Review Comment:
Can you also please document how we would tell if there was no `ORDER BY`
specified? (is `sort_exprs` empty?)
##########
datafusion/expr/src/function.rs:
##########
@@ -37,10 +38,36 @@ pub type ScalarFunctionImplementation =
pub type ReturnTypeFunction =
Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync>;
-/// Factory that returns an accumulator for the given aggregate, given
-/// its return datatype.
+/// Arguments passed to create an accumulator
+pub struct AccumulatorArgs<'a> {
+ // default arguments
+ pub data_type: &'a DataType, // the return type of the function
+ pub schema: &'a Schema, // the schema of the input arguments
+ pub ignore_nulls: bool, // whether to ignore nulls
+
+ // ordering arguments
+ pub sort_exprs: &'a [Expr], // the expressions of `order by`
Review Comment:
If you made this `Option<&'a [Expr]>`, it would probably be easier for UDF
implementors to check whether `ORDER BY` was specified.
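
A rough sketch of what the `Option` form could look like from the implementor's
side. `Expr` here is a hypothetical stand-in for the real expression type, the
struct is cut down to the one field under discussion, and
`ordering_description` is an illustrative helper, not an existing API.

```rust
/// Hypothetical stand-in for the logical `Expr` type (assumption).
#[derive(Debug, Clone, PartialEq)]
pub struct Expr(pub String);

/// Cut-down version of the arguments struct, keeping only the ordering field.
pub struct AccumulatorArgs<'a> {
    /// `None` when the aggregate was called without `ORDER BY`;
    /// `Some(exprs)` holds the `ORDER BY` expressions otherwise.
    pub sort_exprs: Option<&'a [Expr]>,
}

/// Illustrative helper: the match makes the "no ORDER BY" case explicit,
/// so implementors do not have to rely on an empty-slice convention.
pub fn ordering_description(args: &AccumulatorArgs<'_>) -> String {
    match args.sort_exprs {
        None => "no ORDER BY".to_string(),
        Some(exprs) => {
            let parts: Vec<&str> = exprs.iter().map(|e| e.0.as_str()).collect();
            format!("ORDER BY {}", parts.join(", "))
        }
    }
}
```

The trade-off is that `Some(&[])` becomes representable, so the docs would
still need to say whether that state can occur.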
##########
datafusion/expr/src/expr_fn.rs:
##########
@@ -787,15 +799,94 @@ impl AggregateUDFImpl for SimpleAggregateUDF {
Ok(self.return_type.clone())
}
- fn accumulator(&self, arg: &DataType) -> Result<Box<dyn crate::Accumulator>> {
- (self.accumulator)(arg)
+ fn accumulator(
+ &self,
+ acc_args: AccumulatorArgs,
+ ) -> Result<Box<dyn crate::Accumulator>> {
+ (self.accumulator)(acc_args)
}
fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> {
Ok(self.state_type.clone())
}
}
+pub struct FirstValue {
+ name: String,
+ signature: Signature,
+ accumulator: AccumulatorFactoryFunction,
+}
+
+impl Debug for FirstValue {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ f.debug_struct("FirstValue")
+ .field("name", &self.name)
+ .field("signature", &self.signature)
+ .field("fun", &"<FUNC>")
Review Comment:
```suggestion
.field("accumulator", &"<FUNC>")
```
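
For context, a self-contained sketch of the `Debug` impl with the field label
matching the struct field. The `signature` field is omitted and
`AccumulatorFactoryFunction` is reduced to a hypothetical boxed closure so the
snippet compiles without the crate; `create` is an illustrative method only.

```rust
use std::fmt::{self, Debug};

/// Hypothetical, simplified factory type (the real one returns an accumulator).
pub type AccumulatorFactoryFunction = Box<dyn Fn() -> u64 + Send + Sync>;

pub struct FirstValue {
    name: String,
    accumulator: AccumulatorFactoryFunction,
}

impl FirstValue {
    /// Illustrative: invoke the stored factory.
    pub fn create(&self) -> u64 {
        (self.accumulator)()
    }
}

impl Debug for FirstValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FirstValue")
            .field("name", &self.name)
            // Closures are not `Debug`, so print a placeholder under the
            // same label as the struct field.
            .field("accumulator", &"<FUNC>")
            .finish()
    }
}
```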
##########
datafusion/expr/src/udaf.rs:
##########
@@ -269,15 +293,42 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// Return a new [`Accumulator`] that aggregates values for a specific
/// group during query execution.
- fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
+ ///
+ /// `arg`: the type of the argument to this accumulator
+ ///
+ /// `sort_exprs`: contains a list of `Expr::SortExpr`s if the
+ /// aggregate is called with an explicit `ORDER BY`. For example,
+ /// `ARRAY_AGG(x ORDER BY y ASC)`. In this case, `sort_exprs` would contain `[y ASC]`
+ ///
+ /// `schema` is the input schema to the udaf
+ fn accumulator(
+ &self,
+ arg: &DataType,
+ sort_exprs: &[Expr],
+ schema: &Schema,
+ ignore_nulls: bool,
+ requirement_satisfied: bool,
+ ) -> Result<Box<dyn Accumulator>>;
/// Return the type used to serialize the [`Accumulator`]'s intermediate state.
/// See [`Accumulator::state()`] for more details
fn state_type(&self, return_type: &DataType) -> Result<Vec<DataType>>;
+ /// Return the fields of the intermediate state. It is mutually exclusive with [`Self::state_type`].
Review Comment:
If we are going to change the AggregateUDFImpl anyways, maybe we should just
remove `state_type` and always require `state_fields`?
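
A sketch of what a trait with only `state_fields` might look like. Everything
here is an assumption for illustration: `DataType` and `Field` are stand-ins
for the arrow types, the trait is reduced to the single method, the return type
is a plain `Vec` rather than a `Result`, and the state layout chosen for
`FirstValueSketch` is just one plausible example.

```rust
/// Hypothetical stand-in for arrow's `DataType` (assumption).
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int64,
    Boolean,
}

/// Hypothetical stand-in for arrow's `Field` (assumption).
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: DataType,
}

/// Reduced trait: `state_fields` is the only way to describe the state.
pub trait AggregateUDFImpl {
    /// Return the fields of the intermediate state.
    fn state_fields(
        &self,
        name: &str,
        value_type: DataType,
        ordering_fields: Vec<Field>,
    ) -> Vec<Field>;
}

pub struct FirstValueSketch;

impl AggregateUDFImpl for FirstValueSketch {
    fn state_fields(
        &self,
        name: &str,
        value_type: DataType,
        ordering_fields: Vec<Field>,
    ) -> Vec<Field> {
        // Assumed layout: the value, then the ordering columns, then a flag.
        let mut fields = vec![Field {
            name: format!("{}[first_value]", name),
            data_type: value_type,
        }];
        fields.extend(ordering_fields);
        fields.push(Field {
            name: format!("{}[is_set]", name),
            data_type: DataType::Boolean,
        });
        fields
    }
}
```

With a single required method there is no mutual-exclusion rule to document,
at the cost of every implementor having to name its state fields.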
##########
datafusion/expr/src/udaf.rs:
##########
@@ -269,15 +281,35 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// Return a new [`Accumulator`] that aggregates values for a specific
/// group during query execution.
- fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>;
+ ///
+ /// `data_type`: the type of the argument to this accumulator
+ ///
+ /// `sort_exprs`: contains a list of `Expr::SortExpr`s if the
+ /// aggregate is called with an explicit `ORDER BY`. For example,
+ /// `ARRAY_AGG(x ORDER BY y ASC)`. In this case, `sort_exprs` would contain `[y ASC]`
+ ///
+ /// `schema` is the input schema to the udaf
+ fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>>;
/// Return the type used to serialize the [`Accumulator`]'s intermediate state.
/// See [`Accumulator::state()`] for more details
fn state_type(&self, return_type: &DataType) -> Result<Vec<DataType>>;
+ /// Return the fields of the intermediate state. It is mutually exclusive with [`Self::state_type`].
+ /// If you define `state_type`, you don't need to define `state_fields` and vice versa.
+ /// If you want empty fields, you should define empty `state_type`
+ fn state_fields(
+ &self,
+ _name: &str,
+ _value_type: DataType,
+ _ordering_fields: Vec<Field>,
Review Comment:
What are `ordering_fields` used for? I think they should be documented