alamb commented on code in PR #8662:
URL: https://github.com/apache/arrow-datafusion/pull/8662#discussion_r1437836263
##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -211,10 +212,45 @@ impl FirstValueAccumulator {
}
// Updates state with the values in the given row.
- fn update_with_new_row(&mut self, row: &[ScalarValue]) {
- self.first = row[0].clone();
- self.orderings = row[1..].to_vec();
- self.is_set = true;
+ fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+ let [value, orderings @ ..] = row else {
+ return internal_err!("Empty row in FIRST_VALUE");
+ };
+ // Update when there is no entry in the state, or we have an "earlier"
+ // entry according to sort requirements.
+ if !self.is_set
Review Comment:
In theory, we may be able to use an `Option<ScalarValue>` instead of a separate
`ScalarValue` and `is_set` flag, but I don't think it matters for performance,
and this PR follows the existing implementation as well 👍
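For illustration, folding the flag into the value might look roughly like this
(a minimal sketch with hypothetical names, not the PR's actual struct, and with
the ordering-comparison branch elided):

```rust
use datafusion_common::ScalarValue;

// Hypothetical sketch: fold `first` and the `is_set` flag into one Option.
struct FirstValueStateSketch {
    first: Option<ScalarValue>, // None until the first qualifying row is seen
    orderings: Vec<ScalarValue>,
}

impl FirstValueStateSketch {
    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
        let [value, orderings @ ..] = row else { return };
        // `self.first.is_none()` plays the role of `!self.is_set`; the
        // sort-requirement comparison from the PR is omitted for brevity.
        if self.first.is_none() {
            self.first = Some(value.clone());
            self.orderings = orderings.to_vec();
        }
    }
}
```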
##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -459,10 +493,52 @@ impl LastValueAccumulator {
}
// Updates state with the values in the given row.
- fn update_with_new_row(&mut self, row: &[ScalarValue]) {
- self.last = row[0].clone();
- self.orderings = row[1..].to_vec();
- self.is_set = true;
+ fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+ let value = &row[0];
+ let orderings = &row[1..];
+ // Update when
+ // - no value in the state
+ // - There is no specific requirement, but a new value (most recent entry in terms of execution)
+ // - There is a more recent entry in terms of requirement
+ if !self.is_set
+ || self.orderings.is_empty()
+ || compare_rows(
Review Comment:
I re-reviewed and I agree that the `RowFormat` is not needed here (and in
fact it may actually be slower) because, as @mustafasrepo points out, this
code uses `ScalarValue` to compare a single row per batch (it finds the
largest/smallest row per batch using `lexsort_to_indices`). We would have to
benchmark to be sure.
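For reference, the lexicographic comparison of two ordering rows made of
`ScalarValue`s boils down to roughly this (a simplified sketch, not DataFusion's
actual `compare_rows`; null handling via `nulls_first` is ignored here):

```rust
use std::cmp::Ordering;

use arrow_schema::SortOptions;
use datafusion_common::ScalarValue;

// Simplified lexicographic comparison of two ordering rows made of
// ScalarValues; run once per batch against the single selected row.
fn compare_rows_sketch(
    lhs: &[ScalarValue],
    rhs: &[ScalarValue],
    options: &[SortOptions],
) -> Option<Ordering> {
    for ((l, r), opt) in lhs.iter().zip(rhs).zip(options) {
        // ScalarValue implements PartialOrd for same-typed values.
        let ord = l.partial_cmp(r)?;
        let ord = if opt.descending { ord.reverse() } else { ord };
        if ord != Ordering::Equal {
            return Some(ord);
        }
    }
    Some(Ordering::Equal)
}
```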
##########
datafusion/physical-expr/src/aggregate/mod.rs:
##########
@@ -134,10 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
/// Checks whether the given aggregate expression is order-sensitive.
/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
-/// However, a `FirstValue` depends on the input ordering (if the order changes,
-/// the first value in the list would change).
+/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
- aggr_expr.as_any().is::<FirstValue>()
- || aggr_expr.as_any().is::<LastValue>()
- || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
+ aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
Review Comment:
Eventually it would be nice to move this into the `AggregateExpr` trait directly,
so we could override it and avoid special-casing built-in functions.
Not for this PR though :)
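Something along these lines, as a sketch only (the trait and method here are
illustrative, not part of this PR):

```rust
// Hypothetical sketch of moving order sensitivity into the trait itself,
// with a default that order-sensitive aggregates override instead of
// being special-cased by downcasting in a free function.
pub trait AggregateExprSketch {
    /// Most aggregates (e.g. SUM) do not depend on input order.
    fn is_order_sensitive(&self) -> bool {
        false
    }
}

struct OrderSensitiveArrayAggSketch;

impl AggregateExprSketch for OrderSensitiveArrayAggSketch {
    // Overrides the default, so call sites need no special casing.
    fn is_order_sensitive(&self) -> bool {
        true
    }
}
```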
##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -459,10 +493,52 @@ impl LastValueAccumulator {
}
// Updates state with the values in the given row.
- fn update_with_new_row(&mut self, row: &[ScalarValue]) {
- self.last = row[0].clone();
- self.orderings = row[1..].to_vec();
- self.is_set = true;
+ fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+ let value = &row[0];
+ let orderings = &row[1..];
+ // Update when
+ // - no value in the state
+ // - There is no specific requirement, but a new value (most recent entry in terms of execution)
+ // - There is a more recent entry in terms of requirement
+ if !self.is_set
+ || self.orderings.is_empty()
+ || compare_rows(
+ &self.orderings,
+ orderings,
+ &get_sort_options(&self.ordering_req),
+ )?
+ .is_lt()
+ {
+ self.last = value.clone();
+ self.orderings = orderings.to_vec();
+ self.is_set = true;
+ }
+ Ok(())
+ }
+
+ fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+ let value = &values[0];
+ let ordering_values = &values[1..];
+ assert_eq!(ordering_values.len(), self.ordering_req.len());
+ if self.ordering_req.is_empty() {
+ return Ok((!value.is_empty()).then_some(value.len() - 1));
+ }
+ let sort_columns = ordering_values
+ .iter()
+ .zip(self.ordering_req.iter())
+ .map(|(values, req)| {
+ // Take the reverse ordering requirement; this enables us to use fetch=1 for the last value.
+ SortColumn {
+ values: values.clone(),
+ options: Some(!req.options),
+ }
+ })
+ .collect::<Vec<_>>();
+ let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Review Comment:
I think our implementation is (slightly) more efficient, but it is less general
(it only works for timestamp columns). You can see the basic idea here:
https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs
and the comparison is here:
https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127
I think we should stay with the `ScalarValue` implementation unless we find
some query where this calculation takes most of the time.
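For context, the selector approach boils down to roughly this (a paraphrase of
the idea with hypothetical names, not the actual influxdb code):

```rust
use datafusion_common::ScalarValue;

// Rough sketch of the selector-style update: keep the value paired with the
// smallest timestamp seen so far, so the comparison is a plain i64 compare
// and only works when the ordering column is a timestamp.
struct FirstSelectorSketch {
    selected: Option<(ScalarValue, i64)>, // (value, time in nanoseconds)
}

impl FirstSelectorSketch {
    fn update(&mut self, value: &ScalarValue, time_ns: i64) {
        let replace = match &self.selected {
            None => true,
            // Replace only when the candidate row is strictly earlier.
            Some((_, current)) => time_ns < *current,
        };
        if replace {
            self.selected = Some((value.clone(), time_ns));
        }
    }
}
```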
##########
datafusion/sqllogictest/test_files/distinct_on.slt:
##########
@@ -100,10 +100,9 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes
------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted
---------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]
Review Comment:
I do love the lack of Sort here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]