This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e9f2d5260 Move conversion of FIRST/LAST Aggregate function to 
independent physical optimizer rule (#10061)
4e9f2d5260 is described below

commit 4e9f2d5260ecaca261076d2e0a626d3f595ff5a5
Author: Jay Zhan <[email protected]>
AuthorDate: Mon Apr 15 21:13:07 2024 +0800

    Move conversion of FIRST/LAST Aggregate function to independent physical 
optimizer rule (#10061)
    
    * move out the ordering rule
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * introduce rule
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * revert test result
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * pass multi order test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * with new children
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * revert slt
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * revert back
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm rewrite in new child
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * backup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * only move conversion to optimizer
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * find test that do reverse
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add test for first and last
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * pass all test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * upd test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * upd test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add aggregate test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * final draft
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup again
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * pull out finer ordering code and reuse
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * clippy
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * remove finer in optimize rule
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add comments and cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rename fun
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rename fun
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fmt
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * avoid unnecessary recursion and rename
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * remove unnecessary rule
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fix merge
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    ---------
    
    Signed-off-by: jayzhan211 <[email protected]>
---
 .../src/physical_optimizer/convert_first_last.rs   | 260 +++++++++++++++++++++
 datafusion/core/src/physical_optimizer/mod.rs      |   1 +
 .../core/src/physical_optimizer/optimizer.rs       |   3 +
 datafusion/core/tests/data/convert_first_last.csv  |  11 +
 datafusion/physical-plan/src/aggregates/mod.rs     | 117 ++++------
 datafusion/physical-plan/src/tree_node.rs          |   1 +
 datafusion/physical-plan/src/windows/mod.rs        |   2 +-
 datafusion/sqllogictest/test_files/aggregate.slt   |  42 ++++
 datafusion/sqllogictest/test_files/explain.slt     |   3 +
 datafusion/sqllogictest/test_files/group_by.slt    |   4 +-
 10 files changed, 372 insertions(+), 72 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs 
b/datafusion/core/src/physical_optimizer/convert_first_last.rs
new file mode 100644
index 0000000000..4102313d31
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::Result;
+use datafusion_common::{
+    config::ConfigOptions,
+    tree_node::{Transformed, TransformedResult, TreeNode},
+};
+use datafusion_physical_expr::expressions::{FirstValue, LastValue};
+use datafusion_physical_expr::{
+    equivalence::ProjectionMapping, reverse_order_bys, AggregateExpr,
+    EquivalenceProperties, PhysicalSortRequirement,
+};
+use datafusion_physical_plan::aggregates::concat_slices;
+use datafusion_physical_plan::{
+    aggregates::{AggregateExec, AggregateMode},
+    ExecutionPlan, ExecutionPlanProperties, InputOrderMode,
+};
+use std::sync::Arc;
+
+use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
+
+use super::PhysicalOptimizerRule;
+
+/// The optimizer rule checks the ordering requirements of the aggregate 
expressions.
+/// It converts between FIRST_VALUE and LAST_VALUE if possible.
+/// For example, if we have ascending values and we want LastValue with a 
descending requirement,
+/// it is equivalent to FirstValue with the current ascending ordering.
+///
+/// As a concrete example, say we have values c1 = [1, 2, 3], which 
is in ascending order.
+/// If we want LastValue(c1 order by desc), that is the first value of 
reversed c1 [3, 2, 1],
+/// so we can convert the aggregate expression to FirstValue(c1 order by asc);
+/// since the current ordering is already satisfied, it saves our time!
+#[derive(Default)]
+pub struct OptimizeAggregateOrder {}
+
+impl OptimizeAggregateOrder {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl PhysicalOptimizerRule for OptimizeAggregateOrder {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_up(&get_common_requirement_of_aggregate_input)
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "OptimizeAggregateOrder"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn get_common_requirement_of_aggregate_input(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
+        let input = aggr_exec.input();
+        let mut aggr_expr = try_get_updated_aggr_expr_from_child(aggr_exec);
+        let group_by = aggr_exec.group_by();
+        let mode = aggr_exec.mode();
+
+        let input_eq_properties = input.equivalence_properties();
+        let groupby_exprs = group_by.input_exprs();
+        // If existing ordering satisfies a prefix of the GROUP BY expressions,
+        // prefix requirements with this section. In this case, aggregation 
will
+        // work more efficiently.
+        let indices = get_ordered_partition_by_indices(&groupby_exprs, input);
+        let requirement = indices
+            .iter()
+            .map(|&idx| PhysicalSortRequirement {
+                expr: groupby_exprs[idx].clone(),
+                options: None,
+            })
+            .collect::<Vec<_>>();
+
+        try_convert_first_last_if_better(
+            &requirement,
+            &mut aggr_expr,
+            input_eq_properties,
+        )?;
+
+        let required_input_ordering = 
(!requirement.is_empty()).then_some(requirement);
+
+        let input_order_mode =
+            if indices.len() == groupby_exprs.len() && !indices.is_empty() {
+                InputOrderMode::Sorted
+            } else if !indices.is_empty() {
+                InputOrderMode::PartiallySorted(indices)
+            } else {
+                InputOrderMode::Linear
+            };
+        let projection_mapping =
+            ProjectionMapping::try_new(group_by.expr(), &input.schema())?;
+
+        let cache = AggregateExec::compute_properties(
+            input,
+            plan.schema().clone(),
+            &projection_mapping,
+            mode,
+            &input_order_mode,
+        );
+
+        let aggr_exec = aggr_exec.new_with_aggr_expr_and_ordering_info(
+            required_input_ordering,
+            aggr_expr,
+            cache,
+            input_order_mode,
+        );
+
+        Ok(Transformed::yes(
+            Arc::new(aggr_exec) as Arc<dyn ExecutionPlan>
+        ))
+    } else {
+        Ok(Transformed::no(plan))
+    }
+}
+
+/// In `create_initial_plan` for LogicalPlan::Aggregate, we have a nested 
AggregateExec where the first layer
+/// is in Partial mode and the second layer is in Final or FinalPartitioned 
mode.
+/// If the first layer of aggregate plan is transformed, we need to update the 
child of the layer with final mode.
+/// Therefore, we check it and get the updated aggregate expressions.
+///
+/// If AggregateExec is created from elsewhere, we skip the check and return 
the original aggregate expressions.
+fn try_get_updated_aggr_expr_from_child(
+    aggr_exec: &AggregateExec,
+) -> Vec<Arc<dyn AggregateExpr>> {
+    let input = aggr_exec.input();
+    if aggr_exec.mode() == &AggregateMode::Final
+        || aggr_exec.mode() == &AggregateMode::FinalPartitioned
+    {
+        // Some aggregators may be modified during initialization for
+        // optimization purposes. For example, a FIRST_VALUE may turn
+        // into a LAST_VALUE with the reverse ordering requirement.
+        // To reflect such changes to subsequent stages, use the updated
+        // `AggregateExpr`/`PhysicalSortExpr` objects.
+        //
+        // The bottom up transformation is the mirror of 
LogicalPlan::Aggregate creation in [create_initial_plan]
+        if let Some(c_aggr_exec) = 
input.as_any().downcast_ref::<AggregateExec>() {
+            if c_aggr_exec.mode() == &AggregateMode::Partial {
+                // If the input is an AggregateExec in Partial mode, then the
+                // input is a CoalescePartitionsExec. In this case, the
+                // AggregateExec is the second stage of aggregation. The
+                // requirements of the second stage are the requirements of
+                // the first stage.
+                return c_aggr_exec.aggr_expr().to_vec();
+            }
+        }
+    }
+
+    aggr_exec.aggr_expr().to_vec()
+}
+
+/// Get the common requirement that satisfies all the aggregate expressions.
+///
+/// # Parameters
+///
+/// - `aggr_exprs`: A slice of `Arc<dyn AggregateExpr>` containing all the
+///   aggregate expressions.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the
+///   physical GROUP BY expression.
+/// - `eq_properties`: A reference to an `EquivalenceProperties` instance
+///   representing equivalence properties for ordering.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the
+///   mode of aggregation.
+///
+/// # Returns
+///
+/// A `LexRequirement` instance, which is the requirement that satisfies all 
the
+/// aggregate requirements. Returns an error in case of conflicting 
requirements.
+///
+/// Similar to the one in datafusion/physical-plan/src/aggregates/mod.rs, but 
this
+/// function cares only about the possible conversion between FIRST_VALUE and 
LAST_VALUE
+fn try_convert_first_last_if_better(
+    prefix_requirement: &[PhysicalSortRequirement],
+    aggr_exprs: &mut [Arc<dyn AggregateExpr>],
+    eq_properties: &EquivalenceProperties,
+) -> Result<()> {
+    for aggr_expr in aggr_exprs.iter_mut() {
+        let aggr_req = aggr_expr.order_bys().unwrap_or(&[]);
+        let reverse_aggr_req = reverse_order_bys(aggr_req);
+        let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req);
+        let reverse_aggr_req =
+            PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req);
+
+        if let Some(first_value) = 
aggr_expr.as_any().downcast_ref::<FirstValue>() {
+            let mut first_value = first_value.clone();
+
+            if eq_properties.ordering_satisfy_requirement(&concat_slices(
+                prefix_requirement,
+                &aggr_req,
+            )) {
+                first_value = first_value.with_requirement_satisfied(true);
+                *aggr_expr = Arc::new(first_value) as _;
+            } else if 
eq_properties.ordering_satisfy_requirement(&concat_slices(
+                prefix_requirement,
+                &reverse_aggr_req,
+            )) {
+                // Converting to LAST_VALUE enables more efficient execution
+                // given the existing ordering:
+                let mut last_value = first_value.convert_to_last();
+                last_value = last_value.with_requirement_satisfied(true);
+                *aggr_expr = Arc::new(last_value) as _;
+            } else {
+                // Requirement is not satisfied with existing ordering.
+                first_value = first_value.with_requirement_satisfied(false);
+                *aggr_expr = Arc::new(first_value) as _;
+            }
+            continue;
+        }
+        if let Some(last_value) = 
aggr_expr.as_any().downcast_ref::<LastValue>() {
+            let mut last_value = last_value.clone();
+            if eq_properties.ordering_satisfy_requirement(&concat_slices(
+                prefix_requirement,
+                &aggr_req,
+            )) {
+                last_value = last_value.with_requirement_satisfied(true);
+                *aggr_expr = Arc::new(last_value) as _;
+            } else if 
eq_properties.ordering_satisfy_requirement(&concat_slices(
+                prefix_requirement,
+                &reverse_aggr_req,
+            )) {
+                // Converting to FIRST_VALUE enables more efficient execution
+                // given the existing ordering:
+                let mut first_value = last_value.convert_to_first();
+                first_value = first_value.with_requirement_satisfied(true);
+                *aggr_expr = Arc::new(first_value) as _;
+            } else {
+                // Requirement is not satisfied with existing ordering.
+                last_value = last_value.with_requirement_satisfied(false);
+                *aggr_expr = Arc::new(last_value) as _;
+            }
+            continue;
+        }
+    }
+
+    Ok(())
+}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs 
b/datafusion/core/src/physical_optimizer/mod.rs
index e990fead61..c80668c6da 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -24,6 +24,7 @@
 pub mod aggregate_statistics;
 pub mod coalesce_batches;
 pub mod combine_partial_final_agg;
+mod convert_first_last;
 pub mod enforce_distribution;
 pub mod enforce_sorting;
 pub mod join_selection;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs 
b/datafusion/core/src/physical_optimizer/optimizer.rs
index 48da68cb2e..08cbf68fa6 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -19,6 +19,7 @@
 
 use std::sync::Arc;
 
+use super::convert_first_last::OptimizeAggregateOrder;
 use super::projection_pushdown::ProjectionPushdown;
 use crate::config::ConfigOptions;
 use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -101,6 +102,8 @@ impl PhysicalOptimizer {
             // Note that one should always run this rule after running the 
EnforceDistribution rule
             // as the latter may break local sorting requirements.
             Arc::new(EnforceSorting::new()),
+            // Run once after the local sorting requirement is changed
+            Arc::new(OptimizeAggregateOrder::new()),
             // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule 
would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe 
optimize it in the future.
             Arc::new(ProjectionPushdown::new()),
             // The CoalesceBatches rule will not influence the distribution 
and ordering of the
diff --git a/datafusion/core/tests/data/convert_first_last.csv 
b/datafusion/core/tests/data/convert_first_last.csv
new file mode 100644
index 0000000000..059b631e57
--- /dev/null
+++ b/datafusion/core/tests/data/convert_first_last.csv
@@ -0,0 +1,11 @@
+c1,c2,c3
+1,9,0
+2,8,1
+3,7,2
+4,6,3
+5,5,4
+6,4,5
+7,3,6
+8,2,7
+9,1,8
+10,0,9
\ No newline at end of file
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index 98c44e23c6..ba9a6b1be0 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -39,12 +39,15 @@ use datafusion_common::stats::Precision;
 use datafusion_common::{internal_err, not_impl_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_expr::Accumulator;
+use datafusion_physical_expr::aggregate::is_order_sensitive;
+use datafusion_physical_expr::equivalence::collapse_lex_req;
 use datafusion_physical_expr::{
-    aggregate::is_order_sensitive,
-    equivalence::{collapse_lex_req, ProjectionMapping},
-    expressions::{Column, FirstValue, LastValue, Max, Min, UnKnownColumn},
-    physical_exprs_contains, reverse_order_bys, AggregateExpr, 
EquivalenceProperties,
-    LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortRequirement,
+    equivalence::ProjectionMapping,
+    expressions::{Column, Max, Min, UnKnownColumn},
+    AggregateExpr, LexRequirement, PhysicalExpr,
+};
+use datafusion_physical_expr::{
+    physical_exprs_contains, EquivalenceProperties, LexOrdering, 
PhysicalSortRequirement,
 };
 
 use itertools::Itertools;
@@ -269,6 +272,36 @@ pub struct AggregateExec {
 }
 
 impl AggregateExec {
+    /// Function used in `ConvertFirstLast` optimizer rule,
+    /// where we need parts of the new value, others cloned from the old one
+    pub fn new_with_aggr_expr_and_ordering_info(
+        &self,
+        required_input_ordering: Option<LexRequirement>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        cache: PlanProperties,
+        input_order_mode: InputOrderMode,
+    ) -> Self {
+        Self {
+            aggr_expr,
+            required_input_ordering,
+            metrics: ExecutionPlanMetricsSet::new(),
+            input_order_mode,
+            cache,
+            // clone the rest of the fields
+            mode: self.mode,
+            group_by: self.group_by.clone(),
+            filter_expr: self.filter_expr.clone(),
+            limit: self.limit,
+            input: self.input.clone(),
+            schema: self.schema.clone(),
+            input_schema: self.input_schema.clone(),
+        }
+    }
+
+    pub fn cache(&self) -> &PlanProperties {
+        &self.cache
+    }
+
     /// Create a new hash aggregate execution plan
     pub fn try_new(
         mode: AggregateMode,
@@ -336,8 +369,7 @@ impl AggregateExec {
             })
             .collect::<Vec<_>>();
 
-        let req = get_aggregate_exprs_requirement(
-            &new_requirement,
+        let req = get_finer_aggregate_exprs_requirement(
             &mut aggr_expr,
             &group_by,
             input_eq_properties,
@@ -369,6 +401,7 @@ impl AggregateExec {
             &mode,
             &input_order_mode,
         );
+
         Ok(AggregateExec {
             mode,
             group_by,
@@ -507,7 +540,7 @@ impl AggregateExec {
     }
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
-    fn compute_properties(
+    pub fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
         schema: SchemaRef,
         projection_mapping: &ProjectionMapping,
@@ -683,9 +716,9 @@ impl ExecutionPlan for AggregateExec {
             children[0].clone(),
             self.input_schema.clone(),
             self.schema.clone(),
-            //self.original_schema.clone(),
         )?;
         me.limit = self.limit;
+
         Ok(Arc::new(me))
     }
 
@@ -870,7 +903,7 @@ fn finer_ordering(
 }
 
 /// Concatenates the given slices.
-fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> Vec<T> {
+pub fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> Vec<T> {
     [lhs, rhs].concat()
 }
 
@@ -891,8 +924,7 @@ fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> Vec<T> {
 ///
 /// A `LexRequirement` instance, which is the requirement that satisfies all 
the
 /// aggregate requirements. Returns an error in case of conflicting 
requirements.
-fn get_aggregate_exprs_requirement(
-    prefix_requirement: &[PhysicalSortRequirement],
+fn get_finer_aggregate_exprs_requirement(
     aggr_exprs: &mut [Arc<dyn AggregateExpr>],
     group_by: &PhysicalGroupBy,
     eq_properties: &EquivalenceProperties,
@@ -900,60 +932,6 @@ fn get_aggregate_exprs_requirement(
 ) -> Result<LexRequirement> {
     let mut requirement = vec![];
     for aggr_expr in aggr_exprs.iter_mut() {
-        let aggr_req = aggr_expr.order_bys().unwrap_or(&[]);
-        let reverse_aggr_req = reverse_order_bys(aggr_req);
-        let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req);
-        let reverse_aggr_req =
-            PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req);
-
-        if let Some(first_value) = 
aggr_expr.as_any().downcast_ref::<FirstValue>() {
-            let mut first_value = first_value.clone();
-            if eq_properties.ordering_satisfy_requirement(&concat_slices(
-                prefix_requirement,
-                &aggr_req,
-            )) {
-                first_value = first_value.with_requirement_satisfied(true);
-                *aggr_expr = Arc::new(first_value) as _;
-            } else if 
eq_properties.ordering_satisfy_requirement(&concat_slices(
-                prefix_requirement,
-                &reverse_aggr_req,
-            )) {
-                // Converting to LAST_VALUE enables more efficient execution
-                // given the existing ordering:
-                let mut last_value = first_value.convert_to_last();
-                last_value = last_value.with_requirement_satisfied(true);
-                *aggr_expr = Arc::new(last_value) as _;
-            } else {
-                // Requirement is not satisfied with existing ordering.
-                first_value = first_value.with_requirement_satisfied(false);
-                *aggr_expr = Arc::new(first_value) as _;
-            }
-            continue;
-        }
-        if let Some(last_value) = 
aggr_expr.as_any().downcast_ref::<LastValue>() {
-            let mut last_value = last_value.clone();
-            if eq_properties.ordering_satisfy_requirement(&concat_slices(
-                prefix_requirement,
-                &aggr_req,
-            )) {
-                last_value = last_value.with_requirement_satisfied(true);
-                *aggr_expr = Arc::new(last_value) as _;
-            } else if 
eq_properties.ordering_satisfy_requirement(&concat_slices(
-                prefix_requirement,
-                &reverse_aggr_req,
-            )) {
-                // Converting to FIRST_VALUE enables more efficient execution
-                // given the existing ordering:
-                let mut first_value = last_value.convert_to_first();
-                first_value = first_value.with_requirement_satisfied(true);
-                *aggr_expr = Arc::new(first_value) as _;
-            } else {
-                // Requirement is not satisfied with existing ordering.
-                last_value = last_value.with_requirement_satisfied(false);
-                *aggr_expr = Arc::new(last_value) as _;
-            }
-            continue;
-        }
         if let Some(finer_ordering) =
             finer_ordering(&requirement, aggr_expr, group_by, eq_properties, 
agg_mode)
         {
@@ -1003,6 +981,7 @@ fn get_aggregate_exprs_requirement(
                 continue;
             }
         }
+
         // Neither the existing requirement and current aggregate requirement 
satisfy the other, this means
         // requirements are conflicting. Currently, we do not support
         // conflicting requirements.
@@ -1010,6 +989,7 @@ fn get_aggregate_exprs_requirement(
             "Conflicting ordering requirements in aggregate functions is not 
supported"
         );
     }
+
     Ok(PhysicalSortRequirement::from_sort_exprs(&requirement))
 }
 
@@ -1235,7 +1215,7 @@ mod tests {
     use datafusion_execution::memory_pool::FairSpillPool;
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_physical_expr::expressions::{
-        lit, ApproxDistinct, Count, LastValue, Median, OrderSensitiveArrayAgg,
+        lit, ApproxDistinct, Count, FirstValue, LastValue, Median, 
OrderSensitiveArrayAgg,
     };
     use datafusion_physical_expr::{
         reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr,
@@ -2171,8 +2151,7 @@ mod tests {
             })
             .collect::<Vec<_>>();
         let group_by = PhysicalGroupBy::new_single(vec![]);
-        let res = get_aggregate_exprs_requirement(
-            &[],
+        let res = get_finer_aggregate_exprs_requirement(
             &mut aggr_exprs,
             &group_by,
             &eq_properties,
diff --git a/datafusion/physical-plan/src/tree_node.rs 
b/datafusion/physical-plan/src/tree_node.rs
index 52a52f81bd..46460cbb66 100644
--- a/datafusion/physical-plan/src/tree_node.rs
+++ b/datafusion/physical-plan/src/tree_node.rs
@@ -64,6 +64,7 @@ impl<T> PlanContext<T> {
     pub fn update_plan_from_children(mut self) -> Result<Self> {
         let children_plans = self.children.iter().map(|c| 
c.plan.clone()).collect();
         self.plan = with_new_children_if_necessary(self.plan, children_plans)?;
+
         Ok(self)
     }
 }
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index c5c845614c..e01ee06a12 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -373,7 +373,7 @@ pub(crate) fn calc_requirements<
 /// For instance, if input is ordered by a, b, c and PARTITION BY b, a is used,
 /// this vector will be [1, 0]. It means that when we iterate b, a columns 
with the order [1, 0]
 /// resulting vector (a, b) is a preset of the existing ordering (a, b, c).
-pub(crate) fn get_ordered_partition_by_indices(
+pub fn get_ordered_partition_by_indices(
     partition_by_exprs: &[Arc<dyn PhysicalExpr>],
     input: &Arc<dyn ExecutionPlan>,
 ) -> Vec<usize> {
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index 30d5c7243f..3d24fe3888 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -3455,3 +3455,45 @@ SELECT LAST_VALUE(column1 ORDER BY column2 DESC) IGNORE 
NULLS FROM t;
 
 statement ok
 DROP TABLE t;
+
+# Test Convert FirstLast optimizer rule
+statement ok
+CREATE EXTERNAL TABLE convert_first_last_table (
+c1 INT NOT NULL,
+c2 INT NOT NULL,
+c3 INT NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (c1 ASC)
+WITH ORDER (c2 DESC)
+WITH ORDER (c3 ASC)
+LOCATION '../core/tests/data/convert_first_last.csv';
+
+# test first to last, the result does not show difference, we need to check 
the conversion by `explain`
+query TT
+explain select first_value(c1 order by c3 desc) from convert_first_last_table;
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1) 
ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
+02)--TableScan: convert_first_last_table projection=[c1, c3]
+physical_plan
+01)AggregateExec: mode=Final, gby=[], 
aggr=[FIRST_VALUE(convert_first_last_table.c1)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], 
aggr=[LAST_VALUE(convert_first_last_table.c1)]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, 
projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS 
LAST]], has_header=true
+
+# test last to first
+query TT
+explain select last_value(c1 order by c2 asc) from convert_first_last_table;
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1) 
ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
+02)--TableScan: convert_first_last_table projection=[c1, c2]
+physical_plan
+01)AggregateExec: mode=Final, gby=[], 
aggr=[LAST_VALUE(convert_first_last_table.c1)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], 
aggr=[FIRST_VALUE(convert_first_last_table.c1)]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, 
projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], 
has_header=true
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index c357391e70..d8c8dcd41b 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -248,6 +248,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS 
ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
 physical_plan after EnforceSorting SAME TEXT AS ABOVE
+physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], has_header=true
@@ -304,6 +305,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS 
ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
 physical_plan after EnforceSorting SAME TEXT AS ABOVE
+physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements
@@ -340,6 +342,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS 
ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
 physical_plan after EnforceSorting SAME TEXT AS ABOVE
+physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index 1acdcde9c8..5c5bf58dd0 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -2805,7 +2805,7 @@ logical_plan
 04)------TableScan: sales_global projection=[country, ts, amount]
 physical_plan
 01)ProjectionExec: expr=[country@0 as country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 
as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS 
FIRST]@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), 
SUM(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), 
SUM(sales_global.amount)]
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRRR rowsort
@@ -3800,7 +3800,7 @@ logical_plan
 03)----TableScan: multiple_ordered_table projection=[a, c, d]
 physical_plan
 01)ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY 
[multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, 
LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC 
NULLS FIRST]@2 as last_c]
-02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
FIRST_VALUE(multiple_ordered_table.c)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
LAST_VALUE(multiple_ordered_table.c)]
 03)----CoalesceBatchesExec: target_batch_size=2
 04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
 05)--------AggregateExec: mode=Partial, gby=[d@2 as d], 
aggr=[FIRST_VALUE(multiple_ordered_table.a), 
FIRST_VALUE(multiple_ordered_table.c)]

Reply via email to