This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 549cf844cc Convert first, last aggregate function to UDAF (#10648)
549cf844cc is described below
commit 549cf844ccee96a79bb4bcfd127719275786e6e6
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon May 27 14:31:36 2024 +0300
Convert first, last aggregate function to UDAF (#10648)
* move out the ordering rule
Signed-off-by: jayzhan211 <[email protected]>
* introduce rule
Signed-off-by: jayzhan211 <[email protected]>
* revert test result
Signed-off-by: jayzhan211 <[email protected]>
* pass multi order test
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* with new children
Signed-off-by: jayzhan211 <[email protected]>
* revert slt
Signed-off-by: jayzhan211 <[email protected]>
* revert back
Signed-off-by: jayzhan211 <[email protected]>
* rm rewrite in new child
Signed-off-by: jayzhan211 <[email protected]>
* backup
Signed-off-by: jayzhan211 <[email protected]>
* only move conversion to optimizer
Signed-off-by: jayzhan211 <[email protected]>
* find test that do reverse
Signed-off-by: jayzhan211 <[email protected]>
* add test for first and last
Signed-off-by: jayzhan211 <[email protected]>
* pass all test
Signed-off-by: jayzhan211 <[email protected]>
* upd test
Signed-off-by: jayzhan211 <[email protected]>
* upd test
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* add aggregate test
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* final draft
Signed-off-by: jayzhan211 <[email protected]>
* cleanup again
Signed-off-by: jayzhan211 <[email protected]>
* pull out finer ordering code and reuse
Signed-off-by: jayzhan211 <[email protected]>
* clippy
Signed-off-by: jayzhan211 <[email protected]>
* remove finer in optimize rule
Signed-off-by: jayzhan211 <[email protected]>
* add comments and cleanup
Signed-off-by: jayzhan211 <[email protected]>
* rename fun
Signed-off-by: jayzhan211 <[email protected]>
* rename fun
Signed-off-by: jayzhan211 <[email protected]>
* fmt
Signed-off-by: jayzhan211 <[email protected]>
* avoid unnecessary recursion and rename
Signed-off-by: jayzhan211 <[email protected]>
* Minor changes
* Add new API for aggregate optimization
* Minor changes
* Minor changes
* Remove old code
* Minor changes
* Minor changes
* Minor changes
* Minor changes
* Minor changes
* Minor changes
* Update comments
* Minor changes
* Minor changes
* Review Part 1
* TMP
* Update display of aggregate fun exprs
* TMP
* TMP
* Update tests
* TMP buggy
* modify name in place
* Minor changes
* Tmp
* Tmp
* Tmp
* TMP
* Simplifications
* Tmp
* Tmp
* Compiles
* Resolve linter errors
* Resolve linter errors
* Minor changes
* Simplifications
* Minor changes
* Move cast to common
* Minor changes
* Fix test
* Minor changes
* Simplifications
* Review
* Address reviews
* Address reviews
* Update documentation, rename method
* Minor changes
---------
Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: jayzhan211 <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion-cli/Cargo.lock | 2 +
datafusion/core/Cargo.toml | 1 +
.../src/physical_optimizer/convert_first_last.rs | 260 -----------
datafusion/core/src/physical_optimizer/mod.rs | 2 +-
.../core/src/physical_optimizer/optimizer.rs | 2 +-
.../src/physical_optimizer/update_aggr_exprs.rs | 182 ++++++++
datafusion/core/src/prelude.rs | 1 +
datafusion/expr/src/aggregate_function.rs | 16 +-
datafusion/expr/src/expr.rs | 9 +
datafusion/expr/src/lib.rs | 2 +-
datafusion/expr/src/type_coercion/aggregates.rs | 3 -
datafusion/expr/src/udaf.rs | 63 ++-
datafusion/expr/src/utils.rs | 31 ++
datafusion/functions-aggregate/src/first_last.rs | 497 ++++++---------------
datafusion/functions-aggregate/src/lib.rs | 1 +
datafusion/optimizer/Cargo.toml | 1 +
.../optimizer/src/replace_distinct_aggregate.rs | 18 +-
.../physical-expr-common/src/aggregate/mod.rs | 147 +++++-
.../src/expressions/cast.rs | 6 +-
.../physical-expr-common/src/expressions/mod.rs | 3 +
datafusion/physical-expr-common/src/sort_expr.rs | 35 +-
datafusion/physical-expr-common/src/utils.rs | 41 +-
.../src/aggregate/array_agg_ordered.rs | 12 +-
datafusion/physical-expr/src/aggregate/build_in.rs | 41 +-
datafusion/physical-expr/src/aggregate/mod.rs | 12 -
.../physical-expr/src/aggregate/nth_value.rs | 5 +
.../physical-expr/src/equivalence/properties.rs | 3 +-
datafusion/physical-expr/src/expressions/mod.rs | 25 +-
datafusion/physical-plan/src/aggregates/mod.rs | 147 +++---
datafusion/proto/proto/datafusion.proto | 4 -
datafusion/proto/src/generated/pbjson.rs | 6 -
datafusion/proto/src/generated/prost.rs | 8 -
datafusion/proto/src/logical_plan/from_proto.rs | 2 -
datafusion/proto/src/logical_plan/to_proto.rs | 8 -
datafusion/proto/src/physical_plan/to_proto.rs | 13 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 2 -
36 files changed, 755 insertions(+), 856 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index e659e62d7a..62b6ad287a 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1140,6 +1140,7 @@ dependencies = [
"datafusion-functions-array",
"datafusion-optimizer",
"datafusion-physical-expr",
+ "datafusion-physical-expr-common",
"datafusion-physical-plan",
"datafusion-sql",
"flate2",
@@ -1324,6 +1325,7 @@ dependencies = [
"chrono",
"datafusion-common",
"datafusion-expr",
+ "datafusion-functions-aggregate",
"datafusion-physical-expr",
"hashbrown 0.14.5",
"indexmap 2.2.6",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 54ca38af67..9f1f748435 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -105,6 +105,7 @@ datafusion-functions-aggregate = { workspace = true }
datafusion-functions-array = { workspace = true, optional = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
flate2 = { version = "1.0.24", optional = true }
diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs
b/datafusion/core/src/physical_optimizer/convert_first_last.rs
deleted file mode 100644
index 62537169cf..0000000000
--- a/datafusion/core/src/physical_optimizer/convert_first_last.rs
+++ /dev/null
@@ -1,260 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion_common::Result;
-use datafusion_common::{
- config::ConfigOptions,
- tree_node::{Transformed, TransformedResult, TreeNode},
-};
-use datafusion_physical_expr::expressions::{FirstValue, LastValue};
-use datafusion_physical_expr::{
- equivalence::ProjectionMapping, reverse_order_bys, AggregateExpr,
- EquivalenceProperties, PhysicalSortRequirement,
-};
-use datafusion_physical_plan::aggregates::concat_slices;
-use datafusion_physical_plan::{
- aggregates::{AggregateExec, AggregateMode},
- ExecutionPlan, ExecutionPlanProperties, InputOrderMode,
-};
-use std::sync::Arc;
-
-use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
-
-use super::PhysicalOptimizerRule;
-
-/// The optimizer rule check the ordering requirements of the aggregate
expressions.
-/// And convert between FIRST_VALUE and LAST_VALUE if possible.
-/// For example, If we have an ascending values and we want LastValue from the
descending requirement,
-/// it is equivalent to FirstValue with the current ascending ordering.
-///
-/// The concrete example is that, says we have values c1 with [1, 2, 3], which
is an ascending order.
-/// If we want LastValue(c1 order by desc), which is the first value of
reversed c1 [3, 2, 1],
-/// so we can convert the aggregate expression to FirstValue(c1 order by asc),
-/// since the current ordering is already satisfied, it saves our time!
-#[derive(Default)]
-pub struct OptimizeAggregateOrder {}
-
-impl OptimizeAggregateOrder {
- pub fn new() -> Self {
- Self::default()
- }
-}
-
-impl PhysicalOptimizerRule for OptimizeAggregateOrder {
- fn optimize(
- &self,
- plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- plan.transform_up(get_common_requirement_of_aggregate_input)
- .data()
- }
-
- fn name(&self) -> &str {
- "OptimizeAggregateOrder"
- }
-
- fn schema_check(&self) -> bool {
- true
- }
-}
-
-fn get_common_requirement_of_aggregate_input(
- plan: Arc<dyn ExecutionPlan>,
-) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
- if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
- let input = aggr_exec.input();
- let mut aggr_expr = try_get_updated_aggr_expr_from_child(aggr_exec);
- let group_by = aggr_exec.group_expr();
- let mode = aggr_exec.mode();
-
- let input_eq_properties = input.equivalence_properties();
- let groupby_exprs = group_by.input_exprs();
- // If existing ordering satisfies a prefix of the GROUP BY expressions,
- // prefix requirements with this section. In this case, aggregation
will
- // work more efficiently.
- let indices = get_ordered_partition_by_indices(&groupby_exprs, input);
- let requirement = indices
- .iter()
- .map(|&idx| PhysicalSortRequirement {
- expr: groupby_exprs[idx].clone(),
- options: None,
- })
- .collect::<Vec<_>>();
-
- try_convert_first_last_if_better(
- &requirement,
- &mut aggr_expr,
- input_eq_properties,
- )?;
-
- let required_input_ordering =
(!requirement.is_empty()).then_some(requirement);
-
- let input_order_mode =
- if indices.len() == groupby_exprs.len() && !indices.is_empty() {
- InputOrderMode::Sorted
- } else if !indices.is_empty() {
- InputOrderMode::PartiallySorted(indices)
- } else {
- InputOrderMode::Linear
- };
- let projection_mapping =
- ProjectionMapping::try_new(group_by.expr(), &input.schema())?;
-
- let cache = AggregateExec::compute_properties(
- input,
- plan.schema().clone(),
- &projection_mapping,
- mode,
- &input_order_mode,
- );
-
- let aggr_exec = aggr_exec.new_with_aggr_expr_and_ordering_info(
- required_input_ordering,
- aggr_expr,
- cache,
- input_order_mode,
- );
-
- Ok(Transformed::yes(
- Arc::new(aggr_exec) as Arc<dyn ExecutionPlan>
- ))
- } else {
- Ok(Transformed::no(plan))
- }
-}
-
-/// In `create_initial_plan` for LogicalPlan::Aggregate, we have a nested
AggregateExec where the first layer
-/// is in Partial mode and the second layer is in Final or Finalpartitioned
mode.
-/// If the first layer of aggregate plan is transformed, we need to update the
child of the layer with final mode.
-/// Therefore, we check it and get the updated aggregate expressions.
-///
-/// If AggregateExec is created from elsewhere, we skip the check and return
the original aggregate expressions.
-fn try_get_updated_aggr_expr_from_child(
- aggr_exec: &AggregateExec,
-) -> Vec<Arc<dyn AggregateExpr>> {
- let input = aggr_exec.input();
- if aggr_exec.mode() == &AggregateMode::Final
- || aggr_exec.mode() == &AggregateMode::FinalPartitioned
- {
- // Some aggregators may be modified during initialization for
- // optimization purposes. For example, a FIRST_VALUE may turn
- // into a LAST_VALUE with the reverse ordering requirement.
- // To reflect such changes to subsequent stages, use the updated
- // `AggregateExpr`/`PhysicalSortExpr` objects.
- //
- // The bottom up transformation is the mirror of
LogicalPlan::Aggregate creation in [create_initial_plan]
- if let Some(c_aggr_exec) =
input.as_any().downcast_ref::<AggregateExec>() {
- if c_aggr_exec.mode() == &AggregateMode::Partial {
- // If the input is an AggregateExec in Partial mode, then the
- // input is a CoalescePartitionsExec. In this case, the
- // AggregateExec is the second stage of aggregation. The
- // requirements of the second stage are the requirements of
- // the first stage.
- return c_aggr_exec.aggr_expr().to_vec();
- }
- }
- }
-
- aggr_exec.aggr_expr().to_vec()
-}
-
-/// Get the common requirement that satisfies all the aggregate expressions.
-///
-/// # Parameters
-///
-/// - `aggr_exprs`: A slice of `Arc<dyn AggregateExpr>` containing all the
-/// aggregate expressions.
-/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the
-/// physical GROUP BY expression.
-/// - `eq_properties`: A reference to an `EquivalenceProperties` instance
-/// representing equivalence properties for ordering.
-/// - `agg_mode`: A reference to an `AggregateMode` instance representing the
-/// mode of aggregation.
-///
-/// # Returns
-///
-/// A `LexRequirement` instance, which is the requirement that satisfies all
the
-/// aggregate requirements. Returns an error in case of conflicting
requirements.
-///
-/// Similar to the one in datafusion/physical-plan/src/aggregates/mod.rs, but
this
-/// function care only the possible conversion between FIRST_VALUE and
LAST_VALUE
-fn try_convert_first_last_if_better(
- prefix_requirement: &[PhysicalSortRequirement],
- aggr_exprs: &mut [Arc<dyn AggregateExpr>],
- eq_properties: &EquivalenceProperties,
-) -> Result<()> {
- for aggr_expr in aggr_exprs.iter_mut() {
- let aggr_req = aggr_expr.order_bys().unwrap_or(&[]);
- let reverse_aggr_req = reverse_order_bys(aggr_req);
- let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req);
- let reverse_aggr_req =
- PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req);
-
- if let Some(first_value) =
aggr_expr.as_any().downcast_ref::<FirstValue>() {
- let mut first_value = first_value.clone();
-
- if eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &aggr_req,
- )) {
- first_value = first_value.with_requirement_satisfied(true);
- *aggr_expr = Arc::new(first_value) as _;
- } else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &reverse_aggr_req,
- )) {
- // Converting to LAST_VALUE enables more efficient execution
- // given the existing ordering:
- let mut last_value = first_value.convert_to_last();
- last_value = last_value.with_requirement_satisfied(true);
- *aggr_expr = Arc::new(last_value) as _;
- } else {
- // Requirement is not satisfied with existing ordering.
- first_value = first_value.with_requirement_satisfied(false);
- *aggr_expr = Arc::new(first_value) as _;
- }
- continue;
- }
- if let Some(last_value) =
aggr_expr.as_any().downcast_ref::<LastValue>() {
- let mut last_value = last_value.clone();
- if eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &aggr_req,
- )) {
- last_value = last_value.with_requirement_satisfied(true);
- *aggr_expr = Arc::new(last_value) as _;
- } else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &reverse_aggr_req,
- )) {
- // Converting to FIRST_VALUE enables more efficient execution
- // given the existing ordering:
- let mut first_value = last_value.convert_to_first();
- first_value = first_value.with_requirement_satisfied(true);
- *aggr_expr = Arc::new(first_value) as _;
- } else {
- // Requirement is not satisfied with existing ordering.
- last_value = last_value.with_requirement_satisfied(false);
- *aggr_expr = Arc::new(last_value) as _;
- }
- continue;
- }
- }
-
- Ok(())
-}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs
b/datafusion/core/src/physical_optimizer/mod.rs
index c80668c6da..7cc9a0fb75 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -24,7 +24,6 @@
pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod combine_partial_final_agg;
-mod convert_first_last;
pub mod enforce_distribution;
pub mod enforce_sorting;
pub mod join_selection;
@@ -37,6 +36,7 @@ pub mod pruning;
pub mod replace_with_order_preserving_variants;
mod sort_pushdown;
pub mod topk_aggregation;
+mod update_aggr_exprs;
mod utils;
#[cfg(test)]
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs
b/datafusion/core/src/physical_optimizer/optimizer.rs
index 416985983d..e3b60a0cca 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -19,8 +19,8 @@
use std::sync::Arc;
-use super::convert_first_last::OptimizeAggregateOrder;
use super::projection_pushdown::ProjectionPushdown;
+use super::update_aggr_exprs::OptimizeAggregateOrder;
use crate::config::ConfigOptions;
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
new file mode 100644
index 0000000000..6a6ca815c5
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An optimizer rule that checks ordering requirements of aggregate
expressions
+//! and modifies the expressions to work more efficiently if possible.
+
+use std::sync::Arc;
+
+use super::PhysicalOptimizerRule;
+
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{plan_datafusion_err, Result};
+use datafusion_physical_expr::{
+ reverse_order_bys, AggregateExpr, EquivalenceProperties,
PhysicalSortRequirement,
+};
+use datafusion_physical_plan::aggregates::concat_slices;
+use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
+use datafusion_physical_plan::{
+ aggregates::AggregateExec, ExecutionPlan, ExecutionPlanProperties,
+};
+
+/// This optimizer rule checks ordering requirements of aggregate expressions.
+///
+/// There are 3 kinds of aggregators in terms of ordering requirements:
+/// - `AggregateOrderSensitivity::Insensitive`, meaning that ordering is not
+/// important.
+/// - `AggregateOrderSensitivity::HardRequirement`, meaning that the aggregator
+/// requires a specific ordering.
+/// - `AggregateOrderSensitivity::Beneficial`, meaning that the aggregator can
+/// handle unordered input, but can run more efficiently if its input
conforms
+/// to a specific ordering.
+///
+/// This rule analyzes aggregate expressions of type `Beneficial` to see
whether
+/// their input ordering requirements are satisfied. If this is the case, the
+/// aggregators are modified to run in a more efficient mode.
+#[derive(Default)]
+pub struct OptimizeAggregateOrder {}
+
+impl OptimizeAggregateOrder {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+impl PhysicalOptimizerRule for OptimizeAggregateOrder {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ _config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ plan.transform_up(|plan| {
+ if let Some(aggr_exec) =
plan.as_any().downcast_ref::<AggregateExec>() {
+ // Final stage implementations do not rely on ordering -- those
+ // ordering fields may be pruned out by first stage aggregates.
+ // Hence, necessary information for proper merge is added
during
+ // the first stage to the state field, which the final stage
uses.
+ if !aggr_exec.mode().is_first_stage() {
+ return Ok(Transformed::no(plan));
+ }
+ let input = aggr_exec.input();
+ let mut aggr_expr = aggr_exec.aggr_expr().to_vec();
+
+ let groupby_exprs = aggr_exec.group_expr().input_exprs();
+ // If the existing ordering satisfies a prefix of the GROUP BY
+ // expressions, prefix requirements with this section. In this
+ // case, aggregation will work more efficiently.
+ let indices = get_ordered_partition_by_indices(&groupby_exprs,
input);
+ let requirement = indices
+ .iter()
+ .map(|&idx| {
+
PhysicalSortRequirement::new(groupby_exprs[idx].clone(), None)
+ })
+ .collect::<Vec<_>>();
+
+ aggr_expr = try_convert_aggregate_if_better(
+ aggr_expr,
+ &requirement,
+ input.equivalence_properties(),
+ )?;
+
+ let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_expr);
+
+ Ok(Transformed::yes(Arc::new(aggr_exec) as _))
+ } else {
+ Ok(Transformed::no(plan))
+ }
+ })
+ .data()
+ }
+
+ fn name(&self) -> &str {
+ "OptimizeAggregateOrder"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+}
+
+/// Tries to convert each aggregate expression to a potentially more efficient
+/// version.
+///
+/// # Parameters
+///
+/// * `aggr_exprs` - A vector of `Arc<dyn AggregateExpr>` representing the
+/// aggregate expressions to be optimized.
+/// * `prefix_requirement` - An array slice representing the ordering
+/// requirements preceding the aggregate expressions.
+/// * `eq_properties` - A reference to the `EquivalenceProperties` object
+/// containing ordering information.
+///
+/// # Returns
+///
+/// Returns `Ok(converted_aggr_exprs)` if the conversion process completes
+/// successfully. Any errors occurring during the conversion process are
+/// passed through.
+fn try_convert_aggregate_if_better(
+ aggr_exprs: Vec<Arc<dyn AggregateExpr>>,
+ prefix_requirement: &[PhysicalSortRequirement],
+ eq_properties: &EquivalenceProperties,
+) -> Result<Vec<Arc<dyn AggregateExpr>>> {
+ aggr_exprs
+ .into_iter()
+ .map(|aggr_expr| {
+ let aggr_sort_exprs = aggr_expr.order_bys().unwrap_or(&[]);
+ let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
+ let aggr_sort_reqs =
+ PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs);
+ let reverse_aggr_req =
+
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs);
+
+ // If the aggregate expression benefits from input ordering, and
+ // there is an actual ordering enabling this, try to update the
+ // aggregate expression to benefit from the existing ordering.
+ // Otherwise, leave it as is.
+ if aggr_expr.order_sensitivity().is_beneficial() &&
!aggr_sort_reqs.is_empty()
+ {
+ let reqs = concat_slices(prefix_requirement, &aggr_sort_reqs);
+ if eq_properties.ordering_satisfy_requirement(&reqs) {
+ // Existing ordering satisfies the aggregator requirements:
+ aggr_expr.with_beneficial_ordering(true)?
+ } else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
+ prefix_requirement,
+ &reverse_aggr_req,
+ )) {
+ // Converting to reverse enables more efficient execution
+ // given the existing ordering (if possible):
+ aggr_expr
+ .reverse_expr()
+ .unwrap_or(aggr_expr)
+ .with_beneficial_ordering(true)?
+ } else {
+ // There is no beneficial ordering present -- aggregation
+ // will still work albeit in a less efficient mode.
+ aggr_expr.with_beneficial_ordering(false)?
+ }
+ .ok_or_else(|| {
+ plan_datafusion_err!(
+ "Expects an aggregate expression that can benefit from
input ordering"
+ )
+ })
+ } else {
+ Ok(aggr_expr)
+ }
+ })
+ .collect()
+}
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index d82a5a2cc1..0d8d06f49b 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -39,6 +39,7 @@ pub use datafusion_expr::{
Expr,
};
pub use datafusion_functions::expr_fn::*;
+pub use datafusion_functions_aggregate::expr_fn::*;
#[cfg(feature = "array_expressions")]
pub use datafusion_functions_array::expr_fn::*;
diff --git a/datafusion/expr/src/aggregate_function.rs
b/datafusion/expr/src/aggregate_function.rs
index f251969ca6..fb5a8db550 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -47,10 +47,6 @@ pub enum AggregateFunction {
ApproxDistinct,
/// Aggregation into an array
ArrayAgg,
- /// First value in a group according to some ordering
- FirstValue,
- /// Last value in a group according to some ordering
- LastValue,
/// N'th value in a group according to some ordering
NthValue,
/// Variance (Sample)
@@ -114,8 +110,6 @@ impl AggregateFunction {
Avg => "AVG",
ApproxDistinct => "APPROX_DISTINCT",
ArrayAgg => "ARRAY_AGG",
- FirstValue => "FIRST_VALUE",
- LastValue => "LAST_VALUE",
NthValue => "NTH_VALUE",
Variance => "VAR",
VariancePop => "VAR_POP",
@@ -168,8 +162,6 @@ impl FromStr for AggregateFunction {
"min" => AggregateFunction::Min,
"sum" => AggregateFunction::Sum,
"array_agg" => AggregateFunction::ArrayAgg,
- "first_value" => AggregateFunction::FirstValue,
- "last_value" => AggregateFunction::LastValue,
"nth_value" => AggregateFunction::NthValue,
"string_agg" => AggregateFunction::StringAgg,
// statistical
@@ -273,9 +265,7 @@ impl AggregateFunction {
}
AggregateFunction::ApproxMedian =>
Ok(coerced_data_types[0].clone()),
AggregateFunction::Grouping => Ok(DataType::Int32),
- AggregateFunction::FirstValue
- | AggregateFunction::LastValue
- | AggregateFunction::NthValue => Ok(coerced_data_types[0].clone()),
+ AggregateFunction::NthValue => Ok(coerced_data_types[0].clone()),
AggregateFunction::StringAgg => Ok(DataType::LargeUtf8),
}
}
@@ -329,9 +319,7 @@ impl AggregateFunction {
| AggregateFunction::VariancePop
| AggregateFunction::Stddev
| AggregateFunction::StddevPop
- | AggregateFunction::ApproxMedian
- | AggregateFunction::FirstValue
- | AggregateFunction::LastValue => {
+ | AggregateFunction::ApproxMedian => {
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
}
AggregateFunction::NthValue => Signature::any(2,
Volatility::Immutable),
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 5e43c160ba..0c05355cde 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -504,6 +504,15 @@ impl Sort {
nulls_first,
}
}
+
+ /// Create a new Sort expression with the opposite sort direction
+ pub fn reverse(&self) -> Self {
+ Self {
+ expr: self.expr.clone(),
+ asc: !self.asc,
+ nulls_first: !self.nulls_first,
+ }
+ }
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index c491a26564..d0114a4725 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -80,7 +80,7 @@ pub use signature::{
ArrayFunctionSignature, Signature, TypeSignature, Volatility,
TIMEZONE_WILDCARD,
};
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
-pub use udaf::{AggregateUDF, AggregateUDFImpl};
+pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF};
pub use udf::{ScalarUDF, ScalarUDFImpl};
pub use udwf::{WindowUDF, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs
b/datafusion/expr/src/type_coercion/aggregates.rs
index ce4a2a7098..6bd204c53c 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -283,9 +283,6 @@ pub fn coerce_types(
}
Ok(input_types.to_vec())
}
- AggregateFunction::FirstValue | AggregateFunction::LastValue => {
- Ok(input_types.to_vec())
- }
AggregateFunction::NthValue => Ok(input_types.to_vec()),
AggregateFunction::Grouping => Ok(vec![input_types[0].clone()]),
AggregateFunction::StringAgg => {
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index b620a897bc..0274038a36 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -22,10 +22,11 @@ use crate::function::{
};
use crate::groups_accumulator::GroupsAccumulator;
use crate::utils::format_state_name;
+use crate::utils::AggregateOrderSensitivity;
use crate::{Accumulator, Expr};
use crate::{AccumulatorFactoryFunction, ReturnTypeFunction, Signature};
use arrow::datatypes::{DataType, Field};
-use datafusion_common::{not_impl_err, Result};
+use datafusion_common::{exec_err, not_impl_err, Result};
use std::any::Any;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
@@ -193,6 +194,33 @@ impl AggregateUDF {
self.inner.create_groups_accumulator()
}
+ /// See [`AggregateUDFImpl::with_beneficial_ordering`] for more details.
+ pub fn with_beneficial_ordering(
+ self,
+ beneficial_ordering: bool,
+ ) -> Result<Option<AggregateUDF>> {
+ self.inner
+ .with_beneficial_ordering(beneficial_ordering)
+ .map(|updated_udf| updated_udf.map(|udf| Self { inner: udf }))
+ }
+
+ /// Gets the order sensitivity of the UDF. See
[`AggregateOrderSensitivity`]
+ /// for possible options.
+ pub fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ self.inner.order_sensitivity()
+ }
+
+ /// Reverses the `AggregateUDF` (i.e. returns the `AggregateUDF` that will
+ /// generate the same result as this `AggregateUDF` when iterated in reverse
+ /// order, and `None` if there is no such `AggregateUDF`).
+ pub fn reverse_udf(&self) -> Option<AggregateUDF> {
+ match self.inner.reverse_expr() {
+ ReversedUDAF::NotSupported => None,
+ ReversedUDAF::Identical => Some(self.clone()),
+ ReversedUDAF::Reversed(reverse) => Some(Self { inner: reverse }),
+ }
+ }
+
pub fn coerce_types(&self, _arg_types: &[DataType]) ->
Result<Vec<DataType>> {
not_impl_err!("coerce_types not implemented for {:?} yet", self.name())
}
@@ -361,6 +389,39 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
&[]
}
+ /// Sets the indicator whether ordering requirements of the
AggregateUDFImpl are
+ /// satisfied by its input. If this is not the case, UDFs with order
+ /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
+ /// the correct result with possibly more work internally.
+ ///
+ /// # Returns
+ ///
+ /// Returns `Ok(Some(updated_udf))` if the process completes successfully.
+ /// If the expression can benefit from existing input ordering, but does
+ /// not implement the method, returns an error. Order insensitive and hard
+ /// requirement aggregators return `Ok(None)`.
+ fn with_beneficial_ordering(
+ self: Arc<Self>,
+ _beneficial_ordering: bool,
+ ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
+ if self.order_sensitivity().is_beneficial() {
+ return exec_err!(
+ "Should implement with satisfied for aggregator :{:?}",
+ self.name()
+ );
+ }
+ Ok(None)
+ }
+
+ /// Gets the order sensitivity of the UDF. See
[`AggregateOrderSensitivity`]
+ /// for possible options.
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ // We have hard ordering requirements by default, meaning that order
+ // sensitive UDFs need their input orderings to satisfy their ordering
+ // requirements to generate correct results.
+ AggregateOrderSensitivity::HardRequirement
+ }
+
/// Optionally apply per-UDaF simplification / rewrite rules.
///
/// This can be used to apply function specific simplification rules during
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 0d25a3443f..e5b7bddab8 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -1217,6 +1217,37 @@ pub fn format_state_name(name: &str, state_name: &str)
-> String {
format!("{name}[{state_name}]")
}
+/// Represents the sensitivity of an aggregate expression to ordering.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum AggregateOrderSensitivity {
+ /// Indicates that the aggregate expression is insensitive to ordering.
+ /// Ordering at the input is not important for the result of the
aggregator.
+ Insensitive,
+ /// Indicates that the aggregate expression has a hard requirement on
ordering.
+ /// The aggregator can not produce a correct result unless its ordering
+ /// requirement is satisfied.
+ HardRequirement,
+ /// Indicates that ordering is beneficial for the aggregate expression in
terms
+ /// of evaluation efficiency. The aggregator can produce its result
efficiently
+ /// when its required ordering is satisfied; however, it can still produce
the
+ /// correct result (albeit less efficiently) when its required ordering is
not met.
+ Beneficial,
+}
+
+impl AggregateOrderSensitivity {
+ pub fn is_insensitive(&self) -> bool {
+ self.eq(&AggregateOrderSensitivity::Insensitive)
+ }
+
+ pub fn is_beneficial(&self) -> bool {
+ self.eq(&AggregateOrderSensitivity::Beneficial)
+ }
+
+ pub fn hard_requires(&self) -> bool {
+ self.eq(&AggregateOrderSensitivity::HardRequirement)
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/functions-aggregate/src/first_last.rs
b/datafusion/functions-aggregate/src/first_last.rs
index 5d3d483440..fd4e219710 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -17,8 +17,12 @@
//! Defines the FIRST_VALUE/LAST_VALUE aggregations.
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
use arrow::array::{ArrayRef, AsArray, BooleanArray};
-use arrow::compute::{self, lexsort_to_indices, SortColumn, SortOptions};
+use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field};
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices,
get_row_at_idx};
use datafusion_common::{
@@ -26,23 +30,15 @@ use datafusion_common::{
};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
-use datafusion_expr::utils::format_state_name;
+use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
use datafusion_expr::{
- Accumulator, AggregateUDFImpl, ArrayFunctionSignature, Expr, Signature,
- TypeSignature, Volatility,
+ Accumulator, AggregateUDFImpl, ArrayFunctionSignature, Signature,
TypeSignature,
+ Volatility,
};
-use datafusion_physical_expr_common::aggregate::utils::{
- down_cast_any_ref, get_sort_options, ordering_fields,
+use datafusion_physical_expr_common::aggregate::utils::get_sort_options;
+use datafusion_physical_expr_common::sort_expr::{
+ limited_convert_logical_sort_exprs_to_physical, LexOrdering,
PhysicalSortExpr,
};
-use datafusion_physical_expr_common::aggregate::AggregateExpr;
-use datafusion_physical_expr_common::expressions;
-use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
-use datafusion_physical_expr_common::utils::reverse_order_bys;
-
-use std::any::Any;
-use std::fmt::Debug;
-use std::sync::Arc;
make_udaf_expr_and_func!(
FirstValue,
@@ -54,6 +50,7 @@ make_udaf_expr_and_func!(
pub struct FirstValue {
signature: Signature,
aliases: Vec<String>,
+ requirement_satisfied: bool,
}
impl Debug for FirstValue {
@@ -75,7 +72,7 @@ impl Default for FirstValue {
impl FirstValue {
pub fn new() -> Self {
Self {
- aliases: vec![String::from("FIRST_VALUE")],
+ aliases: vec![String::from("FIRST_VALUE"),
String::from("first_value")],
signature: Signature::one_of(
vec![
// TODO: we can introduce more strict signature that only
numeric of array types are allowed
@@ -84,8 +81,14 @@ impl FirstValue {
],
Volatility::Immutable,
),
+ requirement_satisfied: false,
}
}
+
+ fn with_requirement_satisfied(mut self, requirement_satisfied: bool) ->
Self {
+ self.requirement_satisfied = requirement_satisfied;
+ self
+ }
}
impl AggregateUDFImpl for FirstValue {
@@ -106,37 +109,19 @@ impl AggregateUDFImpl for FirstValue {
}
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn
Accumulator>> {
- let mut all_sort_orders = vec![];
-
- // Construct PhysicalSortExpr objects from Expr objects:
- let mut sort_exprs = vec![];
- for expr in acc_args.sort_exprs {
- if let Expr::Sort(sort) = expr {
- if let Expr::Column(col) = sort.expr.as_ref() {
- let name = &col.name;
- let e = expressions::column::col(name, acc_args.schema)?;
- sort_exprs.push(PhysicalSortExpr {
- expr: e,
- options: SortOptions {
- descending: !sort.asc,
- nulls_first: sort.nulls_first,
- },
- });
- }
- }
- }
- if !sort_exprs.is_empty() {
- all_sort_orders.extend(sort_exprs);
- }
-
- let ordering_req = all_sort_orders;
+ let ordering_req = limited_convert_logical_sort_exprs_to_physical(
+ acc_args.sort_exprs,
+ acc_args.schema,
+ )?;
let ordering_dtypes = ordering_req
.iter()
.map(|e| e.expr.data_type(acc_args.schema))
.collect::<Result<Vec<_>>>()?;
- let requirement_satisfied = ordering_req.is_empty();
+ // When requirement is empty, or it is signalled by outside caller that
+ // the ordering requirement is/will be satisfied.
+ let requirement_satisfied = ordering_req.is_empty() ||
self.requirement_satisfied;
FirstValueAccumulator::try_new(
acc_args.data_type,
@@ -161,6 +146,23 @@ impl AggregateUDFImpl for FirstValue {
fn aliases(&self) -> &[String] {
&self.aliases
}
+
+ fn with_beneficial_ordering(
+ self: Arc<Self>,
+ beneficial_ordering: bool,
+ ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
+ Ok(Some(Arc::new(
+ FirstValue::new().with_requirement_satisfied(beneficial_ordering),
+ )))
+ }
+
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ AggregateOrderSensitivity::Beneficial
+ }
+
+ fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
+ datafusion_expr::ReversedUDAF::Reversed(last_value_udaf().inner())
+ }
}
#[derive(Debug)]
@@ -338,355 +340,133 @@ impl Accumulator for FirstValueAccumulator {
}
}
-/// TO BE DEPRECATED: Builtin FIRST_VALUE physical aggregate expression will
be replaced by udf in the future
-#[derive(Debug, Clone)]
-pub struct FirstValuePhysicalExpr {
- name: String,
- input_data_type: DataType,
- order_by_data_types: Vec<DataType>,
- expr: Arc<dyn PhysicalExpr>,
- ordering_req: LexOrdering,
+make_udaf_expr_and_func!(
+ LastValue,
+ last_value,
+ "Returns the last value in a group of values.",
+ last_value_udaf
+);
+
+pub struct LastValue {
+ signature: Signature,
+ aliases: Vec<String>,
requirement_satisfied: bool,
- ignore_nulls: bool,
- state_fields: Vec<Field>,
}
-impl FirstValuePhysicalExpr {
- /// Creates a new FIRST_VALUE aggregation function.
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- input_data_type: DataType,
- ordering_req: LexOrdering,
- order_by_data_types: Vec<DataType>,
- state_fields: Vec<Field>,
- ) -> Self {
- let requirement_satisfied = ordering_req.is_empty();
- Self {
- name: name.into(),
- input_data_type,
- order_by_data_types,
- expr,
- ordering_req,
- requirement_satisfied,
- ignore_nulls: false,
- state_fields,
- }
- }
-
- pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
- self.ignore_nulls = ignore_nulls;
- self
- }
-
- /// Returns the name of the aggregate expression.
- pub fn name(&self) -> &str {
- &self.name
- }
-
- /// Returns the input data type of the aggregate expression.
- pub fn input_data_type(&self) -> &DataType {
- &self.input_data_type
- }
-
- /// Returns the data types of the order-by columns.
- pub fn order_by_data_types(&self) -> &Vec<DataType> {
- &self.order_by_data_types
+impl Debug for LastValue {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ f.debug_struct("LastValue")
+ .field("name", &self.name())
+ .field("signature", &self.signature)
+ .field("accumulator", &"<FUNC>")
+ .finish()
}
+}
- /// Returns the expression associated with the aggregate function.
- pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
- &self.expr
+impl Default for LastValue {
+ fn default() -> Self {
+ Self::new()
}
+}
- /// Returns the lexical ordering requirements of the aggregate expression.
- pub fn ordering_req(&self) -> &LexOrdering {
- &self.ordering_req
+impl LastValue {
+ pub fn new() -> Self {
+ Self {
+ aliases: vec![String::from("LAST_VALUE"),
String::from("last_value")],
+ signature: Signature::one_of(
+ vec![
+ // TODO: we can introduce more strict signature that only
numeric of array types are allowed
+
TypeSignature::ArraySignature(ArrayFunctionSignature::Array),
+ TypeSignature::Uniform(1, NUMERICS.to_vec()),
+ ],
+ Volatility::Immutable,
+ ),
+ requirement_satisfied: false,
+ }
}
- pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool)
-> Self {
+ fn with_requirement_satisfied(mut self, requirement_satisfied: bool) ->
Self {
self.requirement_satisfied = requirement_satisfied;
self
}
-
- pub fn convert_to_last(self) -> LastValuePhysicalExpr {
- let mut name = format!("LAST{}", &self.name[5..]);
- replace_order_by_clause(&mut name);
-
- let FirstValuePhysicalExpr {
- expr,
- input_data_type,
- ordering_req,
- order_by_data_types,
- ..
- } = self;
- LastValuePhysicalExpr::new(
- expr,
- name,
- input_data_type,
- reverse_order_bys(&ordering_req),
- order_by_data_types,
- )
- }
}
-impl AggregateExpr for FirstValuePhysicalExpr {
- /// Return a reference to Any that can be used for downcasting
+impl AggregateUDFImpl for LastValue {
fn as_any(&self) -> &dyn Any {
self
}
- fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, self.input_data_type.clone(), true))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- FirstValueAccumulator::try_new(
- &self.input_data_type,
- &self.order_by_data_types,
- self.ordering_req.clone(),
- self.ignore_nulls,
- )
- .map(|acc| {
-
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
- })
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- if !self.state_fields.is_empty() {
- return Ok(self.state_fields.clone());
- }
-
- let mut fields = vec![Field::new(
- format_state_name(&self.name, "first_value"),
- self.input_data_type.clone(),
- true,
- )];
- fields.extend(ordering_fields(
- &self.ordering_req,
- &self.order_by_data_types,
- ));
- fields.push(Field::new(
- format_state_name(&self.name, "is_set"),
- DataType::Boolean,
- true,
- ));
- Ok(fields)
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
- (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
- }
-
fn name(&self) -> &str {
- &self.name
- }
-
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- Some(Arc::new(self.clone().convert_to_last()))
- }
-
- fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- FirstValueAccumulator::try_new(
- &self.input_data_type,
- &self.order_by_data_types,
- self.ordering_req.clone(),
- self.ignore_nulls,
- )
- .map(|acc| {
-
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
- })
- }
-}
-
-impl PartialEq<dyn Any> for FirstValuePhysicalExpr {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.input_data_type == x.input_data_type
- && self.order_by_data_types == x.order_by_data_types
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
- }
-}
-
-/// TO BE DEPRECATED: Builtin LAST_VALUE physical aggregate expression will be
replaced by udf in the future
-#[derive(Debug, Clone)]
-pub struct LastValuePhysicalExpr {
- name: String,
- input_data_type: DataType,
- order_by_data_types: Vec<DataType>,
- expr: Arc<dyn PhysicalExpr>,
- ordering_req: LexOrdering,
- requirement_satisfied: bool,
- ignore_nulls: bool,
-}
-
-impl LastValuePhysicalExpr {
- /// Creates a new LAST_VALUE aggregation function.
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- input_data_type: DataType,
- ordering_req: LexOrdering,
- order_by_data_types: Vec<DataType>,
- ) -> Self {
- let requirement_satisfied = ordering_req.is_empty();
- Self {
- name: name.into(),
- input_data_type,
- order_by_data_types,
- expr,
- ordering_req,
- requirement_satisfied,
- ignore_nulls: false,
- }
- }
-
- pub fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
- self.ignore_nulls = ignore_nulls;
- self
- }
-
- /// Returns the name of the aggregate expression.
- pub fn name(&self) -> &str {
- &self.name
+ "LAST_VALUE"
}
- /// Returns the input data type of the aggregate expression.
- pub fn input_data_type(&self) -> &DataType {
- &self.input_data_type
- }
-
- /// Returns the data types of the order-by columns.
- pub fn order_by_data_types(&self) -> &Vec<DataType> {
- &self.order_by_data_types
- }
-
- /// Returns the expression associated with the aggregate function.
- pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
- &self.expr
- }
-
- /// Returns the lexical ordering requirements of the aggregate expression.
- pub fn ordering_req(&self) -> &LexOrdering {
- &self.ordering_req
+ fn signature(&self) -> &Signature {
+ &self.signature
}
- pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool)
-> Self {
- self.requirement_satisfied = requirement_satisfied;
- self
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ Ok(arg_types[0].clone())
}
- pub fn convert_to_first(self) -> FirstValuePhysicalExpr {
- let mut name = format!("FIRST{}", &self.name[4..]);
- replace_order_by_clause(&mut name);
-
- let LastValuePhysicalExpr {
- expr,
- input_data_type,
- ordering_req,
- order_by_data_types,
- ..
- } = self;
- FirstValuePhysicalExpr::new(
- expr,
- name,
- input_data_type,
- reverse_order_bys(&ordering_req),
- order_by_data_types,
- vec![],
- )
- }
-}
+ fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn
Accumulator>> {
+ let ordering_req = limited_convert_logical_sort_exprs_to_physical(
+ acc_args.sort_exprs,
+ acc_args.schema,
+ )?;
-impl AggregateExpr for LastValuePhysicalExpr {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
+ let ordering_dtypes = ordering_req
+ .iter()
+ .map(|e| e.expr.data_type(acc_args.schema))
+ .collect::<Result<Vec<_>>>()?;
- fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, self.input_data_type.clone(), true))
- }
+ let requirement_satisfied = ordering_req.is_empty() ||
self.requirement_satisfied;
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
LastValueAccumulator::try_new(
- &self.input_data_type,
- &self.order_by_data_types,
- self.ordering_req.clone(),
- self.ignore_nulls,
+ acc_args.data_type,
+ &ordering_dtypes,
+ ordering_req,
+ acc_args.ignore_nulls,
)
- .map(|acc| {
-
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
- })
+ .map(|acc|
Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
}
- fn state_fields(&self) -> Result<Vec<Field>> {
+ fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+ let StateFieldsArgs {
+ name,
+ input_type,
+ return_type: _,
+ ordering_fields,
+ is_distinct: _,
+ } = args;
let mut fields = vec![Field::new(
- format_state_name(&self.name, "last_value"),
- self.input_data_type.clone(),
+ format_state_name(name, "last_value"),
+ input_type.clone(),
true,
)];
- fields.extend(ordering_fields(
- &self.ordering_req,
- &self.order_by_data_types,
- ));
- fields.push(Field::new(
- format_state_name(&self.name, "is_set"),
- DataType::Boolean,
- true,
- ));
+ fields.extend(ordering_fields.to_vec());
+ fields.push(Field::new("is_set", DataType::Boolean, true));
Ok(fields)
}
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
- (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
- }
-
- fn name(&self) -> &str {
- &self.name
+ fn aliases(&self) -> &[String] {
+ &self.aliases
}
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- Some(Arc::new(self.clone().convert_to_first()))
+ fn with_beneficial_ordering(
+ self: Arc<Self>,
+ beneficial_ordering: bool,
+ ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> {
+ Ok(Some(Arc::new(
+ LastValue::new().with_requirement_satisfied(beneficial_ordering),
+ )))
}
- fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- LastValueAccumulator::try_new(
- &self.input_data_type,
- &self.order_by_data_types,
- self.ordering_req.clone(),
- self.ignore_nulls,
- )
- .map(|acc| {
-
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
- })
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ AggregateOrderSensitivity::Beneficial
}
-}
-impl PartialEq<dyn Any> for LastValuePhysicalExpr {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.input_data_type == x.input_data_type
- && self.order_by_data_types == x.order_by_data_types
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
+ fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
+ datafusion_expr::ReversedUDAF::Reversed(first_value_udaf().inner())
}
}
@@ -896,31 +676,6 @@ fn convert_to_sort_cols(
.collect::<Vec<_>>()
}
-fn replace_order_by_clause(order_by: &mut String) {
- let suffixes = [
- (" DESC NULLS FIRST]", " ASC NULLS LAST]"),
- (" ASC NULLS FIRST]", " DESC NULLS LAST]"),
- (" DESC NULLS LAST]", " ASC NULLS FIRST]"),
- (" ASC NULLS LAST]", " DESC NULLS FIRST]"),
- ];
-
- if let Some(start) = order_by.find("ORDER BY [") {
- if let Some(end) = order_by[start..].find(']') {
- let order_by_start = start + 9;
- let order_by_end = start + end;
-
- let column_order = &order_by[order_by_start..=order_by_end];
- for &(suffix, replacement) in &suffixes {
- if column_order.ends_with(suffix) {
- let new_order = column_order.replace(suffix, replacement);
- order_by.replace_range(order_by_start..=order_by_end,
&new_order);
- break;
- }
- }
- }
- }
-}
-
#[cfg(test)]
mod tests {
use arrow::array::Int64Array;
diff --git a/datafusion/functions-aggregate/src/lib.rs
b/datafusion/functions-aggregate/src/lib.rs
index 3e80174eec..ac40a90aae 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -76,6 +76,7 @@ pub mod expr_fn {
pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
let functions: Vec<Arc<AggregateUDF>> = vec![
first_last::first_value_udaf(),
+ first_last::last_value_udaf(),
covariance::covar_samp_udaf(),
covariance::covar_pop_udaf(),
median::median_udaf(),
diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index 67d5c9b23b..59c0b476c7 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -45,6 +45,7 @@ async-trait = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
datafusion-physical-expr = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs
b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index 404f054cb9..c232935f9e 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -23,11 +23,9 @@ use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, Column, Result};
use datafusion_expr::expr_rewriter::normalize_cols;
use datafusion_expr::utils::expand_wildcard;
-use datafusion_expr::{
- aggregate_function::AggregateFunction as AggregateFunctionFunc, col,
- expr::AggregateFunction, LogicalPlanBuilder,
-};
+use datafusion_expr::{col, LogicalPlanBuilder};
use datafusion_expr::{Aggregate, Distinct, DistinctOn, Expr, LogicalPlan};
+use datafusion_functions_aggregate::first_last::first_value;
/// Optimizer that replaces logical [[Distinct]] with a logical [[Aggregate]]
///
@@ -99,17 +97,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
// Construct the aggregation expression to be used to fetch
the selected expressions.
let aggr_expr = select_expr
.into_iter()
- .map(|e| {
- Expr::AggregateFunction(AggregateFunction::new(
- AggregateFunctionFunc::FirstValue,
- vec![e],
- false,
- None,
- sort_expr.clone(),
- None,
- ))
- })
- .collect::<Vec<Expr>>();
+ .map(|e| first_value(vec![e], false, None,
sort_expr.clone(), None));
let aggr_expr = normalize_cols(aggr_expr, input.as_ref())?;
let group_expr = normalize_cols(on_expr, input.as_ref())?;
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs
b/datafusion/physical-expr-common/src/aggregate/mod.rs
index 4e9414bc5a..503e2d8f97 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -19,20 +19,22 @@ pub mod groups_accumulator;
pub mod stats;
pub mod utils;
-use arrow::datatypes::{DataType, Field, Schema};
-use datafusion_common::{not_impl_err, Result};
-use datafusion_expr::function::StateFieldsArgs;
-use datafusion_expr::type_coercion::aggregates::check_arg_count;
-use datafusion_expr::{
- function::AccumulatorArgs, Accumulator, AggregateUDF, Expr,
GroupsAccumulator,
-};
use std::fmt::Debug;
use std::{any::Any, sync::Arc};
+use self::utils::{down_cast_any_ref, ordering_fields};
use crate::physical_expr::PhysicalExpr;
use crate::sort_expr::{LexOrdering, PhysicalSortExpr};
+use crate::utils::reverse_order_bys;
-use self::utils::{down_cast_any_ref, ordering_fields};
+use arrow::datatypes::{DataType, Field, Schema};
+use datafusion_common::{exec_err, not_impl_err, Result};
+use datafusion_expr::function::StateFieldsArgs;
+use datafusion_expr::type_coercion::aggregates::check_arg_count;
+use datafusion_expr::utils::AggregateOrderSensitivity;
+use datafusion_expr::{
+ function::AccumulatorArgs, Accumulator, AggregateUDF, Expr,
GroupsAccumulator,
+};
/// Creates a physical expression of the UDAF that includes all necessary
type coercion.
/// This function errors when `args` can't be coerced to a valid argument
type of the UDAF.
@@ -47,6 +49,7 @@ pub fn create_aggregate_expr(
ignore_nulls: bool,
is_distinct: bool,
) -> Result<Arc<dyn AggregateExpr>> {
+ debug_assert_eq!(sort_exprs.len(), ordering_req.len());
let input_exprs_types = input_phy_exprs
.iter()
.map(|arg| arg.data_type(schema))
@@ -117,6 +120,37 @@ pub trait AggregateExpr: Send + Sync + Debug +
PartialEq<dyn Any> {
None
}
+ /// Indicates whether aggregator can produce the correct result with any
+ /// arbitrary input ordering. By default, we assume that aggregate
expressions
+ /// are order insensitive.
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ AggregateOrderSensitivity::Insensitive
+ }
+
+ /// Sets the indicator whether ordering requirements of the aggregator are
+ /// satisfied by its input. If this is not the case, aggregators with order
+ /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce
+ /// the correct result with possibly more work internally.
+ ///
+ /// # Returns
+ ///
+ /// Returns `Ok(Some(updated_expr))` if the process completes successfully.
+ /// If the expression can benefit from existing input ordering, but does
+ /// not implement the method, returns an error. Order insensitive and hard
+ /// requirement aggregators return `Ok(None)`.
+ fn with_beneficial_ordering(
+ self: Arc<Self>,
+ _requirement_satisfied: bool,
+ ) -> Result<Option<Arc<dyn AggregateExpr>>> {
+ if self.order_bys().is_some() &&
self.order_sensitivity().is_beneficial() {
+ return exec_err!(
+ "Should implement with satisfied for aggregator :{:?}",
+ self.name()
+ );
+ }
+ Ok(None)
+ }
+
/// Human readable name such as `"MIN(c2)"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
@@ -305,6 +339,74 @@ impl AggregateExpr for AggregateFunctionExpr {
fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
(!self.ordering_req.is_empty()).then_some(&self.ordering_req)
}
+
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ if !self.ordering_req.is_empty() {
+ // If there is requirement, use the sensitivity of the
implementation
+ self.fun.order_sensitivity()
+ } else {
+ // If no requirement, aggregator is order insensitive
+ AggregateOrderSensitivity::Insensitive
+ }
+ }
+
+ fn with_beneficial_ordering(
+ self: Arc<Self>,
+ beneficial_ordering: bool,
+ ) -> Result<Option<Arc<dyn AggregateExpr>>> {
+ let Some(updated_fn) = self
+ .fun
+ .clone()
+ .with_beneficial_ordering(beneficial_ordering)?
+ else {
+ return Ok(None);
+ };
+ create_aggregate_expr(
+ &updated_fn,
+ &self.args,
+ &self.sort_exprs,
+ &self.ordering_req,
+ &self.schema,
+ self.name(),
+ self.ignore_nulls,
+ self.is_distinct,
+ )
+ .map(Some)
+ }
+
+ fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> { // Builds the reversed aggregate (reversed UDF, ordering and name); `None` when the UDF has no reverse.
+ if let Some(reverse_udf) = self.fun.reverse_udf() {
+ let reverse_ordering_req = reverse_order_bys(&self.ordering_req);
+ let reverse_sort_exprs = self
+ .sort_exprs
+ .iter()
+ .map(|e| {
+ if let Expr::Sort(s) = e {
+ Expr::Sort(s.reverse())
+ } else {
+ // Expects to receive `Expr::Sort`; any other variant panics here.
+ unreachable!()
+ }
+ })
+ .collect::<Vec<_>>();
+ let mut name = self.name().to_string(); // name is rewritten below to describe the reversed expression
+ replace_order_by_clause(&mut name);
+ replace_fn_name_clause(&mut name, self.fun.name(),
reverse_udf.name());
+ let reverse_aggr = create_aggregate_expr(
+ &reverse_udf,
+ &self.args,
+ &reverse_sort_exprs,
+ &reverse_ordering_req,
+ &self.schema,
+ name,
+ self.ignore_nulls,
+ self.is_distinct,
+ )
+ .unwrap(); // NOTE(review): panics if `create_aggregate_expr` fails for the reversed UDF — consider returning `None` instead
+ return Some(reverse_aggr);
+ }
+ None
+ }
}
impl PartialEq<dyn Any> for AggregateFunctionExpr {
@@ -325,3 +427,32 @@ impl PartialEq<dyn Any> for AggregateFunctionExpr {
.unwrap_or(false)
}
}
+
+fn replace_order_by_clause(order_by: &mut String) { // Rewrites, in place, the direction/null-placement suffix inside the "ORDER BY [...]" part of the given string.
+ let suffixes = [ // (current suffix, replacement suffix) pairs; each one includes the closing `]`
+ (" DESC NULLS FIRST]", " ASC NULLS LAST]"),
+ (" ASC NULLS FIRST]", " DESC NULLS LAST]"),
+ (" DESC NULLS LAST]", " ASC NULLS FIRST]"),
+ (" ASC NULLS LAST]", " DESC NULLS FIRST]"),
+ ];
+
+ if let Some(start) = order_by.find("ORDER BY [") {
+ if let Some(end) = order_by[start..].find(']') {
+ let order_by_start = start + 9; // index of the `[` that follows "ORDER BY "
+ let order_by_end = start + end; // index of the first `]` at or after `start`
+
+ let column_order = &order_by[order_by_start..=order_by_end];
+ for (suffix, replacement) in suffixes {
+ if column_order.ends_with(suffix) { // NOTE(review): every suffix ends in `]`, so only the last sort key is rewritten — confirm this is acceptable for multi-key ORDER BY
+ let new_order = column_order.replace(suffix, replacement);
+ order_by.replace_range(order_by_start..=order_by_end,
&new_order);
+ break;
+ }
+ }
+ }
+ }
+}
+
+fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str,
fn_name_new: &str) {
+ *aggr_name = aggr_name.replace(fn_name_old, fn_name_new); // NOTE(review): replaces every occurrence, so an argument or column name containing `fn_name_old` is rewritten too
+}
diff --git a/datafusion/physical-expr/src/expressions/cast.rs
b/datafusion/physical-expr-common/src/expressions/cast.rs
similarity index 99%
rename from datafusion/physical-expr/src/expressions/cast.rs
rename to datafusion/physical-expr-common/src/expressions/cast.rs
index 4f940a792b..8ef3d16f63 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr-common/src/expressions/cast.rs
@@ -20,8 +20,7 @@ use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
-use crate::physical_expr::down_cast_any_ref;
-use crate::PhysicalExpr;
+use crate::physical_expr::{down_cast_any_ref, PhysicalExpr};
use arrow::compute::{can_cast_types, CastOptions};
use arrow::datatypes::{DataType, DataType::*, Schema};
@@ -229,7 +228,8 @@ pub fn cast(
#[cfg(test)]
mod tests {
use super::*;
- use crate::expressions::col;
+
+ use crate::expressions::column::col;
use arrow::{
array::{
diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs
b/datafusion/physical-expr-common/src/expressions/mod.rs
index d102422081..4b5965e164 100644
--- a/datafusion/physical-expr-common/src/expressions/mod.rs
+++ b/datafusion/physical-expr-common/src/expressions/mod.rs
@@ -15,4 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+mod cast;
pub mod column;
+
+pub use cast::{cast, cast_with_options, CastExpr};
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs
b/datafusion/physical-expr-common/src/sort_expr.rs
index 1e1187212d..f637355519 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -21,13 +21,14 @@ use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
+use crate::physical_expr::PhysicalExpr;
+use crate::utils::limited_convert_logical_expr_to_physical_expr;
+
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
-use datafusion_common::Result;
-use datafusion_expr::ColumnarValue;
-
-use crate::physical_expr::PhysicalExpr;
+use datafusion_common::{exec_err, Result};
+use datafusion_expr::{ColumnarValue, Expr};
/// Represents Sort operation for a column in a RecordBatch
#[derive(Clone, Debug)]
@@ -267,3 +268,29 @@ pub type LexRequirement = Vec<PhysicalSortRequirement>;
/// `LexRequirementRef` is an alias for the type `&[PhysicalSortRequirement]`,
which
/// represents a reference to a lexicographical ordering requirement.
pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];
+
+/// Converts each [`Expr::Sort`] into a corresponding [`PhysicalSortExpr`].
+/// Returns an error if an expression is not an [`Expr::Sort`] or cannot be converted.
+pub fn limited_convert_logical_sort_exprs_to_physical(
+ exprs: &[Expr],
+ schema: &Schema,
+) -> Result<Vec<PhysicalSortExpr>> {
+ // Construct PhysicalSortExpr objects from Expr objects, stopping at the first failure:
+ let mut sort_exprs = vec![];
+ for expr in exprs {
+ let Expr::Sort(sort) = expr else {
+ return exec_err!("Expects to receive sort expression");
+ };
+ sort_exprs.push(PhysicalSortExpr {
+ expr: limited_convert_logical_expr_to_physical_expr(
+ sort.expr.as_ref(),
+ schema,
+ )?,
+ options: SortOptions {
+ descending: !sort.asc, // logical `asc` flag is negated to obtain `descending`
+ nulls_first: sort.nulls_first,
+ },
+ });
+ }
+ Ok(sort_exprs)
+}
diff --git a/datafusion/physical-expr-common/src/utils.rs
b/datafusion/physical-expr-common/src/utils.rs
index 487aba945a..f661400fcb 100644
--- a/datafusion/physical-expr-common/src/utils.rs
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -17,14 +17,17 @@
use std::sync::Arc;
-use crate::{
- physical_expr::PhysicalExpr, sort_expr::PhysicalSortExpr,
tree_node::ExprContext,
-};
+use crate::expressions::{self, CastExpr};
+use crate::physical_expr::PhysicalExpr;
+use crate::sort_expr::PhysicalSortExpr;
+use crate::tree_node::ExprContext;
use arrow::array::{make_array, Array, ArrayRef, BooleanArray,
MutableArrayData};
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
-use datafusion_common::Result;
+use arrow::datatypes::Schema;
+use datafusion_common::{exec_err, Result};
use datafusion_expr::sort_properties::ExprProperties;
+use datafusion_expr::Expr;
/// Represents a [`PhysicalExpr`] node with associated properties (order and
/// range) in a context where properties are tracked.
@@ -105,15 +108,41 @@ pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr])
-> Vec<PhysicalSortExpr
.collect()
}
+/// Converts `datafusion_expr::Expr` into corresponding `Arc<dyn
PhysicalExpr>`.
+/// If conversion is not supported yet, returns Error.
+pub fn limited_convert_logical_expr_to_physical_expr(
+ expr: &Expr,
+ schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ match expr {
+ Expr::Column(col) => expressions::column::col(&col.name, schema),
+ Expr::Cast(cast_expr) => Ok(Arc::new(CastExpr::new(
+ limited_convert_logical_expr_to_physical_expr(
+ cast_expr.expr.as_ref(),
+ schema,
+ )?,
+ cast_expr.data_type.clone(),
+ None,
+ ))),
+ Expr::Alias(alias_expr) =>
limited_convert_logical_expr_to_physical_expr(
+ alias_expr.expr.as_ref(),
+ schema,
+ ),
+ _ => exec_err!(
+ "Unsupported expression: {expr} for conversion to Arc<dyn
PhysicalExpr>"
+ ),
+ }
+}
+
#[cfg(test)]
mod tests {
use std::sync::Arc;
+ use super::*;
+
use arrow::array::Int32Array;
use datafusion_common::cast::{as_boolean_array, as_int32_array};
- use super::*;
-
#[test]
fn scatter_int() -> Result<()> {
let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index 7e2c7bb271..837a9d5511 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -30,15 +30,13 @@ use crate::{
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr,
PhysicalSortExpr,
};
-use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
-use arrow_array::{new_empty_array, StructArray};
+use arrow_array::{new_empty_array, Array, ArrayRef, StructArray};
use arrow_schema::{Fields, SortOptions};
-
-use datafusion_common::utils::array_into_list_array;
-use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::utils::{array_into_list_array, compare_rows,
get_row_at_idx};
use datafusion_common::{exec_err, Result, ScalarValue};
+use datafusion_expr::utils::AggregateOrderSensitivity;
use datafusion_expr::Accumulator;
/// Expression for a `ARRAY_AGG(... ORDER BY ..., ...)` aggregation. In a multi
@@ -131,6 +129,10 @@ impl AggregateExpr for OrderSensitiveArrayAgg {
(!self.ordering_req.is_empty()).then_some(&self.ordering_req)
}
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ AggregateOrderSensitivity::HardRequirement
+ }
+
fn name(&self) -> &str {
&self.name
}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs
b/datafusion/physical-expr/src/aggregate/build_in.rs
index 18252ea370..e100089954 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -28,15 +28,14 @@
use std::sync::Arc;
-use arrow::datatypes::Schema;
-
-use datafusion_common::{exec_err, not_impl_err, Result};
-use datafusion_expr::AggregateFunction;
-
use crate::aggregate::regr::RegrType;
use crate::expressions::{self, Literal};
use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};
+use arrow::datatypes::Schema;
+use datafusion_common::{exec_err, not_impl_err, Result};
+use datafusion_expr::AggregateFunction;
+
/// Create a physical aggregation expression.
/// This function errors when `input_phy_exprs`' can't be coerced to a valid
argument type of the aggregation function.
pub fn create_aggregate_expr(
@@ -46,7 +45,7 @@ pub fn create_aggregate_expr(
ordering_req: &[PhysicalSortExpr],
input_schema: &Schema,
name: impl Into<String>,
- ignore_nulls: bool,
+ _ignore_nulls: bool,
) -> Result<Arc<dyn AggregateExpr>> {
let name = name.into();
// get the result data type for this aggregate function
@@ -332,27 +331,6 @@ pub fn create_aggregate_expr(
"APPROX_MEDIAN(DISTINCT) aggregations are not available"
);
}
- (AggregateFunction::FirstValue, _) => Arc::new(
- expressions::FirstValue::new(
- input_phy_exprs[0].clone(),
- name,
- input_phy_types[0].clone(),
- ordering_req.to_vec(),
- ordering_types,
- vec![],
- )
- .with_ignore_nulls(ignore_nulls),
- ),
- (AggregateFunction::LastValue, _) => Arc::new(
- expressions::LastValue::new(
- input_phy_exprs[0].clone(),
- name,
- input_phy_types[0].clone(),
- ordering_req.to_vec(),
- ordering_types,
- )
- .with_ignore_nulls(ignore_nulls),
- ),
(AggregateFunction::NthValue, _) => {
let expr = &input_phy_exprs[0];
let Some(n) = input_phy_exprs[1]
@@ -396,17 +374,16 @@ pub fn create_aggregate_expr(
mod tests {
use arrow::datatypes::{DataType, Field};
- use datafusion_common::{plan_err, DataFusionError, ScalarValue};
- use datafusion_expr::type_coercion::aggregates::NUMERICS;
- use datafusion_expr::{type_coercion, Signature};
-
+ use super::*;
use crate::expressions::{
try_cast, ApproxDistinct, ApproxMedian, ApproxPercentileCont,
ArrayAgg, Avg,
BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Count, DistinctArrayAgg,
DistinctCount,
Max, Min, Stddev, Sum, Variance,
};
- use super::*;
+ use datafusion_common::{plan_err, DataFusionError, ScalarValue};
+ use datafusion_expr::type_coercion::aggregates::NUMERICS;
+ use datafusion_expr::{type_coercion, Signature};
#[test]
fn test_count_arragg_approx_expr() -> Result<()> {
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs
b/datafusion/physical-expr/src/aggregate/mod.rs
index 039c8814e9..d8220db4d9 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,10 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
-
-use crate::expressions::{NthValueAgg, OrderSensitiveArrayAgg};
-
pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
mod hyperloglog;
@@ -59,11 +55,3 @@ pub mod utils {
get_sort_options, ordering_fields, DecimalAverager, Hashable,
};
}
-
-/// Checks whether the given aggregate expression is order-sensitive.
-/// For instance, a `SUM` aggregation doesn't depend on the order of its
inputs.
-/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
-pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
- aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
- || aggr_expr.as_any().is::<NthValueAgg>()
-}
diff --git a/datafusion/physical-expr/src/aggregate/nth_value.rs
b/datafusion/physical-expr/src/aggregate/nth_value.rs
index dba259a507..ee7426a897 100644
--- a/datafusion/physical-expr/src/aggregate/nth_value.rs
+++ b/datafusion/physical-expr/src/aggregate/nth_value.rs
@@ -34,6 +34,7 @@ use arrow_array::{new_empty_array, ArrayRef, StructArray};
use arrow_schema::{DataType, Field, Fields};
use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
use datafusion_common::{exec_err, internal_err, Result, ScalarValue};
+use datafusion_expr::utils::AggregateOrderSensitivity;
use datafusion_expr::Accumulator;
/// Expression for a `NTH_VALUE(... ORDER BY ..., ...)` aggregation. In a multi
@@ -125,6 +126,10 @@ impl AggregateExpr for NthValueAgg {
(!self.ordering_req.is_empty()).then_some(&self.ordering_req)
}
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ AggregateOrderSensitivity::HardRequirement
+ }
+
fn name(&self) -> &str {
&self.name
}
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index 016c4c4ae1..7bf389ecfd 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -22,7 +22,7 @@ use super::ordering::collapse_lex_ordering;
use crate::equivalence::{
collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass,
ProjectionMapping,
};
-use crate::expressions::{CastExpr, Literal};
+use crate::expressions::Literal;
use crate::{
physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement,
LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
@@ -35,6 +35,7 @@ use datafusion_common::{JoinSide, JoinType, Result};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_physical_expr_common::expressions::column::Column;
+use datafusion_physical_expr_common::expressions::CastExpr;
use datafusion_physical_expr_common::utils::ExprPropertiesNode;
use indexmap::{IndexMap, IndexSet};
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index a7921800fc..1e9644f75a 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -20,7 +20,6 @@
#[macro_use]
mod binary;
mod case;
-mod cast;
mod column;
mod datum;
mod in_list;
@@ -53,8 +52,7 @@ pub use crate::aggregate::correlation::Correlation;
pub use crate::aggregate::count::Count;
pub use crate::aggregate::count_distinct::DistinctCount;
pub use crate::aggregate::grouping::Grouping;
-pub use crate::aggregate::min_max::{Max, Min};
-pub use crate::aggregate::min_max::{MaxAccumulator, MinAccumulator};
+pub use crate::aggregate::min_max::{Max, MaxAccumulator, Min, MinAccumulator};
pub use crate::aggregate::nth_value::NthValueAgg;
pub use crate::aggregate::regr::{Regr, RegrType};
pub use crate::aggregate::stats::StatsType;
@@ -63,26 +61,17 @@ pub use crate::aggregate::string_agg::StringAgg;
pub use crate::aggregate::sum::Sum;
pub use crate::aggregate::sum_distinct::DistinctSum;
pub use crate::aggregate::variance::{Variance, VariancePop};
-pub use crate::window::cume_dist::cume_dist;
-pub use crate::window::cume_dist::CumeDist;
-pub use crate::window::lead_lag::WindowShift;
-pub use crate::window::lead_lag::{lag, lead};
+pub use crate::window::cume_dist::{cume_dist, CumeDist};
+pub use crate::window::lead_lag::{lag, lead, WindowShift};
pub use crate::window::nth_value::NthValue;
pub use crate::window::ntile::Ntile;
-pub use crate::window::rank::{dense_rank, percent_rank, rank};
-pub use crate::window::rank::{Rank, RankType};
+pub use crate::window::rank::{dense_rank, percent_rank, rank, Rank, RankType};
pub use crate::window::row_number::RowNumber;
pub use crate::PhysicalSortExpr;
-pub use datafusion_functions_aggregate::first_last::{
- FirstValuePhysicalExpr as FirstValue, LastValuePhysicalExpr as LastValue,
-};
pub use binary::{binary, BinaryExpr};
pub use case::{case, CaseExpr};
-pub use cast::{cast, cast_with_options, CastExpr};
pub use column::UnKnownColumn;
-pub use datafusion_expr::utils::format_state_name;
-pub use datafusion_physical_expr_common::expressions::column::{col, Column};
pub use in_list::{in_list, InListExpr};
pub use is_not_null::{is_not_null, IsNotNullExpr};
pub use is_null::{is_null, IsNullExpr};
@@ -93,11 +82,17 @@ pub use no_op::NoOp;
pub use not::{not, NotExpr};
pub use try_cast::{try_cast, TryCastExpr};
+pub use datafusion_expr::utils::format_state_name;
+pub use datafusion_functions_aggregate::first_last::{FirstValue, LastValue};
+pub use datafusion_physical_expr_common::expressions::column::{col, Column};
+pub use datafusion_physical_expr_common::expressions::{cast, CastExpr};
+
#[cfg(test)]
pub(crate) mod tests {
use std::sync::Arc;
use crate::AggregateExpr;
+
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, ScalarValue};
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index b0e2af82e6..2bb95852ff 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -39,15 +39,11 @@ use datafusion_common::stats::Precision;
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::Accumulator;
-use datafusion_physical_expr::aggregate::is_order_sensitive;
-use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
- equivalence::ProjectionMapping,
+ equivalence::{collapse_lex_req, ProjectionMapping},
expressions::{Column, Max, Min, UnKnownColumn},
- AggregateExpr, LexRequirement, PhysicalExpr,
-};
-use datafusion_physical_expr::{
- physical_exprs_contains, EquivalenceProperties, LexOrdering,
PhysicalSortRequirement,
+ physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering,
+ LexRequirement, PhysicalExpr, PhysicalSortRequirement,
};
use itertools::Itertools;
@@ -274,20 +270,15 @@ pub struct AggregateExec {
impl AggregateExec {
/// Function used in `ConvertFirstLast` optimizer rule,
/// where we need parts of the new value, others cloned from the old one
- pub fn new_with_aggr_expr_and_ordering_info(
- &self,
- required_input_ordering: Option<LexRequirement>,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- cache: PlanProperties,
- input_order_mode: InputOrderMode,
- ) -> Self {
+ /// Rewrites aggregate exec with new aggregate expressions.
+ pub fn with_new_aggr_exprs(&self, aggr_expr: Vec<Arc<dyn AggregateExpr>>)
-> Self {
Self {
aggr_expr,
- required_input_ordering,
- metrics: ExecutionPlanMetricsSet::new(),
- input_order_mode,
- cache,
// clone the rest of the fields
+ required_input_ordering: self.required_input_ordering.clone(),
+ metrics: ExecutionPlanMetricsSet::new(),
+ input_order_mode: self.input_order_mode.clone(),
+ cache: self.cache.clone(),
mode: self.mode,
group_by: self.group_by.clone(),
filter_expr: self.filter_expr.clone(),
@@ -844,11 +835,10 @@ fn get_aggregate_expr_req(
group_by: &PhysicalGroupBy,
agg_mode: &AggregateMode,
) -> LexOrdering {
- // If the aggregation function is not order sensitive, or the aggregation
- // is performing a "second stage" calculation, or all aggregate function
- // requirements are inside the GROUP BY expression, then ignore the
ordering
- // requirement.
- if !is_order_sensitive(aggr_expr) || !agg_mode.is_first_stage() {
+ // If the aggregation function's ordering requirement is not absolutely
+ // necessary, or the aggregation is performing a "second stage"
calculation,
+ // then ignore the ordering requirement.
+ if !aggr_expr.order_sensitivity().hard_requires() ||
!agg_mode.is_first_stage() {
return vec![];
}
@@ -1203,11 +1193,12 @@ mod tests {
use datafusion_execution::config::SessionConfig;
use datafusion_execution::memory_pool::FairSpillPool;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+ use datafusion_expr::expr::Sort;
use datafusion_functions_aggregate::median::median_udaf;
use datafusion_physical_expr::expressions::{
lit, ApproxDistinct, Count, FirstValue, LastValue,
OrderSensitiveArrayAgg,
};
- use datafusion_physical_expr::{reverse_order_bys, PhysicalSortExpr};
+ use datafusion_physical_expr::PhysicalSortExpr;
use futures::{FutureExt, Stream};
@@ -1958,6 +1949,66 @@ mod tests {
Ok(())
}
+ // FIRST_VALUE(b ORDER BY b <SortOptions>)
+ fn test_first_value_agg_expr(
+ schema: &Schema,
+ sort_options: SortOptions,
+ ) -> Result<Arc<dyn AggregateExpr>> {
+ let sort_exprs = vec![datafusion_expr::Expr::Sort(Sort {
+ expr: Box::new(datafusion_expr::Expr::Column(
+ datafusion_common::Column::new(Some("table1"), "b"),
+ )),
+ asc: !sort_options.descending,
+ nulls_first: sort_options.nulls_first,
+ })];
+ let ordering_req = vec![PhysicalSortExpr {
+ expr: col("b", schema)?,
+ options: sort_options,
+ }];
+ let args = vec![col("b", schema)?];
+ let func =
datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new());
+ datafusion_physical_expr_common::aggregate::create_aggregate_expr(
+ &func,
+ &args,
+ &sort_exprs,
+ &ordering_req,
+ schema,
+ "FIRST_VALUE(b)",
+ false,
+ false,
+ )
+ }
+
+ // LAST_VALUE(b ORDER BY b <SortOptions>)
+ fn test_last_value_agg_expr(
+ schema: &Schema,
+ sort_options: SortOptions,
+ ) -> Result<Arc<dyn AggregateExpr>> {
+ let sort_exprs = vec![datafusion_expr::Expr::Sort(Sort {
+ expr: Box::new(datafusion_expr::Expr::Column(
+ datafusion_common::Column::new(Some("table1"), "b"),
+ )),
+ asc: !sort_options.descending,
+ nulls_first: sort_options.nulls_first,
+ })];
+ let ordering_req = vec![PhysicalSortExpr {
+ expr: col("b", schema)?,
+ options: sort_options,
+ }];
+ let args = vec![col("b", schema)?];
+ let func =
datafusion_expr::AggregateUDF::new_from_impl(LastValue::new());
+ datafusion_physical_expr_common::aggregate::create_aggregate_expr(
+ &func,
+ &args,
+ &sort_exprs,
+ &ordering_req,
+ schema,
+ "LAST_VALUE(b)",
+ false,
+ false,
+ )
+ }
+
// This function either constructs the physical plan below,
//
// "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[FIRST_VALUE(b)]",
@@ -1995,27 +2046,14 @@ mod tests {
let groups =
PhysicalGroupBy::new_single(vec![(col("a", &schema)?,
"a".to_string())]);
- let ordering_req = vec![PhysicalSortExpr {
- expr: col("b", &schema)?,
- options: SortOptions::default(),
- }];
+ let sort_options = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
let aggregates: Vec<Arc<dyn AggregateExpr>> = if is_first_acc {
- vec![Arc::new(FirstValue::new(
- col("b", &schema)?,
- "FIRST_VALUE(b)".to_string(),
- DataType::Float64,
- ordering_req.clone(),
- vec![DataType::Float64],
- vec![],
- ))]
+ vec![test_first_value_agg_expr(&schema, sort_options)?]
} else {
- vec![Arc::new(LastValue::new(
- col("b", &schema)?,
- "LAST_VALUE(b)".to_string(),
- DataType::Float64,
- ordering_req.clone(),
- vec![DataType::Float64],
- ))]
+ vec![test_last_value_agg_expr(&schema, sort_options)?]
};
let memory_exec = Arc::new(MemoryExec::try_new(
@@ -2170,34 +2208,15 @@ mod tests {
]));
let col_a = col("a", &schema)?;
- let col_b = col("b", &schema)?;
let option_desc = SortOptions {
descending: true,
nulls_first: true,
};
- let sort_expr = vec![PhysicalSortExpr {
- expr: col_b.clone(),
- options: option_desc,
- }];
- let sort_expr_reverse = reverse_order_bys(&sort_expr);
let groups = PhysicalGroupBy::new_single(vec![(col_a,
"a".to_string())]);
let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![
- Arc::new(FirstValue::new(
- col_b.clone(),
- "FIRST_VALUE(b)".to_string(),
- DataType::Float64,
- sort_expr_reverse.clone(),
- vec![DataType::Float64],
- vec![],
- )),
- Arc::new(LastValue::new(
- col_b.clone(),
- "LAST_VALUE(b)".to_string(),
- DataType::Float64,
- sort_expr.clone(),
- vec![DataType::Float64],
- )),
+ test_first_value_agg_expr(&schema, option_desc)?,
+ test_last_value_agg_expr(&schema, option_desc)?,
];
let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema),
1));
let aggregate_exec = Arc::new(AggregateExec::try_new(
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 434ec9f81f..fecfa2bc33 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -557,10 +557,6 @@ enum AggregateFunction {
BIT_XOR = 21;
BOOL_AND = 22;
BOOL_OR = 23;
- // When a function with the same name exists among built-in window functions,
- // we append "_AGG" to obey name scoping rules.
- FIRST_VALUE_AGG = 24;
- LAST_VALUE_AGG = 25;
REGR_SLOPE = 26;
REGR_INTERCEPT = 27;
REGR_COUNT = 28;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 86a5975c8b..91bf3170e5 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -442,8 +442,6 @@ impl serde::Serialize for AggregateFunction {
Self::BitXor => "BIT_XOR",
Self::BoolAnd => "BOOL_AND",
Self::BoolOr => "BOOL_OR",
- Self::FirstValueAgg => "FIRST_VALUE_AGG",
- Self::LastValueAgg => "LAST_VALUE_AGG",
Self::RegrSlope => "REGR_SLOPE",
Self::RegrIntercept => "REGR_INTERCEPT",
Self::RegrCount => "REGR_COUNT",
@@ -487,8 +485,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"BIT_XOR",
"BOOL_AND",
"BOOL_OR",
- "FIRST_VALUE_AGG",
- "LAST_VALUE_AGG",
"REGR_SLOPE",
"REGR_INTERCEPT",
"REGR_COUNT",
@@ -561,8 +557,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"BIT_XOR" => Ok(AggregateFunction::BitXor),
"BOOL_AND" => Ok(AggregateFunction::BoolAnd),
"BOOL_OR" => Ok(AggregateFunction::BoolOr),
- "FIRST_VALUE_AGG" => Ok(AggregateFunction::FirstValueAgg),
- "LAST_VALUE_AGG" => Ok(AggregateFunction::LastValueAgg),
"REGR_SLOPE" => Ok(AggregateFunction::RegrSlope),
"REGR_INTERCEPT" => Ok(AggregateFunction::RegrIntercept),
"REGR_COUNT" => Ok(AggregateFunction::RegrCount),
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index cb2de71007..979ce69245 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2854,10 +2854,6 @@ pub enum AggregateFunction {
BitXor = 21,
BoolAnd = 22,
BoolOr = 23,
- /// When a function with the same name exists among built-in window
functions,
- /// we append "_AGG" to obey name scoping rules.
- FirstValueAgg = 24,
- LastValueAgg = 25,
RegrSlope = 26,
RegrIntercept = 27,
RegrCount = 28,
@@ -2900,8 +2896,6 @@ impl AggregateFunction {
AggregateFunction::BitXor => "BIT_XOR",
AggregateFunction::BoolAnd => "BOOL_AND",
AggregateFunction::BoolOr => "BOOL_OR",
- AggregateFunction::FirstValueAgg => "FIRST_VALUE_AGG",
- AggregateFunction::LastValueAgg => "LAST_VALUE_AGG",
AggregateFunction::RegrSlope => "REGR_SLOPE",
AggregateFunction::RegrIntercept => "REGR_INTERCEPT",
AggregateFunction::RegrCount => "REGR_COUNT",
@@ -2941,8 +2935,6 @@ impl AggregateFunction {
"BIT_XOR" => Some(Self::BitXor),
"BOOL_AND" => Some(Self::BoolAnd),
"BOOL_OR" => Some(Self::BoolOr),
- "FIRST_VALUE_AGG" => Some(Self::FirstValueAgg),
- "LAST_VALUE_AGG" => Some(Self::LastValueAgg),
"REGR_SLOPE" => Some(Self::RegrSlope),
"REGR_INTERCEPT" => Some(Self::RegrIntercept),
"REGR_COUNT" => Some(Self::RegrCount),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 00c62fc32b..eaba9c0c12 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -450,8 +450,6 @@ impl From<protobuf::AggregateFunction> for
AggregateFunction {
}
protobuf::AggregateFunction::ApproxMedian => Self::ApproxMedian,
protobuf::AggregateFunction::Grouping => Self::Grouping,
- protobuf::AggregateFunction::FirstValueAgg => Self::FirstValue,
- protobuf::AggregateFunction::LastValueAgg => Self::LastValue,
protobuf::AggregateFunction::NthValueAgg => Self::NthValue,
protobuf::AggregateFunction::StringAgg => Self::StringAgg,
}
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index f2ee679ac1..16ba166d9f 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -386,8 +386,6 @@ impl From<&AggregateFunction> for
protobuf::AggregateFunction {
}
AggregateFunction::ApproxMedian => Self::ApproxMedian,
AggregateFunction::Grouping => Self::Grouping,
- AggregateFunction::FirstValue => Self::FirstValueAgg,
- AggregateFunction::LastValue => Self::LastValueAgg,
AggregateFunction::NthValue => Self::NthValueAgg,
AggregateFunction::StringAgg => Self::StringAgg,
}
@@ -696,12 +694,6 @@ pub fn serialize_expr(
protobuf::AggregateFunction::ApproxMedian
}
AggregateFunction::Grouping =>
protobuf::AggregateFunction::Grouping,
- AggregateFunction::FirstValue => {
- protobuf::AggregateFunction::FirstValueAgg
- }
- AggregateFunction::LastValue => {
- protobuf::AggregateFunction::LastValueAgg
- }
AggregateFunction::NthValue => {
protobuf::AggregateFunction::NthValueAgg
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index d3badee3ef..c0da4cc0cd 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -26,11 +26,10 @@ use datafusion::physical_plan::expressions::{
ApproxDistinct, ApproxMedian, ApproxPercentileCont,
ApproxPercentileContWithWeight,
ArrayAgg, Avg, BinaryExpr, BitAnd, BitOr, BitXor, BoolAnd, BoolOr,
CaseExpr,
CastExpr, Column, Correlation, Count, CumeDist, DistinctArrayAgg,
DistinctBitXor,
- DistinctCount, DistinctSum, FirstValue, Grouping, InListExpr,
IsNotNullExpr,
- IsNullExpr, LastValue, Literal, Max, Min, NegativeExpr, NotExpr, NthValue,
- NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, Regr, RegrType,
- RowNumber, Stddev, StddevPop, StringAgg, Sum, TryCastExpr, Variance,
VariancePop,
- WindowShift,
+ DistinctCount, DistinctSum, Grouping, InListExpr, IsNotNullExpr,
IsNullExpr, Literal,
+ Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile,
+ OrderSensitiveArrayAgg, Rank, RankType, Regr, RegrType, RowNumber, Stddev,
StddevPop,
+ StringAgg, Sum, TryCastExpr, Variance, VariancePop, WindowShift,
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
@@ -318,10 +317,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
protobuf::AggregateFunction::ApproxPercentileContWithWeight
} else if aggr_expr.downcast_ref::<ApproxMedian>().is_some() {
protobuf::AggregateFunction::ApproxMedian
- } else if aggr_expr.downcast_ref::<FirstValue>().is_some() {
- protobuf::AggregateFunction::FirstValueAgg
- } else if aggr_expr.downcast_ref::<LastValue>().is_some() {
- protobuf::AggregateFunction::LastValueAgg
} else if aggr_expr.downcast_ref::<StringAgg>().is_some() {
protobuf::AggregateFunction::StringAgg
} else if aggr_expr.downcast_ref::<NthValueAgg>().is_some() {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index d83d6cd1c2..4e2534227e 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -32,8 +32,6 @@ use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::covariance::{covar_pop, covar_samp};
-use datafusion::functions_aggregate::expr_fn::first_value;
-use datafusion::functions_aggregate::median::median;
use datafusion::prelude::*;
use datafusion::test_util::{TestTableFactory, TestTableProvider};
use datafusion_common::config::{FormatOptions, TableOptions};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]