alamb commented on code in PR #9960:
URL: https://github.com/apache/arrow-datafusion/pull/9960#discussion_r1554163458


##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1460,21 +1456,8 @@ impl SessionState {
         datafusion_functions_array::register_all(&mut new_self)
             .expect("can not register array expressions");
 
-        let first_value = create_first_value(
-            "FIRST_VALUE",
-            Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
-            Arc::new(create_first_value_accumulator),
-        );
-
-        match new_self.register_udaf(Arc::new(first_value)) {
-            Ok(Some(existing_udaf)) => {
-                debug!("Overwrite existing UDAF: {}", existing_udaf.name());
-            }
-            Ok(None) => {}
-            Err(err) => {
-                panic!("Failed to register UDAF: {}", err);
-            }
-        }
+        datafusion_aggregate_functions::register_all(&mut new_self)

Review Comment:
   ❤️ 



##########
datafusion/aggregate-functions/src/first_last.rs:
##########
@@ -393,53 +339,190 @@ impl Accumulator for FirstValueAccumulator {
     }
 }
 
-pub fn create_first_value_accumulator(
-    acc_args: AccumulatorArgs,
-) -> Result<Box<dyn Accumulator>> {
-    let mut all_sort_orders = vec![];
+/// TO BE DEPRECATED: Builtin FIRST_VALUE physical aggregate expression

Review Comment:
   I don't understand this comment -- what does it mean to be deprecated? What 
is the alternative?
   
   Is the idea that it will be replaced with a UDF version?



##########
datafusion/aggregate-functions/src/first_last.rs:
##########
@@ -17,209 +17,155 @@
 
 //! Defines the FIRST_VALUE/LAST_VALUE aggregations.
 
-use std::any::Any;
-use std::sync::Arc;
-
-use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, 
ordering_fields};
-use crate::expressions::{self, format_state_name};
-use crate::{
-    reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, 
PhysicalSortExpr,
-};
-
-use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
-use arrow::compute::{self, lexsort_to_indices, SortColumn};
+use arrow::array::{ArrayRef, AsArray, BooleanArray};
+use arrow::compute::{self, lexsort_to_indices, SortColumn, SortOptions};
 use arrow::datatypes::{DataType, Field};
-use arrow_schema::SortOptions;
 use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, 
get_row_at_idx};
 use datafusion_common::{
     arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::function::AccumulatorArgs;
-use datafusion_expr::{Accumulator, Expr};
+use datafusion_expr::type_coercion::aggregates::NUMERICS;
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::{
+    Accumulator, AccumulatorFactoryFunction, AggregateUDFImpl, Expr, Signature,
+    Volatility,
+};
+use datafusion_physical_expr_common::aggregate::utils::{
+    down_cast_any_ref, get_sort_options, ordering_fields,
+};
+use datafusion_physical_expr_common::aggregate::AggregateExpr;
+use datafusion_physical_expr_common::expressions;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
+use datafusion_physical_expr_common::utils::reverse_order_bys;
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+make_udaf_function!(
+    FirstValue,
+    first_value,
+    value,
+    "Returns the first value in a group of values.",
+    first_value_udaf,
+    create_first_value_accumulator
+);
 
-/// FIRST_VALUE aggregate expression
-#[derive(Debug, Clone)]
 pub struct FirstValue {
-    name: String,
-    input_data_type: DataType,
-    order_by_data_types: Vec<DataType>,
-    expr: Arc<dyn PhysicalExpr>,
-    ordering_req: LexOrdering,
-    requirement_satisfied: bool,
-    ignore_nulls: bool,
-    state_fields: Vec<Field>,
+    signature: Signature,
+    aliases: Vec<String>,
+    accumulator: AccumulatorFactoryFunction,
+}
+
+impl Debug for FirstValue {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("FirstValue")
+            .field("name", &self.name())
+            .field("signature", &self.signature)
+            .field("accumulator", &"<FUNC>")
+            .finish()
+    }
 }
 
 impl FirstValue {
-    /// Creates a new FIRST_VALUE aggregation function.
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        input_data_type: DataType,
-        ordering_req: LexOrdering,
-        order_by_data_types: Vec<DataType>,
-        state_fields: Vec<Field>,
-    ) -> Self {
-        let requirement_satisfied = ordering_req.is_empty();
+    pub fn new(accumulator: AccumulatorFactoryFunction) -> Self {

Review Comment:
   Now that you have moved the code into its own module, is there a reason to 
keep this level of indirection?
   
   Why not inline the definition of `create_first_value_accumulator`, so that 
instead of
   
   ```rust
       fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
           (self.accumulator)(acc_args)
   ```
   
   it would look like
   
   
   ```rust
    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
       let mut all_sort_orders = vec![];
   
       // Construct PhysicalSortExpr objects from Expr objects:
       let mut sort_exprs = vec![];
       for expr in acc_args.sort_exprs {
           if let Expr::Sort(sort) = expr {
            ...
       }
      ...
    }
   ```
   



##########
datafusion/aggregate-functions/src/macros.rs:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+macro_rules! make_udaf_function {
+    ($UDAF:ty, $EXPR_FN:ident, $($arg:ident)*, $DOC:expr, 
$AGGREGATE_UDF_FN:ident, $ACCUMULATOR:ident) => {
+        paste::paste! {
+            // "fluent expr_fn" style function
+            #[doc = $DOC]
+            pub fn $EXPR_FN($($arg: Expr),*) -> Expr {
+                
Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf(
+                    $AGGREGATE_UDF_FN(),
+                    vec![$($arg),*],
+                    // TODO: Support arguments for `expr` API
+                    false,
+                    None,
+                    None,
+                    None,
+                ))
+            }
+
+            /// Singleton instance of [$UDAF], ensures the UDF is only created 
once
+            /// named STATIC_$(UDAF). For example `STATIC_FirstValue`
+            #[allow(non_upper_case_globals)]
+            static [< STATIC_ $UDAF >]: 
std::sync::OnceLock<std::sync::Arc<datafusion_expr::AggregateUDF>> =
+                std::sync::OnceLock::new();
+
+            /// Aggregatefunction that returns a [AggregateUDF] for [$UDAF]

Review Comment:
   ```suggestion
               /// AggregateFunction that returns a [AggregateUDF] for [$UDAF]
   ```



##########
Cargo.toml:
##########
@@ -73,6 +74,7 @@ chrono = { version = "0.4.34", default-features = false }
 ctor = "0.2.0"
 dashmap = "5.4.0"
 datafusion = { path = "datafusion/core", version = "37.0.0", default-features 
= false }
+datafusion-aggregate-functions = { path = "datafusion/aggregate-functions", 
version = "37.0.0" }

Review Comment:
   What do you think about calling the new crate 
`datafusion-functions-aggregates` (to mirror the naming of 
`datafusion-functions` and `datafusion-functions-array`)?



##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -612,6 +612,7 @@ async fn roundtrip_expr_api() -> Result<()> {
             lit(1),
         ),
         array_replace_all(make_array(vec![lit(1), lit(2), lit(3)]), lit(2), 
lit(4)),
+        // TODO: Add first value after built-in functions are deprecated

Review Comment:
   I don't understand why we would wait until the built-in functions are 
deprecated 🤔 



##########
datafusion/aggregate-functions/src/macros.rs:
##########
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+macro_rules! make_udaf_function {
+    ($UDAF:ty, $EXPR_FN:ident, $($arg:ident)*, $DOC:expr, 
$AGGREGATE_UDF_FN:ident, $ACCUMULATOR:ident) => {
+        paste::paste! {
+            // "fluent expr_fn" style function
+            #[doc = $DOC]
+            pub fn $EXPR_FN($($arg: Expr),*) -> Expr {
+                
Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf(
+                    $AGGREGATE_UDF_FN(),
+                    vec![$($arg),*],
+                    // TODO: Support arguments for `expr` API
+                    false,
+                    None,
+                    None,
+                    None,
+                ))
+            }
+
+            /// Singleton instance of [$UDAF], ensures the UDF is only created 
once

Review Comment:
   ```suggestion
               /// Singleton instance of [$UDAF], ensures the UDAF is only 
created once
   ```



##########
datafusion/expr/src/utils.rs:
##########
@@ -1240,6 +1240,12 @@ pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> 
DFSchema {
     }
 }
 
+/// Construct state name. State is the intermidiate state of the aggregate 
function.
+/// TODO: Remove duplicated function in physical-expr

Review Comment:
   Do you plan to do this TODO in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to