This is an automated email from the ASF dual-hosted git repository. findepi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 9a1c09c74a Improve field naming in first_value, last_value implementation (#16631) 9a1c09c74a is described below commit 9a1c09c74aa9869ecec482e1ae9ff28c65447073 Author: Piotr Findeisen <piotr.findei...@gmail.com> AuthorDate: Tue Jul 1 09:40:10 2025 +0200 Improve field naming in first_value, last_value implementation (#16631) Rename the "requirement_satisfied" fields to "is_input_pre_ordered". It is now slightly more obvious what exactly requirement is satisfied. --- datafusion/functions-aggregate/src/first_last.rs | 78 +++++++++--------------- 1 file changed, 29 insertions(+), 49 deletions(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 42c0a57fbf..790eaada6a 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -98,7 +98,7 @@ pub fn last_value(expression: Expr, order_by: Option<Vec<SortExpr>>) -> Expr { )] pub struct FirstValue { signature: Signature, - requirement_satisfied: bool, + is_input_pre_ordered: bool, } impl Debug for FirstValue { @@ -121,14 +121,9 @@ impl FirstValue { pub fn new() -> Self { Self { signature: Signature::any(1, Volatility::Immutable), - requirement_satisfied: false, + is_input_pre_ordered: false, } } - - fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { - self.requirement_satisfied = requirement_satisfied; - self - } } impl AggregateUDFImpl for FirstValue { @@ -160,15 +155,13 @@ impl AggregateUDFImpl for FirstValue { .iter() .map(|e| e.expr.data_type(acc_args.schema)) .collect::<Result<Vec<_>>>()?; - FirstValueAccumulator::try_new( + Ok(Box::new(FirstValueAccumulator::try_new( acc_args.return_field.data_type(), &ordering_dtypes, ordering, + self.is_input_pre_ordered, acc_args.ignore_nulls, - ) - .map(|acc| { - Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _ - }) + )?)) } fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> { @@ -290,9 +283,10 @@ impl AggregateUDFImpl for FirstValue { self: Arc<Self>, beneficial_ordering: bool, ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> { - Ok(Some(Arc::new( - FirstValue::new().with_requirement_satisfied(beneficial_ordering), - ))) + Ok(Some(Arc::new(Self { + signature: self.signature.clone(), + is_input_pre_ordered: beneficial_ordering, + }))) } fn order_sensitivity(&self) -> AggregateOrderSensitivity { @@ -850,7 +844,7 @@ pub struct FirstValueAccumulator { // Stores the applicable ordering requirement. ordering_req: LexOrdering, // Stores whether incoming data already satisfies the ordering requirement. - requirement_satisfied: bool, + is_input_pre_ordered: bool, // Ignore null values. ignore_nulls: bool, } @@ -861,6 +855,7 @@ impl FirstValueAccumulator { data_type: &DataType, ordering_dtypes: &[DataType], ordering_req: LexOrdering, + is_input_pre_ordered: bool, ignore_nulls: bool, ) -> Result<Self> { let orderings = ordering_dtypes @@ -872,16 +867,11 @@ impl FirstValueAccumulator { is_set: false, orderings, ordering_req, - requirement_satisfied: false, + is_input_pre_ordered, ignore_nulls, }) } - pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { - self.requirement_satisfied = requirement_satisfied; - self - } - // Updates state with the values in the given row. fn update_with_new_row(&mut self, mut row: Vec<ScalarValue>) { // Ensure any Array based scalars hold have a single value to reduce memory pressure @@ -897,7 +887,7 @@ impl FirstValueAccumulator { let [value, ordering_values @ ..] = values else { return internal_err!("Empty row in FIRST_VALUE"); }; - if self.requirement_satisfied { + if self.is_input_pre_ordered { // Get first entry according to the pre-existing ordering (0th index): if self.ignore_nulls { // If ignoring nulls, find the first non-null value. @@ -948,7 +938,7 @@ impl Accumulator for FirstValueAccumulator { if let Some(first_idx) = self.get_first_idx(values)? { let row = get_row_at_idx(values, first_idx)?; if !self.is_set - || (!self.requirement_satisfied + || (!self.is_input_pre_ordered && compare_rows( &self.orderings, &row[1..], @@ -1024,7 +1014,7 @@ impl Accumulator for FirstValueAccumulator { )] pub struct LastValue { signature: Signature, - requirement_satisfied: bool, + is_input_pre_ordered: bool, } impl Debug for LastValue { @@ -1047,14 +1037,9 @@ impl LastValue { pub fn new() -> Self { Self { signature: Signature::any(1, Volatility::Immutable), - requirement_satisfied: false, + is_input_pre_ordered: false, } } - - fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { - self.requirement_satisfied = requirement_satisfied; - self - } } impl AggregateUDFImpl for LastValue { @@ -1086,15 +1071,13 @@ impl AggregateUDFImpl for LastValue { .iter() .map(|e| e.expr.data_type(acc_args.schema)) .collect::<Result<Vec<_>>>()?; - LastValueAccumulator::try_new( + Ok(Box::new(LastValueAccumulator::try_new( acc_args.return_field.data_type(), &ordering_dtypes, ordering, + self.is_input_pre_ordered, acc_args.ignore_nulls, - ) - .map(|acc| { - Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _ - }) + )?)) } fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> { @@ -1113,9 +1096,10 @@ impl AggregateUDFImpl for LastValue { self: Arc<Self>, beneficial_ordering: bool, ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> { - Ok(Some(Arc::new( - LastValue::new().with_requirement_satisfied(beneficial_ordering), - ))) + Ok(Some(Arc::new(Self { + signature: self.signature.clone(), + is_input_pre_ordered: beneficial_ordering, + }))) } fn order_sensitivity(&self) -> AggregateOrderSensitivity { @@ -1330,7 +1314,7 @@ struct LastValueAccumulator { // Stores the applicable ordering requirement. ordering_req: LexOrdering, // Stores whether incoming data already satisfies the ordering requirement. - requirement_satisfied: bool, + is_input_pre_ordered: bool, // Ignore null values. ignore_nulls: bool, } @@ -1341,6 +1325,7 @@ impl LastValueAccumulator { data_type: &DataType, ordering_dtypes: &[DataType], ordering_req: LexOrdering, + is_input_pre_ordered: bool, ignore_nulls: bool, ) -> Result<Self> { let orderings = ordering_dtypes @@ -1352,7 +1337,7 @@ impl LastValueAccumulator { is_set: false, orderings, ordering_req, - requirement_satisfied: false, + is_input_pre_ordered, ignore_nulls, }) } @@ -1372,7 +1357,7 @@ impl LastValueAccumulator { let [value, ordering_values @ ..] = values else { return internal_err!("Empty row in LAST_VALUE"); }; - if self.requirement_satisfied { + if self.is_input_pre_ordered { // Get last entry according to the order of data: if self.ignore_nulls { // If ignoring nulls, find the last non-null value. @@ -1407,11 +1392,6 @@ impl LastValueAccumulator { Ok(max_ind) } - - fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self { - self.requirement_satisfied = requirement_satisfied; - self - } } impl Accumulator for LastValueAccumulator { @@ -1428,7 +1408,7 @@ impl Accumulator for LastValueAccumulator { let orderings = &row[1..]; // Update when there is a more recent entry if !self.is_set - || self.requirement_satisfied + || self.is_input_pre_ordered || compare_rows( &self.orderings, orderings, @@ -1464,7 +1444,7 @@ impl Accumulator for LastValueAccumulator { // Either there is no existing value, or there is a newer (latest) // version in the new data: if !self.is_set - || self.requirement_satisfied + || self.is_input_pre_ordered || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt() { // Update with last value in the state. Note that we should exclude the --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org