jayzhan211 commented on code in PR #10091:
URL: https://github.com/apache/arrow-datafusion/pull/10091#discussion_r1566569515


##########
datafusion/functions-aggregate/src/first_last.rs:
##########
@@ -333,359 +382,120 @@ impl Accumulator for FirstValueAccumulator {
     }
 }
 
-/// TO BE DEPRECATED: Builtin FIRST_VALUE physical aggregate expression will 
be replaced by udf in the future
-#[derive(Debug, Clone)]
-pub struct FirstValuePhysicalExpr {
-    name: String,
-    input_data_type: DataType,
-    order_by_data_types: Vec<DataType>,
-    expr: Arc<dyn PhysicalExpr>,
-    ordering_req: LexOrdering,
-    requirement_satisfied: bool,
-    ignore_nulls: bool,
-    state_fields: Vec<Field>,
-}
-
-impl FirstValuePhysicalExpr {
-    /// Creates a new FIRST_VALUE aggregation function.
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        input_data_type: DataType,
-        ordering_req: LexOrdering,
-        order_by_data_types: Vec<DataType>,
-        state_fields: Vec<Field>,
-    ) -> Self {
-        let requirement_satisfied = ordering_req.is_empty();
-        Self {
-            name: name.into(),
-            input_data_type,
-            order_by_data_types,
-            expr,
-            ordering_req,
-            requirement_satisfied,
-            ignore_nulls: false,
-            state_fields,
-        }
-    }
-
-    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
-        self.ignore_nulls = ignore_nulls;
-        self
-    }
-
-    /// Returns the name of the aggregate expression.
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
-    /// Returns the input data type of the aggregate expression.
-    pub fn input_data_type(&self) -> &DataType {
-        &self.input_data_type
-    }
-
-    /// Returns the data types of the order-by columns.
-    pub fn order_by_data_types(&self) -> &Vec<DataType> {
-        &self.order_by_data_types
-    }
-
-    /// Returns the expression associated with the aggregate function.
-    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
-        &self.expr
-    }
-
-    /// Returns the lexical ordering requirements of the aggregate expression.
-    pub fn ordering_req(&self) -> &LexOrdering {
-        &self.ordering_req
-    }
-
-    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) 
-> Self {
-        self.requirement_satisfied = requirement_satisfied;
-        self
-    }
-
-    pub fn convert_to_last(self) -> LastValuePhysicalExpr {
-        let name = if self.name.starts_with("FIRST") {
-            format!("LAST{}", &self.name[5..])
-        } else {
-            format!("LAST_VALUE({})", self.expr)
-        };
-        let FirstValuePhysicalExpr {
-            expr,
-            input_data_type,
-            ordering_req,
-            order_by_data_types,
-            ..
-        } = self;
-        LastValuePhysicalExpr::new(
-            expr,
-            name,
-            input_data_type,
-            reverse_order_bys(&ordering_req),
-            order_by_data_types,
-        )
-    }
+pub struct LastValue {
+    signature: Signature,
+    aliases: Vec<String>,
 }
 
-impl AggregateExpr for FirstValuePhysicalExpr {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.input_data_type.clone(), true))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        FirstValueAccumulator::try_new(
-            &self.input_data_type,
-            &self.order_by_data_types,
-            self.ordering_req.clone(),
-            self.ignore_nulls,
-        )
-        .map(|acc| {
-            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
-        })
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        if !self.state_fields.is_empty() {
-            return Ok(self.state_fields.clone());
-        }
-
-        let mut fields = vec![Field::new(
-            format_state_name(&self.name, "first_value"),
-            self.input_data_type.clone(),
-            true,
-        )];
-        fields.extend(ordering_fields(
-            &self.ordering_req,
-            &self.order_by_data_types,
-        ));
-        fields.push(Field::new(
-            format_state_name(&self.name, "is_set"),
-            DataType::Boolean,
-            true,
-        ));
-        Ok(fields)
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
-        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(self.clone().convert_to_last()))
-    }
-
-    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        FirstValueAccumulator::try_new(
-            &self.input_data_type,
-            &self.order_by_data_types,
-            self.ordering_req.clone(),
-            self.ignore_nulls,
-        )
-        .map(|acc| {
-            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
-        })
+impl Debug for LastValue {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("LastValue")
+            .field("name", &self.name())
+            .field("signature", &self.signature)
+            .field("accumulator", &"<FUNC>")
+            .finish()
     }
 }
 
-impl PartialEq<dyn Any> for FirstValuePhysicalExpr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.input_data_type == x.input_data_type
-                    && self.order_by_data_types == x.order_by_data_types
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
+impl Default for LastValue {
+    fn default() -> Self {
+        Self::new()
     }
 }
 
-/// TO BE DEPRECATED: Builtin LAST_VALUE physical aggregate expression will be 
replaced by udf in the future
-#[derive(Debug, Clone)]
-pub struct LastValuePhysicalExpr {
-    name: String,
-    input_data_type: DataType,
-    order_by_data_types: Vec<DataType>,
-    expr: Arc<dyn PhysicalExpr>,
-    ordering_req: LexOrdering,
-    requirement_satisfied: bool,
-    ignore_nulls: bool,
-}
-
-impl LastValuePhysicalExpr {
-    /// Creates a new LAST_VALUE aggregation function.
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        input_data_type: DataType,
-        ordering_req: LexOrdering,
-        order_by_data_types: Vec<DataType>,
-    ) -> Self {
-        let requirement_satisfied = ordering_req.is_empty();
+impl LastValue {
+    pub fn new() -> Self {
         Self {
-            name: name.into(),
-            input_data_type,
-            order_by_data_types,
-            expr,
-            ordering_req,
-            requirement_satisfied,
-            ignore_nulls: false,
+            aliases: vec![String::from("LAST_VALUE"), 
String::from("last_value")],
+            signature: Signature::one_of(
+                vec![
+                    // TODO: we can introduce more strict signature that only 
numeric of array types are allowed
+                    
TypeSignature::ArraySignature(ArrayFunctionSignature::Array),
+                    TypeSignature::Uniform(1, NUMERICS.to_vec()),
+                ],
+                Volatility::Immutable,
+            ),
         }
     }
+}
 
-    pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
-        self.ignore_nulls = ignore_nulls;
+impl AggregateUDFImpl for LastValue {
+    fn as_any(&self) -> &dyn Any {
         self
     }
 
-    /// Returns the name of the aggregate expression.
-    pub fn name(&self) -> &str {
-        &self.name
-    }
-
-    /// Returns the input data type of the aggregate expression.
-    pub fn input_data_type(&self) -> &DataType {
-        &self.input_data_type
+    fn name(&self) -> &str {
+        "LAST_VALUE"
     }
 
-    /// Returns the data types of the order-by columns.
-    pub fn order_by_data_types(&self) -> &Vec<DataType> {
-        &self.order_by_data_types
+    fn signature(&self) -> &Signature {
+        &self.signature
     }
 
-    /// Returns the expression associated with the aggregate function.
-    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
-        &self.expr
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(arg_types[0].clone())
     }
 
-    /// Returns the lexical ordering requirements of the aggregate expression.
-    pub fn ordering_req(&self) -> &LexOrdering {
-        &self.ordering_req
-    }
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        let mut all_sort_orders = vec![];
 
-    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) 
-> Self {
-        self.requirement_satisfied = requirement_satisfied;
-        self
-    }
+        // Construct PhysicalSortExpr objects from Expr objects:
+        let mut sort_exprs = vec![];
+        for expr in acc_args.sort_exprs {
+            if let Expr::Sort(sort) = expr {
+                if let Expr::Column(col) = sort.expr.as_ref() {

Review Comment:
   LastValue has no test cases involving aliased sort expressions, so I have kept it simple for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to