alamb commented on code in PR #8119:
URL: https://github.com/apache/arrow-datafusion/pull/8119#discussion_r1389978492
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -674,23 +677,37 @@ pub async fn from_substrait_agg_func(
args.push(arg_expr?.as_ref().clone());
}
- let fun = match extensions.get(&f.function_reference) {
- Some(function_name) => {
- aggregate_function::AggregateFunction::from_str(function_name)
- }
- None => not_impl_err!(
- "Aggregated function not found: function anchor = {:?}",
+ let Some(function_name) = extensions.get(&f.function_reference) else {
+ return plan_err!(
+ "Aggregated function not registered: function anchor = {:?}",
f.function_reference
- ),
+ );
};
- Ok(Arc::new(Expr::AggregateFunction(expr::AggregateFunction {
- fun: fun.unwrap(),
- args,
- distinct,
- filter,
- order_by,
- })))
+ // try udaf first, then built-in aggr fn.
Review Comment:
This is basically the same function resolution logic that we have for SQL. I
wonder if we should consolidate it somewhere :thinking:
##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -610,6 +610,35 @@ pub fn to_substrait_agg_measure(
}
})
}
+ Expr::AggregateUDF(expr::AggregateUDF{ fun, args, filter, order_by })
=>{
+ let sorts = if let Some(order_by) = order_by {
+ order_by.iter().map(|expr| to_substrait_sort_field(expr,
schema, extension_info)).collect::<Result<Vec<_>>>()?
+ } else {
+ vec![]
+ };
+ let mut arguments: Vec<FunctionArgument> = vec![];
+ for arg in args {
+ arguments.push(FunctionArgument { arg_type:
Some(ArgType::Value(to_substrait_rex(arg, schema, 0, extension_info)?)) });
+ }
+ let function_name = fun.name.to_lowercase();
Review Comment:
Why is the function name lowercased here?
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -674,23 +677,37 @@ pub async fn from_substrait_agg_func(
args.push(arg_expr?.as_ref().clone());
}
- let fun = match extensions.get(&f.function_reference) {
- Some(function_name) => {
- aggregate_function::AggregateFunction::from_str(function_name)
- }
- None => not_impl_err!(
- "Aggregated function not found: function anchor = {:?}",
+ let Some(function_name) = extensions.get(&f.function_reference) else {
+ return plan_err!(
+ "Aggregated function not registered: function anchor = {:?}",
Review Comment:
```suggestion
"Aggregate function not registered: function anchor = {:?}",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]