This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4e9f2d5260 Move conversion of FIRST/LAST Aggregate function to
independent physical optimizer rule (#10061)
4e9f2d5260 is described below
commit 4e9f2d5260ecaca261076d2e0a626d3f595ff5a5
Author: Jay Zhan <[email protected]>
AuthorDate: Mon Apr 15 21:13:07 2024 +0800
Move conversion of FIRST/LAST Aggregate function to independent physical
optimizer rule (#10061)
* move out the ordering rule
Signed-off-by: jayzhan211 <[email protected]>
* introduce rule
Signed-off-by: jayzhan211 <[email protected]>
* revert test result
Signed-off-by: jayzhan211 <[email protected]>
* pass multi order test
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* with new children
Signed-off-by: jayzhan211 <[email protected]>
* revert slt
Signed-off-by: jayzhan211 <[email protected]>
* revert back
Signed-off-by: jayzhan211 <[email protected]>
* rm rewrite in new child
Signed-off-by: jayzhan211 <[email protected]>
* backup
Signed-off-by: jayzhan211 <[email protected]>
* only move conversion to optimizer
Signed-off-by: jayzhan211 <[email protected]>
* find test that do reverse
Signed-off-by: jayzhan211 <[email protected]>
* add test for first and last
Signed-off-by: jayzhan211 <[email protected]>
* pass all test
Signed-off-by: jayzhan211 <[email protected]>
* upd test
Signed-off-by: jayzhan211 <[email protected]>
* upd test
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* add aggregate test
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* final draft
Signed-off-by: jayzhan211 <[email protected]>
* cleanup again
Signed-off-by: jayzhan211 <[email protected]>
* pull out finer ordering code and reuse
Signed-off-by: jayzhan211 <[email protected]>
* clippy
Signed-off-by: jayzhan211 <[email protected]>
* remove finer in optimize rule
Signed-off-by: jayzhan211 <[email protected]>
* add comments and cleanup
Signed-off-by: jayzhan211 <[email protected]>
* rename fun
Signed-off-by: jayzhan211 <[email protected]>
* rename fun
Signed-off-by: jayzhan211 <[email protected]>
* fmt
Signed-off-by: jayzhan211 <[email protected]>
* avoid unnecessary recursion and rename
Signed-off-by: jayzhan211 <[email protected]>
* remove unnecessary rule
Signed-off-by: jayzhan211 <[email protected]>
* fix merge
Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
---
.../src/physical_optimizer/convert_first_last.rs | 260 +++++++++++++++++++++
datafusion/core/src/physical_optimizer/mod.rs | 1 +
.../core/src/physical_optimizer/optimizer.rs | 3 +
datafusion/core/tests/data/convert_first_last.csv | 11 +
datafusion/physical-plan/src/aggregates/mod.rs | 117 ++++------
datafusion/physical-plan/src/tree_node.rs | 1 +
datafusion/physical-plan/src/windows/mod.rs | 2 +-
datafusion/sqllogictest/test_files/aggregate.slt | 42 ++++
datafusion/sqllogictest/test_files/explain.slt | 3 +
datafusion/sqllogictest/test_files/group_by.slt | 4 +-
10 files changed, 372 insertions(+), 72 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs
b/datafusion/core/src/physical_optimizer/convert_first_last.rs
new file mode 100644
index 0000000000..4102313d31
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::Result;
+use datafusion_common::{
+ config::ConfigOptions,
+ tree_node::{Transformed, TransformedResult, TreeNode},
+};
+use datafusion_physical_expr::expressions::{FirstValue, LastValue};
+use datafusion_physical_expr::{
+ equivalence::ProjectionMapping, reverse_order_bys, AggregateExpr,
+ EquivalenceProperties, PhysicalSortRequirement,
+};
+use datafusion_physical_plan::aggregates::concat_slices;
+use datafusion_physical_plan::{
+ aggregates::{AggregateExec, AggregateMode},
+ ExecutionPlan, ExecutionPlanProperties, InputOrderMode,
+};
+use std::sync::Arc;
+
+use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
+
+use super::PhysicalOptimizerRule;
+
+/// Physical optimizer rule that re-checks the ordering requirements of the
+/// aggregate expressions in every [`AggregateExec`] and, where possible,
+/// converts between FIRST_VALUE and LAST_VALUE.
+///
+/// The conversion pays off when the input already arrives in the reverse of
+/// the order an aggregate asks for: LAST_VALUE under a descending requirement
+/// is the same as FIRST_VALUE under the matching ascending requirement.
+///
+/// Example: let column `c1` hold `[1, 2, 3]`, i.e. it is already ascending.
+/// `LAST_VALUE(c1 ORDER BY c1 DESC)` is the last element of `[3, 2, 1]`,
+/// which is `1`, the first element of the ascending `[1, 2, 3]`. So it can be
+/// rewritten as `FIRST_VALUE(c1 ORDER BY c1 ASC)`, whose requirement the
+/// existing ordering already satisfies, and no extra sorting work is needed.
+#[derive(Default, Debug)]
+pub struct OptimizeAggregateOrder {}
+
+impl OptimizeAggregateOrder {
+    /// Creates a new instance of the rule.
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl PhysicalOptimizerRule for OptimizeAggregateOrder {
+    /// Visits the plan bottom-up (children before parents) and rewrites every
+    /// [`AggregateExec`] it finds; all other nodes are returned unchanged.
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_up(&get_common_requirement_of_aggregate_input)
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "OptimizeAggregateOrder"
+    }
+
+    /// Opts this rule in to the optimizer's output-schema check.
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Rewrites a single plan node for [`OptimizeAggregateOrder`].
+///
+/// For an [`AggregateExec`], the FIRST_VALUE / LAST_VALUE aggregates are
+/// re-checked against the ordering of the node's input, prefixed by the part
+/// of the GROUP BY the input is already ordered on. They are then converted or
+/// marked by [`try_convert_first_last_if_better`], and the node is rebuilt
+/// around the updated expressions. Every other node is returned unchanged.
+fn get_common_requirement_of_aggregate_input(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
+        let input = aggr_exec.input();
+        let mut aggr_expr = try_get_updated_aggr_expr_from_child(aggr_exec);
+        let group_by = aggr_exec.group_by();
+        let mode = aggr_exec.mode();
+
+        let input_eq_properties = input.equivalence_properties();
+        let groupby_exprs = group_by.input_exprs();
+        // If the existing ordering satisfies a prefix of the GROUP BY
+        // expressions, prefix the aggregate requirements with that section;
+        // aggregation then works more efficiently.
+        let indices = get_ordered_partition_by_indices(&groupby_exprs, input);
+        let requirement = indices
+            .iter()
+            .map(|&idx| PhysicalSortRequirement {
+                expr: groupby_exprs[idx].clone(),
+                options: None,
+            })
+            .collect::<Vec<_>>();
+
+        try_convert_first_last_if_better(
+            &requirement,
+            &mut aggr_expr,
+            input_eq_properties,
+        )?;
+
+        // Keep the ordering requirement this node already carries (computed
+        // when the `AggregateExec` was constructed, including the aggregates'
+        // own ORDER BY requirements). `EnforceSorting` runs before this rule
+        // and arranged the input to satisfy it. The aggregates above may now be
+        // marked `requirement_satisfied`, which is only sound while the input
+        // keeps that ordering. Rebuilding the requirement from the GROUP BY
+        // prefix alone would silently drop that guarantee.
+        let required_input_ordering = aggr_exec
+            .required_input_ordering()
+            .into_iter()
+            .next()
+            .flatten();
+
+        let input_order_mode =
+            if indices.len() == groupby_exprs.len() && !indices.is_empty() {
+                InputOrderMode::Sorted
+            } else if !indices.is_empty() {
+                InputOrderMode::PartiallySorted(indices)
+            } else {
+                InputOrderMode::Linear
+            };
+        let projection_mapping =
+            ProjectionMapping::try_new(group_by.expr(), &input.schema())?;
+
+        let cache = AggregateExec::compute_properties(
+            input,
+            plan.schema().clone(),
+            &projection_mapping,
+            mode,
+            &input_order_mode,
+        );
+
+        let aggr_exec = aggr_exec.new_with_aggr_expr_and_ordering_info(
+            required_input_ordering,
+            aggr_expr,
+            cache,
+            input_order_mode,
+        );
+
+        Ok(Transformed::yes(
+            Arc::new(aggr_exec) as Arc<dyn ExecutionPlan>
+        ))
+    } else {
+        Ok(Transformed::no(plan))
+    }
+}
+
+/// Returns the aggregate expressions the rule should work on for `aggr_exec`.
+///
+/// When a `LogicalPlan::Aggregate` is planned in `create_initial_plan`, it
+/// becomes two nested [`AggregateExec`]s: a `Partial` first stage below a
+/// `Final` / `FinalPartitioned` second stage. Because the plan is rewritten
+/// bottom-up, the first stage may already have had FIRST_VALUE / LAST_VALUE
+/// swapped by the time the second stage is visited. To keep the two stages in
+/// agreement, a second-stage node whose *direct* input is a `Partial`
+/// [`AggregateExec`] takes over that child's (possibly rewritten)
+/// expressions.
+///
+/// In every other case the node's own aggregate expressions are returned
+/// unchanged. This includes a second stage separated from its first stage by
+/// another operator, such as `CoalescePartitionsExec` or `RepartitionExec`.
+fn try_get_updated_aggr_expr_from_child(
+    aggr_exec: &AggregateExec,
+) -> Vec<Arc<dyn AggregateExpr>> {
+    let is_second_stage = matches!(
+        aggr_exec.mode(),
+        AggregateMode::Final | AggregateMode::FinalPartitioned
+    );
+    if is_second_stage {
+        // Only an immediate `Partial` aggregate child counts; any operator in
+        // between makes the downcast fail and we fall through.
+        let partial_child = aggr_exec
+            .input()
+            .as_any()
+            .downcast_ref::<AggregateExec>()
+            .filter(|child| child.mode() == &AggregateMode::Partial);
+        if let Some(child) = partial_child {
+            return child.aggr_expr().to_vec();
+        }
+    }
+
+    aggr_exec.aggr_expr().to_vec()
+}
+
+/// Re-checks every FIRST_VALUE / LAST_VALUE in `aggr_exprs` against the
+/// ordering described by `eq_properties` and rewrites it in place.
+///
+/// Each such aggregate's ORDER BY requirement is prefixed with
+/// `prefix_requirement`, and then:
+/// - if the existing ordering satisfies it, the aggregate is kept and marked
+///   with `with_requirement_satisfied(true)`;
+/// - otherwise, if the existing ordering satisfies the reversed requirement,
+///   FIRST_VALUE is converted to LAST_VALUE (or vice versa) and marked
+///   satisfied;
+/// - otherwise the aggregate is kept and marked with
+///   `with_requirement_satisfied(false)`.
+///
+/// All other aggregate expressions are left untouched.
+///
+/// # Parameters
+///
+/// - `prefix_requirement`: sort requirement placed in front of each
+///   aggregate's own requirement (the caller passes the ordered prefix of the
+///   GROUP BY expressions).
+/// - `aggr_exprs`: aggregate expressions, updated in place.
+/// - `eq_properties`: equivalence properties of the aggregate's input.
+///
+/// # Errors
+///
+/// Currently never returns an error.
+fn try_convert_first_last_if_better(
+    prefix_requirement: &[PhysicalSortRequirement],
+    aggr_exprs: &mut [Arc<dyn AggregateExpr>],
+    eq_properties: &EquivalenceProperties,
+) -> Result<()> {
+    for aggr_expr in aggr_exprs.iter_mut() {
+        let rewritten: Arc<dyn AggregateExpr> = if let Some(first_value) =
+            aggr_expr.as_any().downcast_ref::<FirstValue>()
+        {
+            let first_value = first_value.clone();
+            match classify_ordering(prefix_requirement, aggr_expr, eq_properties) {
+                OrderingMatch::Forward => {
+                    Arc::new(first_value.with_requirement_satisfied(true))
+                }
+                // Converting to LAST_VALUE enables more efficient execution
+                // given the existing ordering.
+                OrderingMatch::Reverse => Arc::new(
+                    first_value.convert_to_last().with_requirement_satisfied(true),
+                ),
+                // Requirement is not satisfied with existing ordering.
+                OrderingMatch::Unsatisfied => {
+                    Arc::new(first_value.with_requirement_satisfied(false))
+                }
+            }
+        } else if let Some(last_value) =
+            aggr_expr.as_any().downcast_ref::<LastValue>()
+        {
+            let last_value = last_value.clone();
+            match classify_ordering(prefix_requirement, aggr_expr, eq_properties) {
+                OrderingMatch::Forward => {
+                    Arc::new(last_value.with_requirement_satisfied(true))
+                }
+                // Converting to FIRST_VALUE enables more efficient execution
+                // given the existing ordering.
+                OrderingMatch::Reverse => Arc::new(
+                    last_value.convert_to_first().with_requirement_satisfied(true),
+                ),
+                // Requirement is not satisfied with existing ordering.
+                OrderingMatch::Unsatisfied => {
+                    Arc::new(last_value.with_requirement_satisfied(false))
+                }
+            }
+        } else {
+            // Not a FIRST_VALUE / LAST_VALUE: nothing to do.
+            continue;
+        };
+        *aggr_expr = rewritten;
+    }
+
+    Ok(())
+}
+
+/// How the input's existing ordering relates to an aggregate's ORDER BY
+/// requirement (after it has been prefixed by the caller's requirement).
+enum OrderingMatch {
+    /// The requirement is satisfied as written.
+    Forward,
+    /// Only the reversed requirement is satisfied.
+    Reverse,
+    /// Neither direction is satisfied.
+    Unsatisfied,
+}
+
+/// Checks `aggr_expr`'s ORDER BY requirement against `eq_properties`, each
+/// time prefixed with `prefix_requirement`.
+///
+/// The reversed requirement is only built and checked when the forward one is
+/// not satisfied.
+fn classify_ordering(
+    prefix_requirement: &[PhysicalSortRequirement],
+    aggr_expr: &Arc<dyn AggregateExpr>,
+    eq_properties: &EquivalenceProperties,
+) -> OrderingMatch {
+    let aggr_req = aggr_expr.order_bys().unwrap_or(&[]);
+
+    let forward = PhysicalSortRequirement::from_sort_exprs(aggr_req);
+    if eq_properties
+        .ordering_satisfy_requirement(&concat_slices(prefix_requirement, &forward))
+    {
+        return OrderingMatch::Forward;
+    }
+
+    let reverse =
+        PhysicalSortRequirement::from_sort_exprs(&reverse_order_bys(aggr_req));
+    if eq_properties
+        .ordering_satisfy_requirement(&concat_slices(prefix_requirement, &reverse))
+    {
+        OrderingMatch::Reverse
+    } else {
+        OrderingMatch::Unsatisfied
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs
b/datafusion/core/src/physical_optimizer/mod.rs
index e990fead61..c80668c6da 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -24,6 +24,7 @@
pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod combine_partial_final_agg;
+mod convert_first_last;
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs
b/datafusion/core/src/physical_optimizer/optimizer.rs
index 48da68cb2e..08cbf68fa6 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -19,6 +19,7 @@
use std::sync::Arc;
+use super::convert_first_last::OptimizeAggregateOrder;
use super::projection_pushdown::ProjectionPushdown;
use crate::config::ConfigOptions;
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -101,6 +102,8 @@ impl PhysicalOptimizer {
// Note that one should always run this rule after running the
EnforceDistribution rule
// as the latter may break local sorting requirements.
Arc::new(EnforceSorting::new()),
+ // Run once after the local sorting requirement is changed
+ Arc::new(OptimizeAggregateOrder::new()),
// TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule
would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe
optimize it in the future.
Arc::new(ProjectionPushdown::new()),
// The CoalesceBatches rule will not influence the distribution
and ordering of the
diff --git a/datafusion/core/tests/data/convert_first_last.csv
b/datafusion/core/tests/data/convert_first_last.csv
new file mode 100644
index 0000000000..059b631e57
--- /dev/null
+++ b/datafusion/core/tests/data/convert_first_last.csv
@@ -0,0 +1,11 @@
+c1,c2,c3
+1,9,0
+2,8,1
+3,7,2
+4,6,3
+5,5,4
+6,4,5
+7,3,6
+8,2,7
+9,1,8
+10,0,9
\ No newline at end of file
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index 98c44e23c6..ba9a6b1be0 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -39,12 +39,15 @@ use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::Accumulator;
+use datafusion_physical_expr::aggregate::is_order_sensitive;
+use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
- aggregate::is_order_sensitive,
- equivalence::{collapse_lex_req, ProjectionMapping},
- expressions::{Column, FirstValue, LastValue, Max, Min, UnKnownColumn},
- physical_exprs_contains, reverse_order_bys, AggregateExpr,
EquivalenceProperties,
- LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortRequirement,
+ equivalence::ProjectionMapping,
+ expressions::{Column, Max, Min, UnKnownColumn},
+ AggregateExpr, LexRequirement, PhysicalExpr,
+};
+use datafusion_physical_expr::{
+ physical_exprs_contains, EquivalenceProperties, LexOrdering,
PhysicalSortRequirement,
};
use itertools::Itertools;
@@ -269,6 +272,36 @@ pub struct AggregateExec {
}
impl AggregateExec {
+    /// Builds a copy of this [`AggregateExec`] with new aggregate expressions
+    /// and new ordering-related state, cloning every other field from `self`.
+    ///
+    /// Used by the `OptimizeAggregateOrder` physical optimizer rule, which may
+    /// rewrite FIRST_VALUE / LAST_VALUE aggregates after the plan is built.
+    ///
+    /// No validation is performed: the caller is responsible for passing a
+    /// `cache` and `input_order_mode` that are consistent with this node's
+    /// input and schema. Execution metrics start fresh.
+    pub fn new_with_aggr_expr_and_ordering_info(
+        &self,
+        required_input_ordering: Option<LexRequirement>,
+        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        cache: PlanProperties,
+        input_order_mode: InputOrderMode,
+    ) -> Self {
+        Self {
+            aggr_expr,
+            required_input_ordering,
+            metrics: ExecutionPlanMetricsSet::new(),
+            input_order_mode,
+            cache,
+            // clone the rest of the fields
+            mode: self.mode,
+            group_by: self.group_by.clone(),
+            filter_expr: self.filter_expr.clone(),
+            limit: self.limit,
+            input: self.input.clone(),
+            schema: self.schema.clone(),
+            input_schema: self.input_schema.clone(),
+        }
+    }
+
+    /// Returns the cached plan properties of this node (the object built by
+    /// `compute_properties`: equivalence properties, ordering, partitioning,
+    /// etc.).
+    pub fn cache(&self) -> &PlanProperties {
+        &self.cache
+    }
+
/// Create a new hash aggregate execution plan
pub fn try_new(
mode: AggregateMode,
@@ -336,8 +369,7 @@ impl AggregateExec {
})
.collect::<Vec<_>>();
- let req = get_aggregate_exprs_requirement(
- &new_requirement,
+ let req = get_finer_aggregate_exprs_requirement(
&mut aggr_expr,
&group_by,
input_eq_properties,
@@ -369,6 +401,7 @@ impl AggregateExec {
&mode,
&input_order_mode,
);
+
Ok(AggregateExec {
mode,
group_by,
@@ -507,7 +540,7 @@ impl AggregateExec {
}
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
- fn compute_properties(
+ pub fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
projection_mapping: &ProjectionMapping,
@@ -683,9 +716,9 @@ impl ExecutionPlan for AggregateExec {
children[0].clone(),
self.input_schema.clone(),
self.schema.clone(),
- //self.original_schema.clone(),
)?;
me.limit = self.limit;
+
Ok(Arc::new(me))
}
@@ -870,7 +903,7 @@ fn finer_ordering(
}
/// Concatenates the given slices.
-fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> Vec<T> {
+pub fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> Vec<T> {
[lhs, rhs].concat()
}
@@ -891,8 +924,7 @@ fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> Vec<T> {
///
/// A `LexRequirement` instance, which is the requirement that satisfies all
the
/// aggregate requirements. Returns an error in case of conflicting
requirements.
-fn get_aggregate_exprs_requirement(
- prefix_requirement: &[PhysicalSortRequirement],
+fn get_finer_aggregate_exprs_requirement(
aggr_exprs: &mut [Arc<dyn AggregateExpr>],
group_by: &PhysicalGroupBy,
eq_properties: &EquivalenceProperties,
@@ -900,60 +932,6 @@ fn get_aggregate_exprs_requirement(
) -> Result<LexRequirement> {
let mut requirement = vec![];
for aggr_expr in aggr_exprs.iter_mut() {
- let aggr_req = aggr_expr.order_bys().unwrap_or(&[]);
- let reverse_aggr_req = reverse_order_bys(aggr_req);
- let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req);
- let reverse_aggr_req =
- PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req);
-
- if let Some(first_value) =
aggr_expr.as_any().downcast_ref::<FirstValue>() {
- let mut first_value = first_value.clone();
- if eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &aggr_req,
- )) {
- first_value = first_value.with_requirement_satisfied(true);
- *aggr_expr = Arc::new(first_value) as _;
- } else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &reverse_aggr_req,
- )) {
- // Converting to LAST_VALUE enables more efficient execution
- // given the existing ordering:
- let mut last_value = first_value.convert_to_last();
- last_value = last_value.with_requirement_satisfied(true);
- *aggr_expr = Arc::new(last_value) as _;
- } else {
- // Requirement is not satisfied with existing ordering.
- first_value = first_value.with_requirement_satisfied(false);
- *aggr_expr = Arc::new(first_value) as _;
- }
- continue;
- }
- if let Some(last_value) =
aggr_expr.as_any().downcast_ref::<LastValue>() {
- let mut last_value = last_value.clone();
- if eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &aggr_req,
- )) {
- last_value = last_value.with_requirement_satisfied(true);
- *aggr_expr = Arc::new(last_value) as _;
- } else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &reverse_aggr_req,
- )) {
- // Converting to FIRST_VALUE enables more efficient execution
- // given the existing ordering:
- let mut first_value = last_value.convert_to_first();
- first_value = first_value.with_requirement_satisfied(true);
- *aggr_expr = Arc::new(first_value) as _;
- } else {
- // Requirement is not satisfied with existing ordering.
- last_value = last_value.with_requirement_satisfied(false);
- *aggr_expr = Arc::new(last_value) as _;
- }
- continue;
- }
if let Some(finer_ordering) =
finer_ordering(&requirement, aggr_expr, group_by, eq_properties,
agg_mode)
{
@@ -1003,6 +981,7 @@ fn get_aggregate_exprs_requirement(
continue;
}
}
+
// Neither the existing requirement and current aggregate requirement
satisfy the other, this means
// requirements are conflicting. Currently, we do not support
// conflicting requirements.
@@ -1010,6 +989,7 @@ fn get_aggregate_exprs_requirement(
"Conflicting ordering requirements in aggregate functions is not
supported"
);
}
+
Ok(PhysicalSortRequirement::from_sort_exprs(&requirement))
}
@@ -1235,7 +1215,7 @@ mod tests {
use datafusion_execution::memory_pool::FairSpillPool;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_physical_expr::expressions::{
- lit, ApproxDistinct, Count, LastValue, Median, OrderSensitiveArrayAgg,
+ lit, ApproxDistinct, Count, FirstValue, LastValue, Median,
OrderSensitiveArrayAgg,
};
use datafusion_physical_expr::{
reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr,
@@ -2171,8 +2151,7 @@ mod tests {
})
.collect::<Vec<_>>();
let group_by = PhysicalGroupBy::new_single(vec![]);
- let res = get_aggregate_exprs_requirement(
- &[],
+ let res = get_finer_aggregate_exprs_requirement(
&mut aggr_exprs,
&group_by,
&eq_properties,
diff --git a/datafusion/physical-plan/src/tree_node.rs
b/datafusion/physical-plan/src/tree_node.rs
index 52a52f81bd..46460cbb66 100644
--- a/datafusion/physical-plan/src/tree_node.rs
+++ b/datafusion/physical-plan/src/tree_node.rs
@@ -64,6 +64,7 @@ impl<T> PlanContext<T> {
pub fn update_plan_from_children(mut self) -> Result<Self> {
let children_plans = self.children.iter().map(|c|
c.plan.clone()).collect();
self.plan = with_new_children_if_necessary(self.plan, children_plans)?;
+
Ok(self)
}
}
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index c5c845614c..e01ee06a12 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -373,7 +373,7 @@ pub(crate) fn calc_requirements<
/// For instance, if input is ordered by a, b, c and PARTITION BY b, a is used,
/// this vector will be [1, 0]. It means that when we iterate b, a columns
with the order [1, 0]
/// resulting vector (a, b) is a preset of the existing ordering (a, b, c).
-pub(crate) fn get_ordered_partition_by_indices(
+pub fn get_ordered_partition_by_indices(
partition_by_exprs: &[Arc<dyn PhysicalExpr>],
input: &Arc<dyn ExecutionPlan>,
) -> Vec<usize> {
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt
b/datafusion/sqllogictest/test_files/aggregate.slt
index 30d5c7243f..3d24fe3888 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -3455,3 +3455,45 @@ SELECT LAST_VALUE(column1 ORDER BY column2 DESC) IGNORE
NULLS FROM t;
statement ok
DROP TABLE t;
+
+# Test the FIRST_VALUE/LAST_VALUE conversion done by the OptimizeAggregateOrder rule
+statement ok
+CREATE EXTERNAL TABLE convert_first_last_table (
+c1 INT NOT NULL,
+c2 INT NOT NULL,
+c3 INT NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (c1 ASC)
+WITH ORDER (c2 DESC)
+WITH ORDER (c3 ASC)
+LOCATION '../core/tests/data/convert_first_last.csv';
+
+# Test FIRST_VALUE -> LAST_VALUE. The query result is the same either way, so the conversion is checked with `explain`.
+query TT
+explain select first_value(c1 order by c3 desc) from convert_first_last_table;
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1)
ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
+02)--TableScan: convert_first_last_table projection=[c1, c3]
+physical_plan
+01)AggregateExec: mode=Final, gby=[],
aggr=[FIRST_VALUE(convert_first_last_table.c1)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[],
aggr=[LAST_VALUE(convert_first_last_table.c1)]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]},
projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS
LAST]], has_header=true
+
+# test last to first
+query TT
+explain select last_value(c1 order by c2 asc) from convert_first_last_table;
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1)
ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
+02)--TableScan: convert_first_last_table projection=[c1, c2]
+physical_plan
+01)AggregateExec: mode=Final, gby=[],
aggr=[LAST_VALUE(convert_first_last_table.c1)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[],
aggr=[FIRST_VALUE(convert_first_last_table.c1)]
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]},
projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]],
has_header=true
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index c357391e70..d8c8dcd41b 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -248,6 +248,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS
ABOVE
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
physical_plan after EnforceSorting SAME TEXT AS ABOVE
+physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after OutputRequirements CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], has_header=true
@@ -304,6 +305,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS
ABOVE
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
physical_plan after EnforceSorting SAME TEXT AS ABOVE
+physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after OutputRequirements
@@ -340,6 +342,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS
ABOVE
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
physical_plan after EnforceSorting SAME TEXT AS ABOVE
+physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after OutputRequirements
diff --git a/datafusion/sqllogictest/test_files/group_by.slt
b/datafusion/sqllogictest/test_files/group_by.slt
index 1acdcde9c8..5c5bf58dd0 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -2805,7 +2805,7 @@ logical_plan
04)------TableScan: sales_global projection=[country, ts, amount]
physical_plan
01)ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1
as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]@3 as sum1]
-02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount),
SUM(sales_global.amount)]
+02)--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount),
SUM(sales_global.amount)]
03)----MemoryExec: partitions=1, partition_sizes=[1]
query TRRR rowsort
@@ -3800,7 +3800,7 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c, d]
physical_plan
01)ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY
[multiple_ordered_table.a ASC NULLS LAST]@1 as first_a,
LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC
NULLS FIRST]@2 as last_c]
-02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
FIRST_VALUE(multiple_ordered_table.c)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
LAST_VALUE(multiple_ordered_table.c)]
03)----CoalesceBatchesExec: target_batch_size=2
04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
05)--------AggregateExec: mode=Partial, gby=[d@2 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
FIRST_VALUE(multiple_ordered_table.c)]