alamb commented on code in PR #8662:
URL: https://github.com/apache/arrow-datafusion/pull/8662#discussion_r1437836263


##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -211,10 +212,45 @@ impl FirstValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.first = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let [value, orderings @ ..] = row else {
+            return internal_err!("Empty row in FIRST_VALUE");
+        };
+        // Update when there is no entry in the state, or we have an "earlier"
+        // entry according to sort requirements.
+        if !self.is_set

Review Comment:
   In theory, we may be able to use a `Option<ScalarValue>` instead of 
`ScalarValue` and `is_set` flag, but I don't think it matters for performance 
and this PR follows the existing implementation as well 👍 



##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -459,10 +493,52 @@ impl LastValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.last = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let value = &row[0];
+        let orderings = &row[1..];
+        // Update when
+        // - no value in the state
+        // - There is no specific requirement, but a new value (most recent 
entry in terms of execution)
+        // - There is a more recent entry in terms of requirement
+        if !self.is_set
+            || self.orderings.is_empty()
+            || compare_rows(

Review Comment:
   I re-reviewed and I agree that the `RowFormat` is not needed here (and in 
fact it may actually be slower) because, as @mustafasrepo  points out, this 
code uses `ScalarValue` to compare a single row per batch (it finds the 
largest/smallest row per batch using `lexsort_to_indices`). We would have to 
benchmark to be sure. 



##########
datafusion/physical-expr/src/aggregate/mod.rs:
##########
@@ -134,10 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + 
PartialEq<dyn Any> {
 
 /// Checks whether the given aggregate expression is order-sensitive.
 /// For instance, a `SUM` aggregation doesn't depend on the order of its 
inputs.
-/// However, a `FirstValue` depends on the input ordering (if the order 
changes,
-/// the first value in the list would change).
+/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
 pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
-    aggr_expr.as_any().is::<FirstValue>()
-        || aggr_expr.as_any().is::<LastValue>()
-        || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
+    aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()

Review Comment:
   Eventually this would be a nice thing to move into the `AggregateExpr` trait 
directly so we could override it and avoid special casing built in functions. 
Not for this PR though :)



##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -459,10 +493,52 @@ impl LastValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.last = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let value = &row[0];
+        let orderings = &row[1..];
+        // Update when
+        // - no value in the state
+        // - There is no specific requirement, but a new value (most recent 
entry in terms of execution)
+        // - There is a more recent entry in terms of requirement
+        if !self.is_set
+            || self.orderings.is_empty()
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_lt()
+        {
+            self.last = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
+    }
+
+    fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+        let value = &values[0];
+        let ordering_values = &values[1..];
+        assert_eq!(ordering_values.len(), self.ordering_req.len());
+        if self.ordering_req.is_empty() {
+            return Ok((!value.is_empty()).then_some(value.len() - 1));
+        }
+        let sort_columns = ordering_values
+            .iter()
+            .zip(self.ordering_req.iter())
+            .map(|(values, req)| {
+                // Take reverse ordering requirement this would enable us to 
use fetch=1 for last value.
+                SortColumn {
+                    values: values.clone(),
+                    options: Some(!req.options),
+                }
+            })
+            .collect::<Vec<_>>();
+        let indices = lexsort_to_indices(&sort_columns, Some(1))?;

Review Comment:
   I think our implementation is (slightly) more efficient, but it is less 
general (only works for timestamp columns). You can see the basic idea here
   
   
https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs
   
   And the comparision is here: 
https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127
   
   I think we should stay with the `ScalarValue` implementation unless we find 
some query where this calculation is taking most of the time



##########
datafusion/sqllogictest/test_files/distinct_on.slt:
##########
@@ -100,10 +100,9 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) 
ORDER BY [aggregate_tes
 ------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[c1@0 as c1], 
aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], 
ordering_mode=Sorted
---------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]

Review Comment:
   I do love the lack of Sort here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to