This is an automated email from the ASF dual-hosted git repository.

findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a1c09c74a Improve field naming in first_value, last_value implementation (#16631)
9a1c09c74a is described below

commit 9a1c09c74aa9869ecec482e1ae9ff28c65447073
Author: Piotr Findeisen <piotr.findei...@gmail.com>
AuthorDate: Tue Jul 1 09:40:10 2025 +0200

    Improve field naming in first_value, last_value implementation (#16631)
    
    Rename the "requirement_satisfied" fields to "is_input_pre_ordered".
    It is now slightly more obvious exactly which requirement is satisfied.
---
 datafusion/functions-aggregate/src/first_last.rs | 78 +++++++++---------------
 1 file changed, 29 insertions(+), 49 deletions(-)

diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs
index 42c0a57fbf..790eaada6a 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -98,7 +98,7 @@ pub fn last_value(expression: Expr, order_by: 
Option<Vec<SortExpr>>) -> Expr {
 )]
 pub struct FirstValue {
     signature: Signature,
-    requirement_satisfied: bool,
+    is_input_pre_ordered: bool,
 }
 
 impl Debug for FirstValue {
@@ -121,14 +121,9 @@ impl FirstValue {
     pub fn new() -> Self {
         Self {
             signature: Signature::any(1, Volatility::Immutable),
-            requirement_satisfied: false,
+            is_input_pre_ordered: false,
         }
     }
-
-    fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> 
Self {
-        self.requirement_satisfied = requirement_satisfied;
-        self
-    }
 }
 
 impl AggregateUDFImpl for FirstValue {
@@ -160,15 +155,13 @@ impl AggregateUDFImpl for FirstValue {
             .iter()
             .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;
-        FirstValueAccumulator::try_new(
+        Ok(Box::new(FirstValueAccumulator::try_new(
             acc_args.return_field.data_type(),
             &ordering_dtypes,
             ordering,
+            self.is_input_pre_ordered,
             acc_args.ignore_nulls,
-        )
-        .map(|acc| {
-            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
-        })
+        )?))
     }
 
     fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
@@ -290,9 +283,10 @@ impl AggregateUDFImpl for FirstValue {
         self: Arc<Self>,
         beneficial_ordering: bool,
     ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
-        Ok(Some(Arc::new(
-            FirstValue::new().with_requirement_satisfied(beneficial_ordering),
-        )))
+        Ok(Some(Arc::new(Self {
+            signature: self.signature.clone(),
+            is_input_pre_ordered: beneficial_ordering,
+        })))
     }
 
     fn order_sensitivity(&self) -> AggregateOrderSensitivity {
@@ -850,7 +844,7 @@ pub struct FirstValueAccumulator {
     // Stores the applicable ordering requirement.
     ordering_req: LexOrdering,
     // Stores whether incoming data already satisfies the ordering requirement.
-    requirement_satisfied: bool,
+    is_input_pre_ordered: bool,
     // Ignore null values.
     ignore_nulls: bool,
 }
@@ -861,6 +855,7 @@ impl FirstValueAccumulator {
         data_type: &DataType,
         ordering_dtypes: &[DataType],
         ordering_req: LexOrdering,
+        is_input_pre_ordered: bool,
         ignore_nulls: bool,
     ) -> Result<Self> {
         let orderings = ordering_dtypes
@@ -872,16 +867,11 @@ impl FirstValueAccumulator {
             is_set: false,
             orderings,
             ordering_req,
-            requirement_satisfied: false,
+            is_input_pre_ordered,
             ignore_nulls,
         })
     }
 
-    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) 
-> Self {
-        self.requirement_satisfied = requirement_satisfied;
-        self
-    }
-
     // Updates state with the values in the given row.
     fn update_with_new_row(&mut self, mut row: Vec<ScalarValue>) {
         // Ensure any Array based scalars hold have a single value to reduce 
memory pressure
@@ -897,7 +887,7 @@ impl FirstValueAccumulator {
         let [value, ordering_values @ ..] = values else {
             return internal_err!("Empty row in FIRST_VALUE");
         };
-        if self.requirement_satisfied {
+        if self.is_input_pre_ordered {
             // Get first entry according to the pre-existing ordering (0th 
index):
             if self.ignore_nulls {
                 // If ignoring nulls, find the first non-null value.
@@ -948,7 +938,7 @@ impl Accumulator for FirstValueAccumulator {
         if let Some(first_idx) = self.get_first_idx(values)? {
             let row = get_row_at_idx(values, first_idx)?;
             if !self.is_set
-                || (!self.requirement_satisfied
+                || (!self.is_input_pre_ordered
                     && compare_rows(
                         &self.orderings,
                         &row[1..],
@@ -1024,7 +1014,7 @@ impl Accumulator for FirstValueAccumulator {
 )]
 pub struct LastValue {
     signature: Signature,
-    requirement_satisfied: bool,
+    is_input_pre_ordered: bool,
 }
 
 impl Debug for LastValue {
@@ -1047,14 +1037,9 @@ impl LastValue {
     pub fn new() -> Self {
         Self {
             signature: Signature::any(1, Volatility::Immutable),
-            requirement_satisfied: false,
+            is_input_pre_ordered: false,
         }
     }
-
-    fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> 
Self {
-        self.requirement_satisfied = requirement_satisfied;
-        self
-    }
 }
 
 impl AggregateUDFImpl for LastValue {
@@ -1086,15 +1071,13 @@ impl AggregateUDFImpl for LastValue {
             .iter()
             .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;
-        LastValueAccumulator::try_new(
+        Ok(Box::new(LastValueAccumulator::try_new(
             acc_args.return_field.data_type(),
             &ordering_dtypes,
             ordering,
+            self.is_input_pre_ordered,
             acc_args.ignore_nulls,
-        )
-        .map(|acc| {
-            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
-        })
+        )?))
     }
 
     fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
@@ -1113,9 +1096,10 @@ impl AggregateUDFImpl for LastValue {
         self: Arc<Self>,
         beneficial_ordering: bool,
     ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
-        Ok(Some(Arc::new(
-            LastValue::new().with_requirement_satisfied(beneficial_ordering),
-        )))
+        Ok(Some(Arc::new(Self {
+            signature: self.signature.clone(),
+            is_input_pre_ordered: beneficial_ordering,
+        })))
     }
 
     fn order_sensitivity(&self) -> AggregateOrderSensitivity {
@@ -1330,7 +1314,7 @@ struct LastValueAccumulator {
     // Stores the applicable ordering requirement.
     ordering_req: LexOrdering,
     // Stores whether incoming data already satisfies the ordering requirement.
-    requirement_satisfied: bool,
+    is_input_pre_ordered: bool,
     // Ignore null values.
     ignore_nulls: bool,
 }
@@ -1341,6 +1325,7 @@ impl LastValueAccumulator {
         data_type: &DataType,
         ordering_dtypes: &[DataType],
         ordering_req: LexOrdering,
+        is_input_pre_ordered: bool,
         ignore_nulls: bool,
     ) -> Result<Self> {
         let orderings = ordering_dtypes
@@ -1352,7 +1337,7 @@ impl LastValueAccumulator {
             is_set: false,
             orderings,
             ordering_req,
-            requirement_satisfied: false,
+            is_input_pre_ordered,
             ignore_nulls,
         })
     }
@@ -1372,7 +1357,7 @@ impl LastValueAccumulator {
         let [value, ordering_values @ ..] = values else {
             return internal_err!("Empty row in LAST_VALUE");
         };
-        if self.requirement_satisfied {
+        if self.is_input_pre_ordered {
             // Get last entry according to the order of data:
             if self.ignore_nulls {
                 // If ignoring nulls, find the last non-null value.
@@ -1407,11 +1392,6 @@ impl LastValueAccumulator {
 
         Ok(max_ind)
     }
-
-    fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> 
Self {
-        self.requirement_satisfied = requirement_satisfied;
-        self
-    }
 }
 
 impl Accumulator for LastValueAccumulator {
@@ -1428,7 +1408,7 @@ impl Accumulator for LastValueAccumulator {
             let orderings = &row[1..];
             // Update when there is a more recent entry
             if !self.is_set
-                || self.requirement_satisfied
+                || self.is_input_pre_ordered
                 || compare_rows(
                     &self.orderings,
                     orderings,
@@ -1464,7 +1444,7 @@ impl Accumulator for LastValueAccumulator {
             // Either there is no existing value, or there is a newer (latest)
             // version in the new data:
             if !self.is_set
-                || self.requirement_satisfied
+                || self.is_input_pre_ordered
                 || compare_rows(&self.orderings, last_ordering, 
&sort_options)?.is_lt()
             {
                 // Update with last value in the state. Note that we should 
exclude the


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to