This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 06ed3dd1ac Handle ordering of first last aggregation inside aggregator (#8662)
06ed3dd1ac is described below

commit 06ed3dd1ac01b1bd6a70b93b56cb72cb40777690
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Dec 28 23:34:40 2023 +0300

    Handle ordering of first last aggregation inside aggregator (#8662)
    
    * Initial commit
    
    * Update tests in distinct_on
    
    * Update group by joins slt
    
    * Remove unused code
    
    * Minor changes
    
    * Minor changes
    
    * Simplifications
    
    * Update comments
    
    * Review
    
    * Fix clippy
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion-cli/src/functions.rs                    |   2 +-
 datafusion/common/src/error.rs                     |   1 -
 .../src/physical_optimizer/projection_pushdown.rs  |   4 +
 .../src/simplify_expressions/guarantees.rs         |   4 +
 .../physical-expr/src/aggregate/first_last.rs      | 131 ++++--
 datafusion/physical-expr/src/aggregate/mod.rs      |  30 +-
 datafusion/physical-expr/src/aggregate/utils.rs    |  18 +-
 datafusion/physical-expr/src/array_expressions.rs  |   2 +-
 datafusion/physical-plan/src/aggregates/mod.rs     | 461 +++++++++------------
 .../src/engines/datafusion_engine/mod.rs           |   1 -
 datafusion/sqllogictest/test_files/distinct_on.slt |   9 +-
 datafusion/sqllogictest/test_files/groupby.slt     |  82 ++--
 datafusion/sqllogictest/test_files/joins.slt       |   4 +-
 13 files changed, 373 insertions(+), 376 deletions(-)
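
At a high level, this change moves ordering handling for FIRST_VALUE/LAST_VALUE
into the accumulators themselves: each batch update selects the extremal row by
comparing ordering values, so plans no longer need a SortExec in front of the
aggregation. A standalone sketch of the idea, with the ScalarValue rows and
SortOptions of the real code simplified to a single ascending i64 ordering key:

    // Sketch only: keep the best row seen so far and compare ordering keys,
    // instead of assuming the input batches arrive pre-sorted.
    struct FirstValueSketch {
        first: Option<(i64 /* value */, i64 /* ordering key */)>,
    }

    impl FirstValueSketch {
        fn update_batch(&mut self, values: &[i64], keys: &[i64]) {
            // Find the batch-local "first" row (smallest ordering key,
            // earliest row wins on ties).
            let mut best: Option<usize> = None;
            for i in 0..keys.len() {
                if best.map_or(true, |b| keys[i] < keys[b]) {
                    best = Some(i);
                }
            }
            // Adopt it only if it is earlier than the current state.
            if let Some(i) = best {
                match self.first {
                    Some((_, key)) if key <= keys[i] => {}
                    _ => self.first = Some((values[i], keys[i])),
                }
            }
        }
    }

    fn main() {
        let mut acc = FirstValueSketch { first: None };
        acc.update_batch(&[30, 80], &[2, 1]); // batches may arrive unsorted
        acc.update_batch(&[100], &[3]);
        assert_eq!(acc.first, Some((80, 1)));
    }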

diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index f8d9ed238b..5390fa9f22 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -297,7 +297,7 @@ pub struct ParquetMetadataFunc {}
 
 impl TableFunctionImpl for ParquetMetadataFunc {
     fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
-        let filename = match exprs.get(0) {
+        let filename = match exprs.first() {
             Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
             Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
             _ => {
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 515acc6d1c..e58faaa150 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -558,7 +558,6 @@ macro_rules! arrow_err {
 
 // To avoid compiler error when using macro in the same crate:
 // macros from the current crate cannot be referred to by absolute paths
-pub use exec_err as _exec_err;
 pub use internal_datafusion_err as _internal_datafusion_err;
 pub use internal_err as _internal_err;
 pub use not_impl_err as _not_impl_err;
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 7e1312dad2..d237a3e860 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -990,6 +990,10 @@ fn update_join_on(
     proj_right_exprs: &[(Column, String)],
     hash_join_on: &[(Column, Column)],
 ) -> Option<Vec<(Column, Column)>> {
+    // TODO: Clippy wants the "map" call removed, but doing so generates
+    //       a compilation error. Remove the clippy directive once this
+    //       issue is fixed.
+    #[allow(clippy::map_identity)]
     let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
         .iter()
         .map(|(left, right)| (left, right))
diff --git a/datafusion/optimizer/src/simplify_expressions/guarantees.rs b/datafusion/optimizer/src/simplify_expressions/guarantees.rs
index 860dc326b9..aa7bb4f78a 100644
--- a/datafusion/optimizer/src/simplify_expressions/guarantees.rs
+++ b/datafusion/optimizer/src/simplify_expressions/guarantees.rs
@@ -47,6 +47,10 @@ impl<'a> GuaranteeRewriter<'a> {
         guarantees: impl IntoIterator<Item = &'a (Expr, NullableInterval)>,
     ) -> Self {
         Self {
+            // TODO: Clippy wants the "map" call removed, but doing so generates
+            //       a compilation error. Remove the clippy directive once this
+            //       issue is fixed.
+            #[allow(clippy::map_identity)]
             guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(),
         }
     }
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index c009881d89..c7032e601c 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -20,7 +20,7 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
 use crate::expressions::format_state_name;
 use crate::{
     reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
@@ -29,9 +29,10 @@ use crate::{
 use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
 use arrow::compute::{self, lexsort_to_indices, SortColumn};
 use arrow::datatypes::{DataType, Field};
-use arrow_schema::SortOptions;
 use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
-use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+    arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
+};
 use datafusion_expr::Accumulator;
 
 /// FIRST_VALUE aggregate expression
@@ -211,10 +212,45 @@ impl FirstValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.first = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let [value, orderings @ ..] = row else {
+            return internal_err!("Empty row in FIRST_VALUE");
+        };
+        // Update when there is no entry in the state, or we have an "earlier"
+        // entry according to sort requirements.
+        if !self.is_set
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_gt()
+        {
+            self.first = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
+    }
+
+    fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+        let [value, ordering_values @ ..] = values else {
+            return internal_err!("Empty row in FIRST_VALUE");
+        };
+        if self.ordering_req.is_empty() {
+            // Get first entry according to receive order (0th index)
+            return Ok((!value.is_empty()).then_some(0));
+        }
+        let sort_columns = ordering_values
+            .iter()
+            .zip(self.ordering_req.iter())
+            .map(|(values, req)| SortColumn {
+                values: values.clone(),
+                options: Some(req.options),
+            })
+            .collect::<Vec<_>>();
+        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
+        Ok((!indices.is_empty()).then_some(indices.value(0) as _))
     }
 }
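
For context, the fetch = Some(1) argument is what keeps get_first_idx cheap:
lexsort_to_indices only materializes the index of the top row under the given
sort order instead of sorting the whole batch. A small standalone arrow-rs
illustration (not part of the patch):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};
    use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions};

    fn main() -> arrow::error::Result<()> {
        // Ordering column: ts = [5, 1, 3]; the "first" row under ts ASC is index 1.
        let ts: ArrayRef = Arc::new(Int64Array::from(vec![5, 1, 3]));
        let sort_columns = vec![SortColumn {
            values: ts,
            options: Some(SortOptions { descending: false, nulls_first: false }),
        }];
        // fetch = Some(1): only the top index is computed, no full sort.
        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
        assert_eq!(indices.value(0), 1);
        Ok(())
    }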
 
@@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        // If we have seen first value, we shouldn't update it
-        if !values[0].is_empty() && !self.is_set {
-            let row = get_row_at_idx(values, 0)?;
-            // Update with first value in the array.
-            self.update_with_new_row(&row);
+        if let Some(first_idx) = self.get_first_idx(values)? {
+            let row = get_row_at_idx(values, first_idx)?;
+            self.update_with_new_row(&row)?;
         }
         Ok(())
     }
@@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator {
                 // Update with first value in the state. Note that we should exclude the
                 // is_set flag from the state. Otherwise, we will end up with a state
                 // containing two is_set flags.
-                self.update_with_new_row(&first_row[0..is_set_idx]);
+                self.update_with_new_row(&first_row[0..is_set_idx])?;
             }
         }
         Ok(())
@@ -459,10 +493,50 @@ impl LastValueAccumulator {
     }
 
     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.last = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let [value, orderings @ ..] = row else {
+            return internal_err!("Empty row in LAST_VALUE");
+        };
+        // Update when there is no entry in the state, or we have a "later"
+        // entry (either according to sort requirements or the order of execution).
+        if !self.is_set
+            || self.orderings.is_empty()
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_lt()
+        {
+            self.last = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
+    }
+
+    fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+        let [value, ordering_values @ ..] = values else {
+            return internal_err!("Empty row in LAST_VALUE");
+        };
+        if self.ordering_req.is_empty() {
+            // Get last entry according to the order of data:
+            return Ok((!value.is_empty()).then_some(value.len() - 1));
+        }
+        let sort_columns = ordering_values
+            .iter()
+            .zip(self.ordering_req.iter())
+            .map(|(values, req)| {
+                // Take the reverse ordering requirement. This enables us to
+                // use "fetch = 1" to get the last value.
+                SortColumn {
+                    values: values.clone(),
+                    options: Some(!req.options),
+                }
+            })
+            .collect::<Vec<_>>();
+        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
+        Ok((!indices.is_empty()).then_some(indices.value(0) as _))
     }
 }
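
get_last_idx reuses the same top-1 sort by flipping every sort option, so the
lexicographically last row becomes the top row. A standalone illustration that
flips the options explicitly rather than via the !req.options shorthand used
in the patch:

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};
    use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions};

    fn main() -> arrow::error::Result<()> {
        let ts: ArrayRef = Arc::new(Int64Array::from(vec![5, 1, 3]));
        let asc = SortOptions { descending: false, nulls_first: false };
        // The last row under ts ASC is the first row under ts DESC, so a
        // top-1 sort with reversed options still suffices.
        let reversed = SortOptions {
            descending: !asc.descending,
            nulls_first: !asc.nulls_first,
        };
        let sort_columns = vec![SortColumn { values: ts, options: Some(reversed) }];
        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
        assert_eq!(indices.value(0), 0); // ts = 5 is the last value under ASC
        Ok(())
    }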
 
@@ -475,10 +549,9 @@ impl Accumulator for LastValueAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if !values[0].is_empty() {
-            let row = get_row_at_idx(values, values[0].len() - 1)?;
-            // Update with last value in the array.
-            self.update_with_new_row(&row);
+        if let Some(last_idx) = self.get_last_idx(values)? {
+            let row = get_row_at_idx(values, last_idx)?;
+            self.update_with_new_row(&row)?;
         }
         Ok(())
     }
@@ -515,7 +588,7 @@ impl Accumulator for LastValueAccumulator {
                 // Update with last value in the state. Note that we should exclude the
                 // is_set flag from the state. Otherwise, we will end up with a state
                 // containing two is_set flags.
-                self.update_with_new_row(&last_row[0..is_set_idx]);
+                self.update_with_new_row(&last_row[0..is_set_idx])?;
             }
         }
         Ok(())
@@ -559,26 +632,18 @@ fn convert_to_sort_cols(
         .collect::<Vec<_>>()
 }
 
-/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
-    ordering_req
-        .iter()
-        .map(|item| item.options)
-        .collect::<Vec<_>>()
-}
-
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};
 
+    use arrow::compute::concat;
     use arrow_array::{ArrayRef, Int64Array};
     use arrow_schema::DataType;
     use datafusion_common::{Result, ScalarValue};
     use datafusion_expr::Accumulator;
 
-    use arrow::compute::concat;
-    use std::sync::Arc;
-
     #[test]
     fn test_first_last_value_value() -> Result<()> {
         let mut first_accumulator =
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 329bb1e641..5bd1fca385 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,16 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg};
-use crate::{PhysicalExpr, PhysicalSortExpr};
-use arrow::datatypes::Field;
-use datafusion_common::{not_impl_err, DataFusionError, Result};
-use datafusion_expr::Accumulator;
 use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
 use self::groups_accumulator::GroupsAccumulator;
+use crate::expressions::OrderSensitiveArrayAgg;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
+use arrow::datatypes::Field;
+use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_expr::Accumulator;
+
+mod hyperloglog;
+mod tdigest;
 
 pub(crate) mod approx_distinct;
 pub(crate) mod approx_median;
@@ -46,19 +50,18 @@ pub(crate) mod median;
 pub(crate) mod string_agg;
 #[macro_use]
 pub(crate) mod min_max;
-pub mod build_in;
 pub(crate) mod groups_accumulator;
-mod hyperloglog;
-pub mod moving_min_max;
 pub(crate) mod regr;
 pub(crate) mod stats;
 pub(crate) mod stddev;
 pub(crate) mod sum;
 pub(crate) mod sum_distinct;
-mod tdigest;
-pub mod utils;
 pub(crate) mod variance;
 
+pub mod build_in;
+pub mod moving_min_max;
+pub mod utils;
+
 /// An aggregate expression that:
 /// * knows its resulting field
 /// * knows how to create its accumulator
@@ -134,10 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
 
 /// Checks whether the given aggregate expression is order-sensitive.
 /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
-/// However, a `FirstValue` depends on the input ordering (if the order changes,
-/// the first value in the list would change).
+/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
 pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
-    aggr_expr.as_any().is::<FirstValue>()
-        || aggr_expr.as_any().is::<LastValue>()
-        || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
+    aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
 }
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index e5421ef5ab..9777158da1 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -17,20 +17,21 @@
 
 //! Utilities used in aggregates
 
+use std::any::Any;
+use std::sync::Arc;
+
 use crate::{AggregateExpr, PhysicalSortExpr};
-use arrow::array::ArrayRef;
+
+use arrow::array::{ArrayRef, ArrowNativeTypeOp};
 use arrow_array::cast::AsArray;
 use arrow_array::types::{
     Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType,
     TimestampNanosecondType, TimestampSecondType,
 };
-use arrow_array::ArrowNativeTypeOp;
 use arrow_buffer::ArrowNativeType;
-use arrow_schema::{DataType, Field};
+use arrow_schema::{DataType, Field, SortOptions};
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
-use std::any::Any;
-use std::sync::Arc;
 
 /// Convert scalar values from an accumulator into arrays.
 pub fn get_accum_scalar_values_as_arrays(
@@ -40,7 +41,7 @@ pub fn get_accum_scalar_values_as_arrays(
         .state()?
         .iter()
         .map(|s| s.to_array_of_size(1))
-        .collect::<Result<Vec<_>>>()
+        .collect()
 }
 
 /// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
@@ -205,3 +206,8 @@ pub(crate) fn ordering_fields(
         })
         .collect()
 }
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
+    ordering_req.iter().map(|item| item.options).collect()
+}
diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 274d1db4eb..7a986810ba 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -2453,7 +2453,7 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
         let last_offset: OffsetSize = offsets.last().copied().unwrap();
         offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
         let arrays = converter.convert_rows(rows)?;
-        let array = match arrays.get(0) {
+        let array = match arrays.first() {
             Some(array) => array.clone(),
             None => {
                 return internal_err!("array_distinct: failed to get array from rows")
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index f779322456..f5bb4fe59b 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -27,7 +27,7 @@ use crate::aggregates::{
 };
 
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
-use crate::windows::{get_ordered_partition_by_indices, get_window_mode};
+use crate::windows::get_ordered_partition_by_indices;
 use crate::{
     DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning,
     SendableRecordBatchStream, Statistics,
@@ -45,11 +45,11 @@ use datafusion_physical_expr::{
     aggregate::is_order_sensitive,
     equivalence::{collapse_lex_req, ProjectionMapping},
     expressions::{Column, Max, Min, UnKnownColumn},
-    physical_exprs_contains, reverse_order_bys, AggregateExpr, EquivalenceProperties,
-    LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
+    physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering,
+    LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
-use itertools::{izip, Itertools};
+use itertools::Itertools;
 
 mod group_values;
 mod no_grouping;
@@ -277,159 +277,6 @@ pub struct AggregateExec {
     output_ordering: Option<LexOrdering>,
 }
 
-/// This function returns the ordering requirement of the first non-reversible
-/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
-/// as the initial requirement while calculating the finest requirement among all
-/// aggregate functions. If this function returns `None`, it means there is no
-/// hard ordering requirement for the aggregate functions (in terms of direction).
-/// Then, we can generate two alternative requirements with opposite directions.
-fn get_init_req(
-    aggr_expr: &[Arc<dyn AggregateExpr>],
-    order_by_expr: &[Option<LexOrdering>],
-) -> Option<LexOrdering> {
-    for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) {
-        // If the aggregation function is a non-reversible order-sensitive function
-        // and there is a hard requirement, choose first such requirement:
-        if is_order_sensitive(aggr_expr)
-            && aggr_expr.reverse_expr().is_none()
-            && fn_reqs.is_some()
-        {
-            return fn_reqs.clone();
-        }
-    }
-    None
-}
-
-/// This function gets the finest ordering requirement among all the aggregation
-/// functions. If requirements are conflicting, (i.e. we can not compute the
-/// aggregations in a single [`AggregateExec`]), the function returns an error.
-fn get_finest_requirement(
-    aggr_expr: &mut [Arc<dyn AggregateExpr>],
-    order_by_expr: &mut [Option<LexOrdering>],
-    eq_properties: &EquivalenceProperties,
-) -> Result<Option<LexOrdering>> {
-    // First, we check if all the requirements are satisfied by the existing
-    // ordering. If so, we return `None` to indicate this.
-    let mut all_satisfied = true;
-    for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) {
-        if eq_properties.ordering_satisfy(fn_req.as_deref().unwrap_or(&[])) {
-            continue;
-        }
-        if let Some(reverse) = aggr_expr.reverse_expr() {
-            let reverse_req = fn_req.as_ref().map(|item| reverse_order_bys(item));
-            if eq_properties.ordering_satisfy(reverse_req.as_deref().unwrap_or(&[])) {
-                // We need to update `aggr_expr` with its reverse since only its
-                // reverse requirement is compatible with the existing requirements:
-                *aggr_expr = reverse;
-                *fn_req = reverse_req;
-                continue;
-            }
-        }
-        // Requirement is not satisfied:
-        all_satisfied = false;
-    }
-    if all_satisfied {
-        // All of the requirements are already satisfied.
-        return Ok(None);
-    }
-    let mut finest_req = get_init_req(aggr_expr, order_by_expr);
-    for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) {
-        let Some(fn_req) = fn_req else {
-            continue;
-        };
-
-        if let Some(finest_req) = &mut finest_req {
-            if let Some(finer) = eq_properties.get_finer_ordering(finest_req, fn_req) {
-                *finest_req = finer;
-                continue;
-            }
-            // If an aggregate function is reversible, analyze whether its reverse
-            // direction is compatible with existing requirements:
-            if let Some(reverse) = aggr_expr.reverse_expr() {
-                let fn_req_reverse = reverse_order_bys(fn_req);
-                if let Some(finer) =
-                    eq_properties.get_finer_ordering(finest_req, &fn_req_reverse)
-                {
-                    // We need to update `aggr_expr` with its reverse, since only its
-                    // reverse requirement is compatible with existing requirements:
-                    *aggr_expr = reverse;
-                    *finest_req = finer;
-                    *fn_req = fn_req_reverse;
-                    continue;
-                }
-            }
-            // If neither of the requirements satisfy the other, this means
-            // requirements are conflicting. Currently, we do not support
-            // conflicting requirements.
-            return not_impl_err!(
-                "Conflicting ordering requirements in aggregate functions is not supported"
-            );
-        } else {
-            finest_req = Some(fn_req.clone());
-        }
-    }
-    Ok(finest_req)
-}
-
-/// Calculates search_mode for the aggregation
-fn get_aggregate_search_mode(
-    group_by: &PhysicalGroupBy,
-    input: &Arc<dyn ExecutionPlan>,
-    aggr_expr: &mut [Arc<dyn AggregateExpr>],
-    order_by_expr: &mut [Option<LexOrdering>],
-    ordering_req: &mut Vec<PhysicalSortExpr>,
-) -> InputOrderMode {
-    let groupby_exprs = group_by
-        .expr
-        .iter()
-        .map(|(item, _)| item.clone())
-        .collect::<Vec<_>>();
-    let mut input_order_mode = InputOrderMode::Linear;
-    if !group_by.is_single() || groupby_exprs.is_empty() {
-        return input_order_mode;
-    }
-
-    if let Some((should_reverse, mode)) =
-        get_window_mode(&groupby_exprs, ordering_req, input)
-    {
-        let all_reversible = aggr_expr
-            .iter()
-            .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some());
-        if should_reverse && all_reversible {
-            izip!(aggr_expr.iter_mut(), order_by_expr.iter_mut()).for_each(
-                |(aggr, order_by)| {
-                    if let Some(reverse) = aggr.reverse_expr() {
-                        *aggr = reverse;
-                    } else {
-                        unreachable!();
-                    }
-                    *order_by = order_by.as_ref().map(|ob| reverse_order_bys(ob));
-                },
-            );
-            *ordering_req = reverse_order_bys(ordering_req);
-        }
-        input_order_mode = mode;
-    }
-    input_order_mode
-}
-
-/// Check whether group by expression contains all of the expression inside `requirement`
-// As an example Group By (c,b,a) contains all of the expressions in the `requirement`: (a ASC, b DESC)
-fn group_by_contains_all_requirements(
-    group_by: &PhysicalGroupBy,
-    requirement: &LexOrdering,
-) -> bool {
-    let physical_exprs = group_by.input_exprs();
-    // When we have multiple groups (grouping set)
-    // since group by may be calculated on the subset of the group_by.expr()
-    // it is not guaranteed to have all of the requirements among group by expressions.
-    // Hence do the analysis: whether group by contains all requirements in the single group case.
-    group_by.is_single()
-        && requirement
-            .iter()
-            .all(|req| physical_exprs_contains(&physical_exprs, &req.expr))
-}
-
 impl AggregateExec {
     /// Create a new hash aggregate execution plan
     pub fn try_new(
@@ -477,50 +324,14 @@ impl AggregateExec {
     fn try_new_with_schema(
         mode: AggregateMode,
         group_by: PhysicalGroupBy,
-        mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
         schema: SchemaRef,
         original_schema: SchemaRef,
     ) -> Result<Self> {
-        // Reset ordering requirement to `None` if aggregator is not order-sensitive
-        let mut order_by_expr = aggr_expr
-            .iter()
-            .map(|aggr_expr| {
-                let fn_reqs = aggr_expr.order_bys().map(|ordering| ordering.to_vec());
-                // If
-                // - aggregation function is order-sensitive and
-                // - aggregation is performing a "first stage" calculation, and
-                // - at least one of the aggregate function requirement is not inside group by expression
-                // keep the ordering requirement as is; otherwise ignore the ordering requirement.
-                // In non-first stage modes, we accumulate data (using `merge_batch`)
-                // from different partitions (i.e. merge partial results). During
-                // this merge, we consider the ordering of each partial result.
-                // Hence, we do not need to use the ordering requirement in such
-                // modes as long as partial results are generated with the
-                // correct ordering.
-                fn_reqs.filter(|req| {
-                    is_order_sensitive(aggr_expr)
-                        && mode.is_first_stage()
-                        && !group_by_contains_all_requirements(&group_by, req)
-                })
-            })
-            .collect::<Vec<_>>();
-        let requirement = get_finest_requirement(
-            &mut aggr_expr,
-            &mut order_by_expr,
-            &input.equivalence_properties(),
-        )?;
-        let mut ordering_req = requirement.unwrap_or(vec![]);
-        let input_order_mode = get_aggregate_search_mode(
-            &group_by,
-            &input,
-            &mut aggr_expr,
-            &mut order_by_expr,
-            &mut ordering_req,
-        );
-
+        let input_eq_properties = input.equivalence_properties();
         // Get GROUP BY expressions:
         let groupby_exprs = group_by.input_exprs();
         // If existing ordering satisfies a prefix of the GROUP BY expressions,
@@ -528,17 +339,31 @@ impl AggregateExec {
         // work more efficiently.
         let indices = get_ordered_partition_by_indices(&groupby_exprs, &input);
         let mut new_requirement = indices
-            .into_iter()
-            .map(|idx| PhysicalSortRequirement {
+            .iter()
+            .map(|&idx| PhysicalSortRequirement {
                 expr: groupby_exprs[idx].clone(),
                 options: None,
             })
             .collect::<Vec<_>>();
-        // Postfix ordering requirement of the aggregation to the requirement.
-        let req = PhysicalSortRequirement::from_sort_exprs(&ordering_req);
+
+        let req = get_aggregate_exprs_requirement(
+            &aggr_expr,
+            &group_by,
+            &input_eq_properties,
+            &mode,
+        )?;
         new_requirement.extend(req);
         new_requirement = collapse_lex_req(new_requirement);
 
+        let input_order_mode =
+            if indices.len() == groupby_exprs.len() && !indices.is_empty() {
+                InputOrderMode::Sorted
+            } else if !indices.is_empty() {
+                InputOrderMode::PartiallySorted(indices)
+            } else {
+                InputOrderMode::Linear
+            };
+
         // construct a map from the input expression to the output expression of the Aggregation group by
         let projection_mapping =
             ProjectionMapping::try_new(&group_by.expr, &input.schema())?;
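
The input_order_mode selection above replaces the removed
get_aggregate_search_mode: the mode now depends only on how many leading GROUP
BY expressions the input ordering already covers. A condensed standalone
restatement of that conditional (simplified stand-in types, not the DataFusion
ones):

    // Simplified stand-in for DataFusion's InputOrderMode.
    #[derive(Debug, PartialEq)]
    enum InputOrderMode {
        Sorted,
        PartiallySorted(Vec<usize>),
        Linear,
    }

    // `indices`: positions of GROUP BY expressions already ordered at the input.
    fn input_order_mode(indices: Vec<usize>, n_group_exprs: usize) -> InputOrderMode {
        if !indices.is_empty() && indices.len() == n_group_exprs {
            InputOrderMode::Sorted
        } else if !indices.is_empty() {
            InputOrderMode::PartiallySorted(indices)
        } else {
            InputOrderMode::Linear
        }
    }

    fn main() {
        assert_eq!(input_order_mode(vec![0, 1], 2), InputOrderMode::Sorted);
        assert_eq!(
            input_order_mode(vec![0], 2),
            InputOrderMode::PartiallySorted(vec![0])
        );
        assert_eq!(input_order_mode(vec![], 2), InputOrderMode::Linear);
    }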
@@ -546,9 +371,8 @@ impl AggregateExec {
         let required_input_ordering =
             (!new_requirement.is_empty()).then_some(new_requirement);
 
-        let aggregate_eqs = input
-            .equivalence_properties()
-            .project(&projection_mapping, schema.clone());
+        let aggregate_eqs =
+            input_eq_properties.project(&projection_mapping, schema.clone());
         let output_ordering = aggregate_eqs.oeq_class().output_ordering();
 
         Ok(AggregateExec {
@@ -998,6 +822,121 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
     Arc::new(Schema::new(group_fields))
 }
 
+/// Determines the lexical ordering requirement for an aggregate expression.
+///
+/// # Parameters
+///
+/// - `aggr_expr`: A reference to an `Arc<dyn AggregateExpr>` representing the
+///   aggregate expression.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the
+///   physical GROUP BY expression.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the
+///   mode of aggregation.
+///
+/// # Returns
+///
+/// A `LexOrdering` instance indicating the lexical ordering requirement for
+/// the aggregate expression.
+fn get_aggregate_expr_req(
+    aggr_expr: &Arc<dyn AggregateExpr>,
+    group_by: &PhysicalGroupBy,
+    agg_mode: &AggregateMode,
+) -> LexOrdering {
+    // If the aggregation function is not order sensitive, or the aggregation
+    // is performing a "second stage" calculation, or all aggregate function
+    // requirements are inside the GROUP BY expression, then ignore the ordering
+    // requirement.
+    if !is_order_sensitive(aggr_expr) || !agg_mode.is_first_stage() {
+        return vec![];
+    }
+
+    let mut req = aggr_expr.order_bys().unwrap_or_default().to_vec();
+
+    // In non-first stage modes, we accumulate data (using `merge_batch`) from
+    // different partitions (i.e. merge partial results). During this merge, we
+    // consider the ordering of each partial result. Hence, we do not need to
+    // use the ordering requirement in such modes as long as partial results are
+    // generated with the correct ordering.
+    if group_by.is_single() {
+        // Remove all orderings that occur in the group by. These requirements
+        // will definitely be satisfied -- Each group by expression will have
+        // distinct values per group, hence all requirements are satisfied.
+        let physical_exprs = group_by.input_exprs();
+        req.retain(|sort_expr| {
+            !physical_exprs_contains(&physical_exprs, &sort_expr.expr)
+        });
+    }
+    req
+}
+
+/// Computes the finer ordering between the existing ordering requirement and
+/// the requirement of the given aggregate expression.
+///
+/// # Parameters
+///
+/// * `existing_req` - The existing lexical ordering that needs refinement.
+/// * `aggr_expr` - A reference to an aggregate expression trait object.
+/// * `group_by` - Information about the physical grouping (e.g. group by expression).
+/// * `eq_properties` - Equivalence properties relevant to the computation.
+/// * `agg_mode` - The mode of aggregation (e.g., Partial, Final, etc.).
+///
+/// # Returns
+///
+/// An `Option<LexOrdering>` representing the computed finer lexical ordering,
+/// or `None` if there is no finer ordering; e.g. the existing requirement and
+/// the aggregator requirement are incompatible.
+fn finer_ordering(
+    existing_req: &LexOrdering,
+    aggr_expr: &Arc<dyn AggregateExpr>,
+    group_by: &PhysicalGroupBy,
+    eq_properties: &EquivalenceProperties,
+    agg_mode: &AggregateMode,
+) -> Option<LexOrdering> {
+    let aggr_req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode);
+    eq_properties.get_finer_ordering(existing_req, &aggr_req)
+}
+
+/// Get the common requirement that satisfies all the aggregate expressions.
+///
+/// # Parameters
+///
+/// - `aggr_exprs`: A slice of `Arc<dyn AggregateExpr>` containing all the
+///   aggregate expressions.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the
+///   physical GROUP BY expression.
+/// - `eq_properties`: A reference to an `EquivalenceProperties` instance
+///   representing equivalence properties for ordering.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the
+///   mode of aggregation.
+///
+/// # Returns
+///
+/// A `LexRequirement` instance, which is the requirement that satisfies all the
+/// aggregate requirements. Returns an error in case of conflicting requirements.
+fn get_aggregate_exprs_requirement(
+    aggr_exprs: &[Arc<dyn AggregateExpr>],
+    group_by: &PhysicalGroupBy,
+    eq_properties: &EquivalenceProperties,
+    agg_mode: &AggregateMode,
+) -> Result<LexRequirement> {
+    let mut requirement = vec![];
+    for aggr_expr in aggr_exprs.iter() {
+        if let Some(finer_ordering) =
+            finer_ordering(&requirement, aggr_expr, group_by, eq_properties, agg_mode)
+        {
+            requirement = finer_ordering;
+        } else {
+            // If neither of the requirements satisfy the other, this means
+            // requirements are conflicting. Currently, we do not support
+            // conflicting requirements.
+            return not_impl_err!(
+                "Conflicting ordering requirements in aggregate functions is 
not supported"
+            );
+        }
+    }
+    Ok(PhysicalSortRequirement::from_sort_exprs(&requirement))
+}
+
 /// returns physical expressions for arguments to evaluate against a batch
 /// The expressions are different depending on `mode`:
 /// * Partial: AggregateExpr::expressions
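
get_aggregate_exprs_requirement above folds one common ordering over all
aggregate expressions, erroring out when two requirements cannot be merged. A
self-contained sketch of that fold, with orderings reduced to column-name
prefixes in place of LexOrdering/EquivalenceProperties (a hypothetical
simplification, not the DataFusion API):

    // One ordering is "finer" here iff one is a prefix of the other; the real
    // code delegates this to EquivalenceProperties::get_finer_ordering.
    fn finer<'a>(a: &'a [&'a str], b: &'a [&'a str]) -> Option<&'a [&'a str]> {
        if b.starts_with(a) {
            Some(b)
        } else if a.starts_with(b) {
            Some(a)
        } else {
            None
        }
    }

    fn common_requirement<'a>(reqs: &'a [Vec<&'a str>]) -> Result<Vec<&'a str>, String> {
        let mut requirement: &[&str] = &[];
        for req in reqs {
            requirement = finer(requirement, req)
                .ok_or_else(|| "conflicting ordering requirements".to_string())?;
        }
        Ok(requirement.to_vec())
    }

    fn main() {
        // [a] and [a, b] merge to [a, b]; adding [c] would conflict.
        let reqs = vec![vec!["a"], vec!["a", "b"]];
        assert_eq!(common_requirement(&reqs), Ok(vec!["a", "b"]));
        let conflicting = vec![vec!["a"], vec!["c"]];
        assert!(common_requirement(&conflicting).is_err());
    }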
@@ -1013,33 +952,27 @@ fn aggregate_expressions(
         | AggregateMode::SinglePartitioned => Ok(aggr_expr
             .iter()
             .map(|agg| {
-                let mut result = agg.expressions().clone();
-                // In partial mode, append ordering requirements to expressions' results.
-                // Ordering requirements are used by subsequent executors to satisfy the required
-                // ordering for `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes.
-                if matches!(mode, AggregateMode::Partial) {
-                    if let Some(ordering_req) = agg.order_bys() {
-                        let ordering_exprs = ordering_req
-                            .iter()
-                            .map(|item| item.expr.clone())
-                            .collect::<Vec<_>>();
-                        result.extend(ordering_exprs);
-                    }
+                let mut result = agg.expressions();
+                // Append ordering requirements to expressions' results. This
+                // way order sensitive aggregators can satisfy requirement
+                // themselves.
+                if let Some(ordering_req) = agg.order_bys() {
+                    result.extend(ordering_req.iter().map(|item| item.expr.clone()));
                 }
                 result
             })
             .collect()),
-        // in this mode, we build the merge expressions of the aggregation
+        // In this mode, we build the merge expressions of the aggregation.
         AggregateMode::Final | AggregateMode::FinalPartitioned => {
             let mut col_idx_base = col_idx_base;
-            Ok(aggr_expr
+            aggr_expr
                 .iter()
                 .map(|agg| {
                     let exprs = merge_expressions(col_idx_base, agg)?;
                     col_idx_base += exprs.len();
                     Ok(exprs)
                 })
-                .collect::<Result<Vec<_>>>()?)
+                .collect()
         }
     }
 }
@@ -1052,14 +985,13 @@ fn merge_expressions(
     index_base: usize,
     expr: &Arc<dyn AggregateExpr>,
 ) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
-    Ok(expr
-        .state_fields()?
-        .iter()
-        .enumerate()
-        .map(|(idx, f)| {
-            Arc::new(Column::new(f.name(), index_base + idx)) as Arc<dyn PhysicalExpr>
-        })
-        .collect::<Vec<_>>())
+    expr.state_fields().map(|fields| {
+        fields
+            .iter()
+            .enumerate()
+            .map(|(idx, f)| Arc::new(Column::new(f.name(), index_base + idx)) as _)
+            .collect()
+    })
 }
 
 pub(crate) type AccumulatorItem = Box<dyn Accumulator>;
@@ -1070,7 +1002,7 @@ fn create_accumulators(
     aggr_expr
         .iter()
         .map(|expr| expr.create_accumulator())
-        .collect::<Result<Vec<_>>>()
+        .collect()
 }
 
 /// returns a vector of ArrayRefs, where each entry corresponds to either the
@@ -1081,8 +1013,8 @@ fn finalize_aggregation(
 ) -> Result<Vec<ArrayRef>> {
     match mode {
         AggregateMode::Partial => {
-            // build the vector of states
-            let a = accumulators
+            // Build the vector of states
+            accumulators
                 .iter()
                 .map(|accumulator| {
                     accumulator.state().and_then(|e| {
@@ -1091,18 +1023,18 @@ fn finalize_aggregation(
                             .collect::<Result<Vec<ArrayRef>>>()
                     })
                 })
-                .collect::<Result<Vec<_>>>()?;
-            Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
+                .flatten_ok()
+                .collect()
         }
         AggregateMode::Final
         | AggregateMode::FinalPartitioned
         | AggregateMode::Single
         | AggregateMode::SinglePartitioned => {
-            // merge the state to the final value
+            // Merge the state to the final value
             accumulators
                 .iter()
                 .map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array()))
-                .collect::<Result<Vec<ArrayRef>>>()
+                .collect()
         }
     }
 }
@@ -1125,9 +1057,7 @@ pub(crate) fn evaluate_many(
     expr: &[Vec<Arc<dyn PhysicalExpr>>],
     batch: &RecordBatch,
 ) -> Result<Vec<Vec<ArrayRef>>> {
-    expr.iter()
-        .map(|expr| evaluate(expr, batch))
-        .collect::<Result<Vec<_>>>()
+    expr.iter().map(|expr| evaluate(expr, batch)).collect()
 }
 
 fn evaluate_optional(
@@ -1143,7 +1073,7 @@ fn evaluate_optional(
                 })
                 .transpose()
         })
-        .collect::<Result<Vec<_>>>()
+        .collect()
 }
 
 /// Evaluate a group by expression against a `RecordBatch`
@@ -1204,9 +1134,7 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::*;
-    use crate::aggregates::{
-        get_finest_requirement, AggregateExec, AggregateMode, PhysicalGroupBy,
-    };
+    use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
     use crate::coalesce_batches::CoalesceBatchesExec;
     use crate::coalesce_partitions::CoalescePartitionsExec;
     use crate::common;
@@ -1228,15 +1156,16 @@ mod tests {
         Result, ScalarValue,
     };
     use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::memory_pool::FairSpillPool;
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_physical_expr::expressions::{
-        lit, ApproxDistinct, Count, FirstValue, LastValue, Median,
+        lit, ApproxDistinct, Count, FirstValue, LastValue, Median, OrderSensitiveArrayAgg,
     };
     use datafusion_physical_expr::{
-        AggregateExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+        reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr,
+        PhysicalSortExpr,
     };
 
-    use datafusion_execution::memory_pool::FairSpillPool;
     use futures::{FutureExt, Stream};
 
     // Generate a schema which consists of 5 columns (a, b, c, d, e)
@@ -2093,11 +2022,6 @@ mod tests {
             descending: false,
             nulls_first: false,
         };
-        // This is the reverse requirement of options1
-        let options2 = SortOptions {
-            descending: true,
-            nulls_first: true,
-        };
         let col_a = &col("a", &test_schema)?;
         let col_b = &col("b", &test_schema)?;
         let col_c = &col("c", &test_schema)?;
@@ -2106,7 +2030,7 @@ mod tests {
         eq_properties.add_equal_conditions(col_a, col_b);
         // Aggregate requirements are
         // [None], [a ASC], [a ASC, b ASC, c ASC], [a ASC, b ASC] respectively
-        let mut order_by_exprs = vec![
+        let order_by_exprs = vec![
             None,
             Some(vec![PhysicalSortExpr {
                 expr: col_a.clone(),
@@ -2136,14 +2060,8 @@ mod tests {
                     options: options1,
                 },
             ]),
-            // Since aggregate expression is reversible (FirstValue), we should be able to resolve below
-            // contradictory requirement by reversing it.
-            Some(vec![PhysicalSortExpr {
-                expr: col_b.clone(),
-                options: options2,
-            }]),
         ];
-        let common_requirement = Some(vec![
+        let common_requirement = vec![
             PhysicalSortExpr {
                 expr: col_a.clone(),
                 options: options1,
@@ -2152,17 +2070,28 @@ mod tests {
                 expr: col_c.clone(),
                 options: options1,
             },
-        ]);
-        let aggr_expr = Arc::new(FirstValue::new(
-            col_a.clone(),
-            "first1",
-            DataType::Int32,
-            vec![],
-            vec![],
-        )) as _;
-        let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()];
-        let res =
-            get_finest_requirement(&mut aggr_exprs, &mut order_by_exprs, &eq_properties)?;
+        ];
+        let aggr_exprs = order_by_exprs
+            .into_iter()
+            .map(|order_by_expr| {
+                Arc::new(OrderSensitiveArrayAgg::new(
+                    col_a.clone(),
+                    "array_agg",
+                    DataType::Int32,
+                    false,
+                    vec![],
+                    order_by_expr.unwrap_or_default(),
+                )) as _
+            })
+            .collect::<Vec<_>>();
+        let group_by = PhysicalGroupBy::new_single(vec![]);
+        let res = get_aggregate_exprs_requirement(
+            &aggr_exprs,
+            &group_by,
+            &eq_properties,
+            &AggregateMode::Partial,
+        )?;
+        let res = PhysicalSortRequirement::to_sort_exprs(res);
         assert_eq!(res, common_requirement);
         Ok(())
     }
diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
index 663bbdd5a3..8e2bbbfe4f 100644
--- a/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
+++ b/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
@@ -21,5 +21,4 @@ mod normalize;
 mod runner;
 
 pub use error::*;
-pub use normalize::*;
 pub use runner::*;
diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt
index 9a7117b69b..3f609e2548 100644
--- a/datafusion/sqllogictest/test_files/distinct_on.slt
+++ b/datafusion/sqllogictest/test_files/distinct_on.slt
@@ -78,7 +78,7 @@ c 4
 query I
 SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3;
 ----
-5
+4
 4
 2
 1
@@ -100,10 +100,9 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes
 ------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted
---------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]
-----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
+------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
 
 # ON expressions are not a sub-set of the ORDER BY expressions
 query error SELECT DISTINCT ON expressions must match initial ORDER BY expressions
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index f1b6a57287..bbf21e135f 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2019,17 +2019,16 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
 ------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0])
---------------SortExec: expr=[col0@3 ASC NULLS LAST]
-----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
-------------------CoalesceBatchesExec: target_batch_size=8192
---------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
-----------------------CoalesceBatchesExec: target_batch_size=8192
-------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
-----------------------CoalesceBatchesExec: target_batch_size=8192
-------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
+------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
+--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
+----------------CoalesceBatchesExec: target_batch_size=8192
+------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
+--------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
+--------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
 
 # Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
 # a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2209,7 +2208,7 @@ ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c)
 ----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 query III
-SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
+SELECT a, b, LAST_VALUE(c ORDER BY a DESC, c ASC) as last_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
@@ -2509,7 +2508,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ----SortExec: expr=[amount@1 DESC]
 ------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2540,7 +2539,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ----SortExec: expr=[amount@1 ASC NULLS LAST]
 ------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2572,7 +2571,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
 ----TableScan: sales_global projection=[country, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
 ----SortExec: expr=[amount@1 ASC NULLS LAST]
 ------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -2637,9 +2636,8 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
 ------TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)]
-----SortExec: expr=[ts@1 ASC NULLS LAST]
-------MemoryExec: partitions=1, partition_sizes=[1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
+----MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRRR rowsort
 SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
@@ -2672,8 +2670,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
 physical_plan
 ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
 --AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
-----SortExec: expr=[ts@1 DESC]
-------MemoryExec: partitions=1, partition_sizes=[1]
+----MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRRR rowsort
 SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
@@ -2709,12 +2706,11 @@ physical_plan
 SortExec: expr=[sn@2 ASC NULLS LAST]
 --ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
 ----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)]
-------SortExec: expr=[sn@5 ASC NULLS LAST]
---------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
---------------MemoryExec: partitions=1, partition_sizes=[1]
---------------MemoryExec: partitions=1, partition_sizes=[1]
+------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query ITIPTR rowsort
 SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
@@ -2759,8 +2755,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
 ------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 --------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRR
 SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
@@ -2791,13 +2786,12 @@ physical_plan
 SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2]
-------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
 ------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
---------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRR
 SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
@@ -2831,16 +2825,15 @@ ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts
 --AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ----CoalescePartitionsExec
 ------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@0 ASC NULLS LAST]
-----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
 
 query RR
 SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
   LAST_VALUE(amount ORDER BY ts ASC) AS fv2
   FROM sales_global
 ----
-30 80
+30 100
 
 # Conversion in between FIRST_VALUE and LAST_VALUE to resolve
 # contradictory requirements should work in multi partitions.
@@ -2855,12 +2848,11 @@ Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
 ----TableScan: sales_global projection=[ts, amount]
 physical_plan
 ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2]
---AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ----CoalescePartitionsExec
-------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@0 ASC NULLS LAST]
-----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
+------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
 
 query RR
 SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
@@ -2993,10 +2985,10 @@ physical_plan
 SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
-------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 --------CoalesceBatchesExec: target_batch_size=4
 ----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 --------------SortExec: expr=[amount@1 DESC]
 ----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 ------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3639,10 +3631,10 @@ Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_tab
 ----TableScan: multiple_ordered_table projection=[a, c, d]
 physical_plan
 ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c]
---AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)]
 ----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
---------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)]
 ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 9a349f6000..a7146a5a91 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3454,7 +3454,7 @@ SortPreservingMergeExec: [a@0 ASC]
 ------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)]
 --------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2
-------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)], ordering_mode=PartiallySorted([0])
+------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)]
 --------------CoalesceBatchesExec: target_batch_size=2
 ----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
 ------------------CoalesceBatchesExec: target_batch_size=2
@@ -3462,7 +3462,7 @@ SortPreservingMergeExec: [a@0 ASC]
 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 ------------------CoalesceBatchesExec: target_batch_size=2
---------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC NULLS LAST
+--------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true
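The joins.slt hunks show the same effect below a join: once LAST_VALUE(r.b) stops imposing a sort requirement on its input, the Partial AggregateExec loses its ordering_mode annotation and the hash RepartitionExec no longer needs preserve_order/sort_exprs. A sketch of that shape, reusing the hypothetical sales table from the sketch above:

-- LAST_VALUE's ORDER BY is satisfied inside the accumulator, so the
-- repartition feeding the join does not have to preserve input order.
SELECT l.column1,
       LAST_VALUE(r.column3 ORDER BY r.column2) AS last_amount
FROM sales l
JOIN sales r ON l.column1 = r.column1
GROUP BY l.column1;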
 
