This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bb98dfed08 Change first/last implementation to prevent redundant 
comparisons when data is already sorted (#8678)
bb98dfed08 is described below

commit bb98dfed08d8c2b94ab668a064b206d8b84b51b0
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Dec 30 03:48:36 2023 +0300

    Change first/last implementation to prevent redundant comparisons when data 
is already sorted (#8678)
    
    * Change first/last implementation to prevent redundant computations
    
    * Remove redundant checks
    
    * Review
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../physical-expr/src/aggregate/first_last.rs      | 259 +++++++++++++--------
 datafusion/physical-plan/src/aggregates/mod.rs     |  77 +++++-
 datafusion/sqllogictest/test_files/groupby.slt     |  14 +-
 3 files changed, 234 insertions(+), 116 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs 
b/datafusion/physical-expr/src/aggregate/first_last.rs
index c7032e601c..4afa8d0dd5 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -36,13 +36,14 @@ use datafusion_common::{
 use datafusion_expr::Accumulator;
 
 /// FIRST_VALUE aggregate expression
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct FirstValue {
     name: String,
     input_data_type: DataType,
     order_by_data_types: Vec<DataType>,
     expr: Arc<dyn PhysicalExpr>,
     ordering_req: LexOrdering,
+    requirement_satisfied: bool,
 }
 
 impl FirstValue {
@@ -54,12 +55,14 @@ impl FirstValue {
         ordering_req: LexOrdering,
         order_by_data_types: Vec<DataType>,
     ) -> Self {
+        let requirement_satisfied = ordering_req.is_empty();
         Self {
             name: name.into(),
             input_data_type,
             order_by_data_types,
             expr,
             ordering_req,
+            requirement_satisfied,
         }
     }
 
@@ -87,6 +90,33 @@ impl FirstValue {
     pub fn ordering_req(&self) -> &LexOrdering {
         &self.ordering_req
     }
+
+    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) 
-> Self {
+        self.requirement_satisfied = requirement_satisfied;
+        self
+    }
+
+    pub fn convert_to_last(self) -> LastValue {
+        let name = if self.name.starts_with("FIRST") {
+            format!("LAST{}", &self.name[5..])
+        } else {
+            format!("LAST_VALUE({})", self.expr)
+        };
+        let FirstValue {
+            expr,
+            input_data_type,
+            ordering_req,
+            order_by_data_types,
+            ..
+        } = self;
+        LastValue::new(
+            expr,
+            name,
+            input_data_type,
+            reverse_order_bys(&ordering_req),
+            order_by_data_types,
+        )
+    }
 }
 
 impl AggregateExpr for FirstValue {
@@ -100,11 +130,14 @@ impl AggregateExpr for FirstValue {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(FirstValueAccumulator::try_new(
+        FirstValueAccumulator::try_new(
             &self.input_data_type,
             &self.order_by_data_types,
             self.ordering_req.clone(),
-        )?))
+        )
+        .map(|acc| {
+            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
+        })
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
@@ -130,11 +163,7 @@ impl AggregateExpr for FirstValue {
     }
 
     fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
-        if self.ordering_req.is_empty() {
-            None
-        } else {
-            Some(&self.ordering_req)
-        }
+        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
     }
 
     fn name(&self) -> &str {
@@ -142,26 +171,18 @@ impl AggregateExpr for FirstValue {
     }
 
     fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        let name = if self.name.starts_with("FIRST") {
-            format!("LAST{}", &self.name[5..])
-        } else {
-            format!("LAST_VALUE({})", self.expr)
-        };
-        Some(Arc::new(LastValue::new(
-            self.expr.clone(),
-            name,
-            self.input_data_type.clone(),
-            reverse_order_bys(&self.ordering_req),
-            self.order_by_data_types.clone(),
-        )))
+        Some(Arc::new(self.clone().convert_to_last()))
     }
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(FirstValueAccumulator::try_new(
+        FirstValueAccumulator::try_new(
             &self.input_data_type,
             &self.order_by_data_types,
             self.ordering_req.clone(),
-        )?))
+        )
+        .map(|acc| {
+            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
+        })
     }
 }
 
@@ -190,6 +211,8 @@ struct FirstValueAccumulator {
     orderings: Vec<ScalarValue>,
     // Stores the applicable ordering requirement.
     ordering_req: LexOrdering,
+    // Stores whether incoming data already satisfies the ordering requirement.
+    requirement_satisfied: bool,
 }
 
 impl FirstValueAccumulator {
@@ -203,42 +226,29 @@ impl FirstValueAccumulator {
             .iter()
             .map(ScalarValue::try_from)
             .collect::<Result<Vec<_>>>()?;
-        ScalarValue::try_from(data_type).map(|value| Self {
-            first: value,
+        let requirement_satisfied = ordering_req.is_empty();
+        ScalarValue::try_from(data_type).map(|first| Self {
+            first,
             is_set: false,
             orderings,
             ordering_req,
+            requirement_satisfied,
         })
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
-        let [value, orderings @ ..] = row else {
-            return internal_err!("Empty row in FIRST_VALUE");
-        };
-        // Update when there is no entry in the state, or we have an "earlier"
-        // entry according to sort requirements.
-        if !self.is_set
-            || compare_rows(
-                &self.orderings,
-                orderings,
-                &get_sort_options(&self.ordering_req),
-            )?
-            .is_gt()
-        {
-            self.first = value.clone();
-            self.orderings = orderings.to_vec();
-            self.is_set = true;
-        }
-        Ok(())
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
+        self.first = row[0].clone();
+        self.orderings = row[1..].to_vec();
+        self.is_set = true;
     }
 
     fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
         let [value, ordering_values @ ..] = values else {
             return internal_err!("Empty row in FIRST_VALUE");
         };
-        if self.ordering_req.is_empty() {
-            // Get first entry according to receive order (0th index)
+        if self.requirement_satisfied {
+            // Get first entry according to the pre-existing ordering (0th 
index):
             return Ok((!value.is_empty()).then_some(0));
         }
         let sort_columns = ordering_values
@@ -252,6 +262,11 @@ impl FirstValueAccumulator {
         let indices = lexsort_to_indices(&sort_columns, Some(1))?;
         Ok((!indices.is_empty()).then_some(indices.value(0) as _))
     }
+
+    fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> 
Self {
+        self.requirement_satisfied = requirement_satisfied;
+        self
+    }
 }
 
 impl Accumulator for FirstValueAccumulator {
@@ -263,9 +278,25 @@ impl Accumulator for FirstValueAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if let Some(first_idx) = self.get_first_idx(values)? {
-            let row = get_row_at_idx(values, first_idx)?;
-            self.update_with_new_row(&row)?;
+        if !self.is_set {
+            if let Some(first_idx) = self.get_first_idx(values)? {
+                let row = get_row_at_idx(values, first_idx)?;
+                self.update_with_new_row(&row);
+            }
+        } else if !self.requirement_satisfied {
+            if let Some(first_idx) = self.get_first_idx(values)? {
+                let row = get_row_at_idx(values, first_idx)?;
+                let orderings = &row[1..];
+                if compare_rows(
+                    &self.orderings,
+                    orderings,
+                    &get_sort_options(&self.ordering_req),
+                )?
+                .is_gt()
+                {
+                    self.update_with_new_row(&row);
+                }
+            }
         }
         Ok(())
     }
@@ -294,12 +325,12 @@ impl Accumulator for FirstValueAccumulator {
             let sort_options = get_sort_options(&self.ordering_req);
             // Either there is no existing value, or there is an earlier 
version in new data.
             if !self.is_set
-                || compare_rows(first_ordering, &self.orderings, 
&sort_options)?.is_lt()
+                || compare_rows(&self.orderings, first_ordering, 
&sort_options)?.is_gt()
             {
                 // Update with first value in the state. Note that we should 
exclude the
                 // is_set flag from the state. Otherwise, we will end up with 
a state
                 // containing two is_set flags.
-                self.update_with_new_row(&first_row[0..is_set_idx])?;
+                self.update_with_new_row(&first_row[0..is_set_idx]);
             }
         }
         Ok(())
@@ -318,13 +349,14 @@ impl Accumulator for FirstValueAccumulator {
 }
 
 /// LAST_VALUE aggregate expression
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct LastValue {
     name: String,
     input_data_type: DataType,
     order_by_data_types: Vec<DataType>,
     expr: Arc<dyn PhysicalExpr>,
     ordering_req: LexOrdering,
+    requirement_satisfied: bool,
 }
 
 impl LastValue {
@@ -336,12 +368,14 @@ impl LastValue {
         ordering_req: LexOrdering,
         order_by_data_types: Vec<DataType>,
     ) -> Self {
+        let requirement_satisfied = ordering_req.is_empty();
         Self {
             name: name.into(),
             input_data_type,
             order_by_data_types,
             expr,
             ordering_req,
+            requirement_satisfied,
         }
     }
 
@@ -369,6 +403,33 @@ impl LastValue {
     pub fn ordering_req(&self) -> &LexOrdering {
         &self.ordering_req
     }
+
+    pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool) 
-> Self {
+        self.requirement_satisfied = requirement_satisfied;
+        self
+    }
+
+    pub fn convert_to_first(self) -> FirstValue {
+        let name = if self.name.starts_with("LAST") {
+            format!("FIRST{}", &self.name[4..])
+        } else {
+            format!("FIRST_VALUE({})", self.expr)
+        };
+        let LastValue {
+            expr,
+            input_data_type,
+            ordering_req,
+            order_by_data_types,
+            ..
+        } = self;
+        FirstValue::new(
+            expr,
+            name,
+            input_data_type,
+            reverse_order_bys(&ordering_req),
+            order_by_data_types,
+        )
+    }
 }
 
 impl AggregateExpr for LastValue {
@@ -382,11 +443,14 @@ impl AggregateExpr for LastValue {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(LastValueAccumulator::try_new(
+        LastValueAccumulator::try_new(
             &self.input_data_type,
             &self.order_by_data_types,
             self.ordering_req.clone(),
-        )?))
+        )
+        .map(|acc| {
+            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
+        })
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
@@ -412,11 +476,7 @@ impl AggregateExpr for LastValue {
     }
 
     fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
-        if self.ordering_req.is_empty() {
-            None
-        } else {
-            Some(&self.ordering_req)
-        }
+        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
     }
 
     fn name(&self) -> &str {
@@ -424,26 +484,18 @@ impl AggregateExpr for LastValue {
     }
 
     fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        let name = if self.name.starts_with("LAST") {
-            format!("FIRST{}", &self.name[4..])
-        } else {
-            format!("FIRST_VALUE({})", self.expr)
-        };
-        Some(Arc::new(FirstValue::new(
-            self.expr.clone(),
-            name,
-            self.input_data_type.clone(),
-            reverse_order_bys(&self.ordering_req),
-            self.order_by_data_types.clone(),
-        )))
+        Some(Arc::new(self.clone().convert_to_first()))
     }
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(LastValueAccumulator::try_new(
+        LastValueAccumulator::try_new(
             &self.input_data_type,
             &self.order_by_data_types,
             self.ordering_req.clone(),
-        )?))
+        )
+        .map(|acc| {
+            
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
+        })
     }
 }
 
@@ -471,6 +523,8 @@ struct LastValueAccumulator {
     orderings: Vec<ScalarValue>,
     // Stores the applicable ordering requirement.
     ordering_req: LexOrdering,
+    // Stores whether incoming data already satisfies the ordering requirement.
+    requirement_satisfied: bool,
 }
 
 impl LastValueAccumulator {
@@ -484,42 +538,28 @@ impl LastValueAccumulator {
             .iter()
             .map(ScalarValue::try_from)
             .collect::<Result<Vec<_>>>()?;
-        Ok(Self {
-            last: ScalarValue::try_from(data_type)?,
+        let requirement_satisfied = ordering_req.is_empty();
+        ScalarValue::try_from(data_type).map(|last| Self {
+            last,
             is_set: false,
             orderings,
             ordering_req,
+            requirement_satisfied,
         })
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
-        let [value, orderings @ ..] = row else {
-            return internal_err!("Empty row in LAST_VALUE");
-        };
-        // Update when there is no entry in the state, or we have a "later"
-        // entry (either according to sort requirements or the order of 
execution).
-        if !self.is_set
-            || self.orderings.is_empty()
-            || compare_rows(
-                &self.orderings,
-                orderings,
-                &get_sort_options(&self.ordering_req),
-            )?
-            .is_lt()
-        {
-            self.last = value.clone();
-            self.orderings = orderings.to_vec();
-            self.is_set = true;
-        }
-        Ok(())
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
+        self.last = row[0].clone();
+        self.orderings = row[1..].to_vec();
+        self.is_set = true;
     }
 
     fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
         let [value, ordering_values @ ..] = values else {
             return internal_err!("Empty row in LAST_VALUE");
         };
-        if self.ordering_req.is_empty() {
+        if self.requirement_satisfied {
             // Get last entry according to the order of data:
             return Ok((!value.is_empty()).then_some(value.len() - 1));
         }
@@ -538,6 +578,11 @@ impl LastValueAccumulator {
         let indices = lexsort_to_indices(&sort_columns, Some(1))?;
         Ok((!indices.is_empty()).then_some(indices.value(0) as _))
     }
+
+    fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> 
Self {
+        self.requirement_satisfied = requirement_satisfied;
+        self
+    }
 }
 
 impl Accumulator for LastValueAccumulator {
@@ -549,10 +594,26 @@ impl Accumulator for LastValueAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if let Some(last_idx) = self.get_last_idx(values)? {
+        if !self.is_set || self.requirement_satisfied {
+            if let Some(last_idx) = self.get_last_idx(values)? {
+                let row = get_row_at_idx(values, last_idx)?;
+                self.update_with_new_row(&row);
+            }
+        } else if let Some(last_idx) = self.get_last_idx(values)? {
             let row = get_row_at_idx(values, last_idx)?;
-            self.update_with_new_row(&row)?;
+            let orderings = &row[1..];
+            // Update when there is a more recent entry
+            if compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_lt()
+            {
+                self.update_with_new_row(&row);
+            }
         }
+
         Ok(())
     }
 
@@ -583,12 +644,12 @@ impl Accumulator for LastValueAccumulator {
             // Either there is no existing value, or there is a newer (latest)
             // version in the new data:
             if !self.is_set
-                || compare_rows(last_ordering, &self.orderings, 
&sort_options)?.is_gt()
+                || compare_rows(&self.orderings, last_ordering, 
&sort_options)?.is_lt()
             {
                 // Update with last value in the state. Note that we should 
exclude the
                 // is_set flag from the state. Otherwise, we will end up with 
a state
                 // containing two is_set flags.
-                self.update_with_new_row(&last_row[0..is_set_idx])?;
+                self.update_with_new_row(&last_row[0..is_set_idx]);
             }
         }
         Ok(())
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index f5bb4fe59b..a38044de02 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -44,9 +44,9 @@ use datafusion_expr::Accumulator;
 use datafusion_physical_expr::{
     aggregate::is_order_sensitive,
     equivalence::{collapse_lex_req, ProjectionMapping},
-    expressions::{Column, Max, Min, UnKnownColumn},
-    physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering,
-    LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
+    expressions::{Column, FirstValue, LastValue, Max, Min, UnKnownColumn},
+    physical_exprs_contains, reverse_order_bys, AggregateExpr, 
EquivalenceProperties,
+    LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr, 
PhysicalSortRequirement,
 };
 
 use itertools::Itertools;
@@ -324,7 +324,7 @@ impl AggregateExec {
     fn try_new_with_schema(
         mode: AggregateMode,
         group_by: PhysicalGroupBy,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
@@ -347,7 +347,8 @@ impl AggregateExec {
             .collect::<Vec<_>>();
 
         let req = get_aggregate_exprs_requirement(
-            &aggr_expr,
+            &new_requirement,
+            &mut aggr_expr,
             &group_by,
             &input_eq_properties,
             &mode,
@@ -896,6 +897,11 @@ fn finer_ordering(
     eq_properties.get_finer_ordering(existing_req, &aggr_req)
 }
 
+/// Concatenates the given slices.
+fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> Vec<T> {
+    [lhs, rhs].concat()
+}
+
 /// Get the common requirement that satisfies all the aggregate expressions.
 ///
 /// # Parameters
@@ -914,14 +920,64 @@ fn finer_ordering(
 /// A `LexRequirement` instance, which is the requirement that satisfies all 
the
 /// aggregate requirements. Returns an error in case of conflicting 
requirements.
 fn get_aggregate_exprs_requirement(
-    aggr_exprs: &[Arc<dyn AggregateExpr>],
+    prefix_requirement: &[PhysicalSortRequirement],
+    aggr_exprs: &mut [Arc<dyn AggregateExpr>],
     group_by: &PhysicalGroupBy,
     eq_properties: &EquivalenceProperties,
     agg_mode: &AggregateMode,
 ) -> Result<LexRequirement> {
     let mut requirement = vec![];
-    for aggr_expr in aggr_exprs.iter() {
-        if let Some(finer_ordering) =
+    for aggr_expr in aggr_exprs.iter_mut() {
+        let aggr_req = aggr_expr.order_bys().unwrap_or(&[]);
+        let reverse_aggr_req = reverse_order_bys(aggr_req);
+        let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req);
+        let reverse_aggr_req =
+            PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req);
+        if let Some(first_value) = 
aggr_expr.as_any().downcast_ref::<FirstValue>() {
+            let mut first_value = first_value.clone();
+            if eq_properties.ordering_satisfy_requirement(&concat_slices(
+                prefix_requirement,
+                &aggr_req,
+            )) {
+                first_value = first_value.with_requirement_satisfied(true);
+                *aggr_expr = Arc::new(first_value) as _;
+            } else if 
eq_properties.ordering_satisfy_requirement(&concat_slices(
+                prefix_requirement,
+                &reverse_aggr_req,
+            )) {
+                // Converting to LAST_VALUE enables more efficient execution
+                // given the existing ordering:
+                let mut last_value = first_value.convert_to_last();
+                last_value = last_value.with_requirement_satisfied(true);
+                *aggr_expr = Arc::new(last_value) as _;
+            } else {
+                // Requirement is not satisfied with existing ordering.
+                first_value = first_value.with_requirement_satisfied(false);
+                *aggr_expr = Arc::new(first_value) as _;
+            }
+        } else if let Some(last_value) = 
aggr_expr.as_any().downcast_ref::<LastValue>() {
+            let mut last_value = last_value.clone();
+            if eq_properties.ordering_satisfy_requirement(&concat_slices(
+                prefix_requirement,
+                &aggr_req,
+            )) {
+                last_value = last_value.with_requirement_satisfied(true);
+                *aggr_expr = Arc::new(last_value) as _;
+            } else if 
eq_properties.ordering_satisfy_requirement(&concat_slices(
+                prefix_requirement,
+                &reverse_aggr_req,
+            )) {
+                // Converting to FIRST_VALUE enables more efficient execution
+                // given the existing ordering:
+                let mut first_value = last_value.convert_to_first();
+                first_value = first_value.with_requirement_satisfied(true);
+                *aggr_expr = Arc::new(first_value) as _;
+            } else {
+                // Requirement is not satisfied with existing ordering.
+                last_value = last_value.with_requirement_satisfied(false);
+                *aggr_expr = Arc::new(last_value) as _;
+            }
+        } else if let Some(finer_ordering) =
             finer_ordering(&requirement, aggr_expr, group_by, eq_properties, 
agg_mode)
         {
             requirement = finer_ordering;
@@ -2071,7 +2127,7 @@ mod tests {
                 options: options1,
             },
         ];
-        let aggr_exprs = order_by_exprs
+        let mut aggr_exprs = order_by_exprs
             .into_iter()
             .map(|order_by_expr| {
                 Arc::new(OrderSensitiveArrayAgg::new(
@@ -2086,7 +2142,8 @@ mod tests {
             .collect::<Vec<_>>();
         let group_by = PhysicalGroupBy::new_single(vec![]);
         let res = get_aggregate_exprs_requirement(
-            &aggr_exprs,
+            &[],
+            &mut aggr_exprs,
             &group_by,
             &eq_properties,
             &AggregateMode::Partial,
diff --git a/datafusion/sqllogictest/test_files/groupby.slt 
b/datafusion/sqllogictest/test_files/groupby.slt
index bbf21e135f..b09ff79e88 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2508,7 +2508,7 @@ Projection: sales_global.country, 
ARRAY_AGG(sales_global.amount) ORDER BY [sales
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
 ----SortExec: expr=[amount@1 DESC]
 ------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2539,7 +2539,7 @@ Projection: sales_global.country, 
ARRAY_AGG(sales_global.amount) ORDER BY [sales
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), 
FIRST_VALUE(sales_global.amount)]
 ----SortExec: expr=[amount@1 ASC NULLS LAST]
 ------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2571,7 +2571,7 @@ Projection: sales_global.country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sal
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, 
LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS 
FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount 
ASC NULLS LAST]@3 as amounts]
---AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), 
ARRAY_AGG(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), 
ARRAY_AGG(sales_global.amount)]
 ----SortExec: expr=[amount@1 ASC NULLS LAST]
 ------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2636,7 +2636,7 @@ Projection: sales_global.country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sal
 ------TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) 
ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, 
LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 
as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 
as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), 
SUM(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), 
SUM(sales_global.amount)]
 ----MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRRR rowsort
@@ -2988,7 +2988,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
 --------CoalesceBatchesExec: target_batch_size=4
 ----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
+------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
 --------------SortExec: expr=[amount@1 DESC]
 ----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
 ------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3631,10 +3631,10 @@ Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER 
BY [multiple_ordered_tab
 ----TableScan: multiple_ordered_table projection=[a, c, d]
 physical_plan
 ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY 
[multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, 
LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC 
NULLS FIRST]@2 as last_c]
---AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
LAST_VALUE(multiple_ordered_table.c)]
+--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
FIRST_VALUE(multiple_ordered_table.c)]
 ----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
---------AggregateExec: mode=Partial, gby=[d@2 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
LAST_VALUE(multiple_ordered_table.c)]
+--------AggregateExec: mode=Partial, gby=[d@2 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
FIRST_VALUE(multiple_ordered_table.c)]
 ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, 
d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], 
has_header=true
 

Reply via email to