This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 06ed3dd1ac Handle ordering of first last aggregation inside aggregator (#8662)
06ed3dd1ac is described below
commit 06ed3dd1ac01b1bd6a70b93b56cb72cb40777690
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Dec 28 23:34:40 2023 +0300
Handle ordering of first last aggregation inside aggregator (#8662)
* Initial commit
* Update tests in distinct_on
* Update group by joins slt
* Remove unused code
* Minor changes
* Minor changes
* Simplifications
* Update comments
* Review
* Fix clippy
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion-cli/src/functions.rs | 2 +-
datafusion/common/src/error.rs | 1 -
.../src/physical_optimizer/projection_pushdown.rs | 4 +
.../src/simplify_expressions/guarantees.rs | 4 +
.../physical-expr/src/aggregate/first_last.rs | 131 ++++--
datafusion/physical-expr/src/aggregate/mod.rs | 30 +-
datafusion/physical-expr/src/aggregate/utils.rs | 18 +-
datafusion/physical-expr/src/array_expressions.rs | 2 +-
datafusion/physical-plan/src/aggregates/mod.rs | 461 +++++++++------------
.../src/engines/datafusion_engine/mod.rs | 1 -
datafusion/sqllogictest/test_files/distinct_on.slt | 9 +-
datafusion/sqllogictest/test_files/groupby.slt | 82 ++--
datafusion/sqllogictest/test_files/joins.slt | 4 +-
13 files changed, 373 insertions(+), 376 deletions(-)
diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index f8d9ed238b..5390fa9f22 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -297,7 +297,7 @@ pub struct ParquetMetadataFunc {}
impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
- let filename = match exprs.get(0) {
+ let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
_ => {
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 515acc6d1c..e58faaa150 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -558,7 +558,6 @@ macro_rules! arrow_err {
// To avoid compiler error when using macro in the same crate:
// macros from the current crate cannot be referred to by absolute paths
-pub use exec_err as _exec_err;
pub use internal_datafusion_err as _internal_datafusion_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 7e1312dad2..d237a3e860 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -990,6 +990,10 @@ fn update_join_on(
proj_right_exprs: &[(Column, String)],
hash_join_on: &[(Column, Column)],
) -> Option<Vec<(Column, Column)>> {
+ // TODO: Clippy wants the "map" call removed, but doing so generates
+ // a compilation error. Remove the clippy directive once this
+ // issue is fixed.
+ #[allow(clippy::map_identity)]
let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
.iter()
.map(|(left, right)| (left, right))
diff --git a/datafusion/optimizer/src/simplify_expressions/guarantees.rs b/datafusion/optimizer/src/simplify_expressions/guarantees.rs
index 860dc326b9..aa7bb4f78a 100644
--- a/datafusion/optimizer/src/simplify_expressions/guarantees.rs
+++ b/datafusion/optimizer/src/simplify_expressions/guarantees.rs
@@ -47,6 +47,10 @@ impl<'a> GuaranteeRewriter<'a> {
guarantees: impl IntoIterator<Item = &'a (Expr, NullableInterval)>,
) -> Self {
Self {
+ // TODO: Clippy wants the "map" call removed, but doing so generates
+ // a compilation error. Remove the clippy directive once this
+ // issue is fixed.
+ #[allow(clippy::map_identity)]
guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(),
}
}
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index c009881d89..c7032e601c 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -20,7 +20,7 @@
use std::any::Any;
use std::sync::Arc;
-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
use crate::expressions::format_state_name;
use crate::{
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
@@ -29,9 +29,10 @@ use crate::{
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field};
-use arrow_schema::SortOptions;
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
-use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+ arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
+};
use datafusion_expr::Accumulator;
/// FIRST_VALUE aggregate expression
@@ -211,10 +212,45 @@ impl FirstValueAccumulator {
}
// Updates state with the values in the given row.
- fn update_with_new_row(&mut self, row: &[ScalarValue]) {
- self.first = row[0].clone();
- self.orderings = row[1..].to_vec();
- self.is_set = true;
+ fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+ let [value, orderings @ ..] = row else {
+ return internal_err!("Empty row in FIRST_VALUE");
+ };
+ // Update when there is no entry in the state, or we have an "earlier"
+ // entry according to sort requirements.
+ if !self.is_set
+ || compare_rows(
+ &self.orderings,
+ orderings,
+ &get_sort_options(&self.ordering_req),
+ )?
+ .is_gt()
+ {
+ self.first = value.clone();
+ self.orderings = orderings.to_vec();
+ self.is_set = true;
+ }
+ Ok(())
+ }
+
+ fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+ let [value, ordering_values @ ..] = values else {
+ return internal_err!("Empty row in FIRST_VALUE");
+ };
+ if self.ordering_req.is_empty() {
+ // Get first entry according to receive order (0th index)
+ return Ok((!value.is_empty()).then_some(0));
+ }
+ let sort_columns = ordering_values
+ .iter()
+ .zip(self.ordering_req.iter())
+ .map(|(values, req)| SortColumn {
+ values: values.clone(),
+ options: Some(req.options),
+ })
+ .collect::<Vec<_>>();
+ let indices = lexsort_to_indices(&sort_columns, Some(1))?;
+ Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
}
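For reference, a minimal standalone sketch (not part of this patch) of the top-1 selection that `get_first_idx` performs: `lexsort_to_indices` with `fetch = Some(1)` returns only the index of the row that sorts first under the given requirement, so the accumulator no longer needs fully sorted input. It uses only the arrow APIs imported by the diff; the array contents are made up for illustration.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};
    use arrow::compute::{lexsort_to_indices, SortColumn};
    use arrow_schema::SortOptions;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // One ordering column; find the row with the smallest `ts` (ASC NULLS LAST).
        let ts: ArrayRef = Arc::new(Int64Array::from(vec![5, 3, 9]));
        let sort_columns = vec![SortColumn {
            values: ts,
            options: Some(SortOptions { descending: false, nulls_first: false }),
        }];
        // `Some(1)` requests only the top index: a top-1 selection, not a full sort.
        let indices = lexsort_to_indices(&sort_columns, Some(1))?;
        assert_eq!(indices.value(0), 1); // row 1 holds ts = 3
        Ok(())
    }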
@@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator {
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- // If we have seen first value, we shouldn't update it
- if !values[0].is_empty() && !self.is_set {
- let row = get_row_at_idx(values, 0)?;
- // Update with first value in the array.
- self.update_with_new_row(&row);
+ if let Some(first_idx) = self.get_first_idx(values)? {
+ let row = get_row_at_idx(values, first_idx)?;
+ self.update_with_new_row(&row)?;
}
Ok(())
}
@@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator {
// Update with first value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
// containing two is_set flags.
- self.update_with_new_row(&first_row[0..is_set_idx]);
+ self.update_with_new_row(&first_row[0..is_set_idx])?;
}
}
Ok(())
@@ -459,10 +493,50 @@ impl LastValueAccumulator {
}
// Updates state with the values in the given row.
- fn update_with_new_row(&mut self, row: &[ScalarValue]) {
- self.last = row[0].clone();
- self.orderings = row[1..].to_vec();
- self.is_set = true;
+ fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+ let [value, orderings @ ..] = row else {
+ return internal_err!("Empty row in LAST_VALUE");
+ };
+ // Update when there is no entry in the state, or we have a "later"
+ // entry (either according to sort requirements or the order of execution).
+ if !self.is_set
+ || self.orderings.is_empty()
+ || compare_rows(
+ &self.orderings,
+ orderings,
+ &get_sort_options(&self.ordering_req),
+ )?
+ .is_lt()
+ {
+ self.last = value.clone();
+ self.orderings = orderings.to_vec();
+ self.is_set = true;
+ }
+ Ok(())
+ }
+
+ fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
+ let [value, ordering_values @ ..] = values else {
+ return internal_err!("Empty row in LAST_VALUE");
+ };
+ if self.ordering_req.is_empty() {
+ // Get last entry according to the order of data:
+ return Ok((!value.is_empty()).then_some(value.len() - 1));
+ }
+ let sort_columns = ordering_values
+ .iter()
+ .zip(self.ordering_req.iter())
+ .map(|(values, req)| {
+ // Take the reverse ordering requirement. This enables us to
+ // use "fetch = 1" to get the last value.
+ SortColumn {
+ values: values.clone(),
+ options: Some(!req.options),
+ }
+ })
+ .collect::<Vec<_>>();
+ let indices = lexsort_to_indices(&sort_columns, Some(1))?;
+ Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
}
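The `!req.options` trick above relies on `SortOptions` implementing `Not`, which flips both the sort direction and the null placement so that the row sorting last under the original options sorts first under the reversed ones. A hedged sketch of the equivalent manual reversal:

    use arrow_schema::SortOptions;

    // Reverse a sort option so a top-1 "first" search yields the last value.
    fn reverse(options: SortOptions) -> SortOptions {
        SortOptions {
            descending: !options.descending,
            nulls_first: !options.nulls_first,
        }
    }

    fn main() {
        let asc_nulls_last = SortOptions { descending: false, nulls_first: false };
        let rev = reverse(asc_nulls_last);
        assert!(rev.descending && rev.nulls_first);
    }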
@@ -475,10 +549,9 @@ impl Accumulator for LastValueAccumulator {
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if !values[0].is_empty() {
- let row = get_row_at_idx(values, values[0].len() - 1)?;
- // Update with last value in the array.
- self.update_with_new_row(&row);
+ if let Some(last_idx) = self.get_last_idx(values)? {
+ let row = get_row_at_idx(values, last_idx)?;
+ self.update_with_new_row(&row)?;
}
Ok(())
}
@@ -515,7 +588,7 @@ impl Accumulator for LastValueAccumulator {
// Update with last value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
// containing two is_set flags.
- self.update_with_new_row(&last_row[0..is_set_idx]);
+ self.update_with_new_row(&last_row[0..is_set_idx])?;
}
}
Ok(())
@@ -559,26 +632,18 @@ fn convert_to_sort_cols(
.collect::<Vec<_>>()
}
-/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
- ordering_req
- .iter()
- .map(|item| item.options)
- .collect::<Vec<_>>()
-}
-
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};
+ use arrow::compute::concat;
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
- use arrow::compute::concat;
- use std::sync::Arc;
-
#[test]
fn test_first_last_value_value() -> Result<()> {
let mut first_accumulator =
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 329bb1e641..5bd1fca385 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,16 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg};
-use crate::{PhysicalExpr, PhysicalSortExpr};
-use arrow::datatypes::Field;
-use datafusion_common::{not_impl_err, DataFusionError, Result};
-use datafusion_expr::Accumulator;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
use self::groups_accumulator::GroupsAccumulator;
+use crate::expressions::OrderSensitiveArrayAgg;
+use crate::{PhysicalExpr, PhysicalSortExpr};
+
+use arrow::datatypes::Field;
+use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_expr::Accumulator;
+
+mod hyperloglog;
+mod tdigest;
pub(crate) mod approx_distinct;
pub(crate) mod approx_median;
@@ -46,19 +50,18 @@ pub(crate) mod median;
pub(crate) mod string_agg;
#[macro_use]
pub(crate) mod min_max;
-pub mod build_in;
pub(crate) mod groups_accumulator;
-mod hyperloglog;
-pub mod moving_min_max;
pub(crate) mod regr;
pub(crate) mod stats;
pub(crate) mod stddev;
pub(crate) mod sum;
pub(crate) mod sum_distinct;
-mod tdigest;
-pub mod utils;
pub(crate) mod variance;
+pub mod build_in;
+pub mod moving_min_max;
+pub mod utils;
+
/// An aggregate expression that:
/// * knows its resulting field
/// * knows how to create its accumulator
@@ -134,10 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
/// Checks whether the given aggregate expression is order-sensitive.
/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
-/// However, a `FirstValue` depends on the input ordering (if the order changes,
-/// the first value in the list would change).
+/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
- aggr_expr.as_any().is::<FirstValue>()
- || aggr_expr.as_any().is::<LastValue>()
- || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
+ aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
}
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index e5421ef5ab..9777158da1 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -17,20 +17,21 @@
//! Utilities used in aggregates
+use std::any::Any;
+use std::sync::Arc;
+
use crate::{AggregateExpr, PhysicalSortExpr};
-use arrow::array::ArrayRef;
+
+use arrow::array::{ArrayRef, ArrowNativeTypeOp};
use arrow_array::cast::AsArray;
use arrow_array::types::{
Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
-use arrow_array::ArrowNativeTypeOp;
use arrow_buffer::ArrowNativeType;
-use arrow_schema::{DataType, Field};
+use arrow_schema::{DataType, Field, SortOptions};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
-use std::any::Any;
-use std::sync::Arc;
/// Convert scalar values from an accumulator into arrays.
pub fn get_accum_scalar_values_as_arrays(
@@ -40,7 +41,7 @@ pub fn get_accum_scalar_values_as_arrays(
.state()?
.iter()
.map(|s| s.to_array_of_size(1))
- .collect::<Result<Vec<_>>>()
+ .collect()
}
/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
@@ -205,3 +206,8 @@ pub(crate) fn ordering_fields(
})
.collect()
}
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
+ ordering_req.iter().map(|item| item.options).collect()
+}
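`get_sort_options` feeds `compare_rows` in the accumulators' `update_with_new_row` methods. A minimal sketch (values are illustrative) of how the pair decides whether an incoming row replaces the stored one:

    use arrow_schema::SortOptions;
    use datafusion_common::utils::compare_rows;
    use datafusion_common::{Result, ScalarValue};

    fn main() -> Result<()> {
        let stored = vec![ScalarValue::Int64(Some(10))];
        let incoming = vec![ScalarValue::Int64(Some(7))];
        // One ordering column, ASC NULLS LAST -- what `get_sort_options`
        // extracts from a single `PhysicalSortExpr`.
        let opts = vec![SortOptions { descending: false, nulls_first: false }];
        // `is_gt` means the stored row sorts after the incoming one, so
        // FIRST_VALUE would take the incoming row as its new result.
        assert!(compare_rows(&stored, &incoming, &opts)?.is_gt());
        Ok(())
    }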
diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 274d1db4eb..7a986810ba 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -2453,7 +2453,7 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
let last_offset: OffsetSize = offsets.last().copied().unwrap();
offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
let arrays = converter.convert_rows(rows)?;
- let array = match arrays.get(0) {
+ let array = match arrays.first() {
Some(array) => array.clone(),
None => {
return internal_err!("array_distinct: failed to get array from rows")
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index f779322456..f5bb4fe59b 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -27,7 +27,7 @@ use crate::aggregates::{
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
-use crate::windows::{get_ordered_partition_by_indices, get_window_mode};
+use crate::windows::get_ordered_partition_by_indices;
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning,
SendableRecordBatchStream, Statistics,
@@ -45,11 +45,11 @@ use datafusion_physical_expr::{
aggregate::is_order_sensitive,
equivalence::{collapse_lex_req, ProjectionMapping},
expressions::{Column, Max, Min, UnKnownColumn},
- physical_exprs_contains, reverse_order_bys, AggregateExpr, EquivalenceProperties,
- LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
+ physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering,
+ LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
-use itertools::{izip, Itertools};
+use itertools::Itertools;
mod group_values;
mod no_grouping;
@@ -277,159 +277,6 @@ pub struct AggregateExec {
output_ordering: Option<LexOrdering>,
}
-/// This function returns the ordering requirement of the first non-reversible
-/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
-/// as the initial requirement while calculating the finest requirement among all
-/// aggregate functions. If this function returns `None`, it means there is no
-/// hard ordering requirement for the aggregate functions (in terms of direction).
-/// Then, we can generate two alternative requirements with opposite directions.
-fn get_init_req(
- aggr_expr: &[Arc<dyn AggregateExpr>],
- order_by_expr: &[Option<LexOrdering>],
-) -> Option<LexOrdering> {
- for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) {
- // If the aggregation function is a non-reversible order-sensitive function
- // and there is a hard requirement, choose first such requirement:
- if is_order_sensitive(aggr_expr)
- && aggr_expr.reverse_expr().is_none()
- && fn_reqs.is_some()
- {
- return fn_reqs.clone();
- }
- }
- None
-}
-
-/// This function gets the finest ordering requirement among all the aggregation
-/// functions. If requirements are conflicting, (i.e. we can not compute the
-/// aggregations in a single [`AggregateExec`]), the function returns an error.
-fn get_finest_requirement(
- aggr_expr: &mut [Arc<dyn AggregateExpr>],
- order_by_expr: &mut [Option<LexOrdering>],
- eq_properties: &EquivalenceProperties,
-) -> Result<Option<LexOrdering>> {
- // First, we check if all the requirements are satisfied by the existing
- // ordering. If so, we return `None` to indicate this.
- let mut all_satisfied = true;
- for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) {
- if eq_properties.ordering_satisfy(fn_req.as_deref().unwrap_or(&[])) {
- continue;
- }
- if let Some(reverse) = aggr_expr.reverse_expr() {
- let reverse_req = fn_req.as_ref().map(|item| reverse_order_bys(item));
- if eq_properties.ordering_satisfy(reverse_req.as_deref().unwrap_or(&[])) {
- // We need to update `aggr_expr` with its reverse since only its
- // reverse requirement is compatible with the existing requirements:
- *aggr_expr = reverse;
- *fn_req = reverse_req;
- continue;
- }
- }
- // Requirement is not satisfied:
- all_satisfied = false;
- }
- if all_satisfied {
- // All of the requirements are already satisfied.
- return Ok(None);
- }
- let mut finest_req = get_init_req(aggr_expr, order_by_expr);
- for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) {
- let Some(fn_req) = fn_req else {
- continue;
- };
-
- if let Some(finest_req) = &mut finest_req {
- if let Some(finer) = eq_properties.get_finer_ordering(finest_req, fn_req) {
- *finest_req = finer;
- continue;
- }
- // If an aggregate function is reversible, analyze whether its reverse
- // direction is compatible with existing requirements:
- if let Some(reverse) = aggr_expr.reverse_expr() {
- let fn_req_reverse = reverse_order_bys(fn_req);
- if let Some(finer) =
- eq_properties.get_finer_ordering(finest_req, &fn_req_reverse)
- {
- // We need to update `aggr_expr` with its reverse, since only its
- // reverse requirement is compatible with existing requirements:
- *aggr_expr = reverse;
- *finest_req = finer;
- *fn_req = fn_req_reverse;
- continue;
- }
- }
- // If neither of the requirements satisfy the other, this means
- // requirements are conflicting. Currently, we do not support
- // conflicting requirements.
- return not_impl_err!(
- "Conflicting ordering requirements in aggregate functions is
not supported"
- );
- } else {
- finest_req = Some(fn_req.clone());
- }
- }
- Ok(finest_req)
-}
-
-/// Calculates search_mode for the aggregation
-fn get_aggregate_search_mode(
- group_by: &PhysicalGroupBy,
- input: &Arc<dyn ExecutionPlan>,
- aggr_expr: &mut [Arc<dyn AggregateExpr>],
- order_by_expr: &mut [Option<LexOrdering>],
- ordering_req: &mut Vec<PhysicalSortExpr>,
-) -> InputOrderMode {
- let groupby_exprs = group_by
- .expr
- .iter()
- .map(|(item, _)| item.clone())
- .collect::<Vec<_>>();
- let mut input_order_mode = InputOrderMode::Linear;
- if !group_by.is_single() || groupby_exprs.is_empty() {
- return input_order_mode;
- }
-
- if let Some((should_reverse, mode)) =
- get_window_mode(&groupby_exprs, ordering_req, input)
- {
- let all_reversible = aggr_expr
- .iter()
- .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some());
- if should_reverse && all_reversible {
- izip!(aggr_expr.iter_mut(), order_by_expr.iter_mut()).for_each(
- |(aggr, order_by)| {
- if let Some(reverse) = aggr.reverse_expr() {
- *aggr = reverse;
- } else {
- unreachable!();
- }
- *order_by = order_by.as_ref().map(|ob| reverse_order_bys(ob));
- },
- );
- *ordering_req = reverse_order_bys(ordering_req);
- }
- input_order_mode = mode;
- }
- input_order_mode
-}
-
-/// Check whether group by expression contains all of the expression inside `requirement`
-// As an example Group By (c,b,a) contains all of the expressions in the `requirement`: (a ASC, b DESC)
-fn group_by_contains_all_requirements(
- group_by: &PhysicalGroupBy,
- requirement: &LexOrdering,
-) -> bool {
- let physical_exprs = group_by.input_exprs();
- // When we have multiple groups (grouping set)
- // since group by may be calculated on the subset of the group_by.expr()
- // it is not guaranteed to have all of the requirements among group by expressions.
- // Hence do the analysis: whether group by contains all requirements in the single group case.
- group_by.is_single()
- && requirement
- .iter()
- .all(|req| physical_exprs_contains(&physical_exprs, &req.expr))
-}
-
impl AggregateExec {
/// Create a new hash aggregate execution plan
pub fn try_new(
@@ -477,50 +324,14 @@ impl AggregateExec {
fn try_new_with_schema(
mode: AggregateMode,
group_by: PhysicalGroupBy,
- mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+ aggr_expr: Vec<Arc<dyn AggregateExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
schema: SchemaRef,
original_schema: SchemaRef,
) -> Result<Self> {
- // Reset ordering requirement to `None` if aggregator is not order-sensitive
- let mut order_by_expr = aggr_expr
- .iter()
- .map(|aggr_expr| {
- let fn_reqs = aggr_expr.order_bys().map(|ordering| ordering.to_vec());
- // If
- // - aggregation function is order-sensitive and
- // - aggregation is performing a "first stage" calculation, and
- // - at least one of the aggregate function requirement is not inside group by expression
- // keep the ordering requirement as is; otherwise ignore the ordering requirement.
- // In non-first stage modes, we accumulate data (using `merge_batch`)
- // from different partitions (i.e. merge partial results). During
- // this merge, we consider the ordering of each partial result.
- // Hence, we do not need to use the ordering requirement in such
- // modes as long as partial results are generated with the
- // correct ordering.
- fn_reqs.filter(|req| {
- is_order_sensitive(aggr_expr)
- && mode.is_first_stage()
- && !group_by_contains_all_requirements(&group_by, req)
- })
- })
- .collect::<Vec<_>>();
- let requirement = get_finest_requirement(
- &mut aggr_expr,
- &mut order_by_expr,
- &input.equivalence_properties(),
- )?;
- let mut ordering_req = requirement.unwrap_or(vec![]);
- let input_order_mode = get_aggregate_search_mode(
- &group_by,
- &input,
- &mut aggr_expr,
- &mut order_by_expr,
- &mut ordering_req,
- );
-
+ let input_eq_properties = input.equivalence_properties();
// Get GROUP BY expressions:
let groupby_exprs = group_by.input_exprs();
// If existing ordering satisfies a prefix of the GROUP BY expressions,
@@ -528,17 +339,31 @@ impl AggregateExec {
// work more efficiently.
let indices = get_ordered_partition_by_indices(&groupby_exprs, &input);
let mut new_requirement = indices
- .into_iter()
- .map(|idx| PhysicalSortRequirement {
+ .iter()
+ .map(|&idx| PhysicalSortRequirement {
expr: groupby_exprs[idx].clone(),
options: None,
})
.collect::<Vec<_>>();
- // Postfix ordering requirement of the aggregation to the requirement.
- let req = PhysicalSortRequirement::from_sort_exprs(&ordering_req);
+
+ let req = get_aggregate_exprs_requirement(
+ &aggr_expr,
+ &group_by,
+ &input_eq_properties,
+ &mode,
+ )?;
new_requirement.extend(req);
new_requirement = collapse_lex_req(new_requirement);
+ let input_order_mode =
+ if indices.len() == groupby_exprs.len() && !indices.is_empty() {
+ InputOrderMode::Sorted
+ } else if !indices.is_empty() {
+ InputOrderMode::PartiallySorted(indices)
+ } else {
+ InputOrderMode::Linear
+ };
+
// construct a map from the input expression to the output expression of the Aggregation group by
let projection_mapping = ProjectionMapping::try_new(&group_by.expr, &input.schema())?;
@@ -546,9 +371,8 @@ impl AggregateExec {
let required_input_ordering = (!new_requirement.is_empty()).then_some(new_requirement);
- let aggregate_eqs = input
- .equivalence_properties()
- .project(&projection_mapping, schema.clone());
+ let aggregate_eqs =
+ input_eq_properties.project(&projection_mapping, schema.clone());
let output_ordering = aggregate_eqs.oeq_class().output_ordering();
Ok(AggregateExec {
@@ -998,6 +822,121 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
Arc::new(Schema::new(group_fields))
}
+/// Determines the lexical ordering requirement for an aggregate expression.
+///
+/// # Parameters
+///
+/// - `aggr_expr`: A reference to an `Arc<dyn AggregateExpr>` representing the
+/// aggregate expression.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the
+/// physical GROUP BY expression.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the
+/// mode of aggregation.
+///
+/// # Returns
+///
+/// A `LexOrdering` instance indicating the lexical ordering requirement for
+/// the aggregate expression.
+fn get_aggregate_expr_req(
+ aggr_expr: &Arc<dyn AggregateExpr>,
+ group_by: &PhysicalGroupBy,
+ agg_mode: &AggregateMode,
+) -> LexOrdering {
+ // If the aggregation function is not order sensitive, or the aggregation
+ // is performing a "second stage" calculation, or all aggregate function
+/// requirements are inside the GROUP BY expression, then ignore the ordering
+ // requirement.
+ if !is_order_sensitive(aggr_expr) || !agg_mode.is_first_stage() {
+ return vec![];
+ }
+
+ let mut req = aggr_expr.order_bys().unwrap_or_default().to_vec();
+
+ // In non-first stage modes, we accumulate data (using `merge_batch`) from
+ // different partitions (i.e. merge partial results). During this merge, we
+ // consider the ordering of each partial result. Hence, we do not need to
+ // use the ordering requirement in such modes as long as partial results are
+ // generated with the correct ordering.
+ if group_by.is_single() {
+ // Remove all orderings that occur in the group by. These requirements
+ // will definitely be satisfied -- Each group by expression will have
+ // distinct values per group, hence all requirements are satisfied.
+ let physical_exprs = group_by.input_exprs();
+ req.retain(|sort_expr| {
+ !physical_exprs_contains(&physical_exprs, &sort_expr.expr)
+ });
+ }
+ req
+}
+
+/// Computes the finer ordering between the given existing ordering requirement
+/// and the ordering requirement of the given aggregate expression.
+///
+/// # Parameters
+///
+/// * `existing_req` - The existing lexical ordering that needs refinement.
+/// * `aggr_expr` - A reference to an aggregate expression trait object.
+/// * `group_by` - Information about the physical grouping (e.g. group by expression).
+/// * `eq_properties` - Equivalence properties relevant to the computation.
+/// * `agg_mode` - The mode of aggregation (e.g., Partial, Final, etc.).
+///
+/// # Returns
+///
+/// An `Option<LexOrdering>` representing the computed finer lexical ordering,
+/// or `None` if there is no finer ordering; e.g. the existing requirement and
+/// the aggregator requirement are incompatible.
+fn finer_ordering(
+ existing_req: &LexOrdering,
+ aggr_expr: &Arc<dyn AggregateExpr>,
+ group_by: &PhysicalGroupBy,
+ eq_properties: &EquivalenceProperties,
+ agg_mode: &AggregateMode,
+) -> Option<LexOrdering> {
+ let aggr_req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode);
+ eq_properties.get_finer_ordering(existing_req, &aggr_req)
+}
+
+/// Get the common requirement that satisfies all the aggregate expressions.
+///
+/// # Parameters
+///
+/// - `aggr_exprs`: A slice of `Arc<dyn AggregateExpr>` containing all the
+/// aggregate expressions.
+/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the
+/// physical GROUP BY expression.
+/// - `eq_properties`: A reference to an `EquivalenceProperties` instance
+/// representing equivalence properties for ordering.
+/// - `agg_mode`: A reference to an `AggregateMode` instance representing the
+/// mode of aggregation.
+///
+/// # Returns
+///
+/// A `LexRequirement` instance, which is the requirement that satisfies all the
+/// aggregate requirements. Returns an error in case of conflicting requirements.
+fn get_aggregate_exprs_requirement(
+ aggr_exprs: &[Arc<dyn AggregateExpr>],
+ group_by: &PhysicalGroupBy,
+ eq_properties: &EquivalenceProperties,
+ agg_mode: &AggregateMode,
+) -> Result<LexRequirement> {
+ let mut requirement = vec![];
+ for aggr_expr in aggr_exprs.iter() {
+ if let Some(finer_ordering) =
+ finer_ordering(&requirement, aggr_expr, group_by, eq_properties, agg_mode)
+ {
+ requirement = finer_ordering;
+ } else {
+ // If neither of the requirements satisfy the other, this means
+ // requirements are conflicting. Currently, we do not support
+ // conflicting requirements.
+ return not_impl_err!(
+ "Conflicting ordering requirements in aggregate functions is
not supported"
+ );
+ }
+ }
+ Ok(PhysicalSortRequirement::from_sort_exprs(&requirement))
+}
+
/// returns physical expressions for arguments to evaluate against a batch
/// The expressions are different depending on `mode`:
/// * Partial: AggregateExpr::expressions
@@ -1013,33 +952,27 @@ fn aggregate_expressions(
| AggregateMode::SinglePartitioned => Ok(aggr_expr
.iter()
.map(|agg| {
- let mut result = agg.expressions().clone();
- // In partial mode, append ordering requirements to expressions' results.
- // Ordering requirements are used by subsequent executors to satisfy the required
- // ordering for `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes.
- if matches!(mode, AggregateMode::Partial) {
- if let Some(ordering_req) = agg.order_bys() {
- let ordering_exprs = ordering_req
- .iter()
- .map(|item| item.expr.clone())
- .collect::<Vec<_>>();
- result.extend(ordering_exprs);
- }
+ let mut result = agg.expressions();
+ // Append ordering requirements to expressions' results. This
+ // way order sensitive aggregators can satisfy requirement
+ // themselves.
+ if let Some(ordering_req) = agg.order_bys() {
+ result.extend(ordering_req.iter().map(|item| item.expr.clone()));
}
result
})
.collect()),
- // in this mode, we build the merge expressions of the aggregation
+ // In this mode, we build the merge expressions of the aggregation.
AggregateMode::Final | AggregateMode::FinalPartitioned => {
let mut col_idx_base = col_idx_base;
- Ok(aggr_expr
+ aggr_expr
.iter()
.map(|agg| {
let exprs = merge_expressions(col_idx_base, agg)?;
col_idx_base += exprs.len();
Ok(exprs)
})
- .collect::<Result<Vec<_>>>()?)
+ .collect()
}
}
}
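The `Partial`/`Single` branch above now unconditionally appends each aggregate's ORDER BY expressions to its argument list, so an order-sensitive accumulator receives its ordering columns alongside its inputs and can enforce ordering itself. A deliberately simplified sketch of that shape (plain strings stand in for `Arc<dyn PhysicalExpr>`; names are illustrative only):

    // Illustrative only: expression names stand in for physical expressions.
    fn arguments_with_ordering(args: Vec<String>, order_by: Vec<String>) -> Vec<String> {
        let mut result = args;
        result.extend(order_by); // aggregator sees inputs plus ordering columns
        result
    }

    fn main() {
        let exprs = arguments_with_ordering(vec!["amount".into()], vec!["ts".into()]);
        assert_eq!(exprs, ["amount", "ts"]);
    }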
@@ -1052,14 +985,13 @@ fn merge_expressions(
index_base: usize,
expr: &Arc<dyn AggregateExpr>,
) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
- Ok(expr
- .state_fields()?
- .iter()
- .enumerate()
- .map(|(idx, f)| {
- Arc::new(Column::new(f.name(), index_base + idx)) as Arc<dyn PhysicalExpr>
- })
- .collect::<Vec<_>>())
+ expr.state_fields().map(|fields| {
+ fields
+ .iter()
+ .enumerate()
+ .map(|(idx, f)| Arc::new(Column::new(f.name(), index_base + idx)) as _)
+ .collect()
+ })
}
pub(crate) type AccumulatorItem = Box<dyn Accumulator>;
@@ -1070,7 +1002,7 @@ fn create_accumulators(
aggr_expr
.iter()
.map(|expr| expr.create_accumulator())
- .collect::<Result<Vec<_>>>()
+ .collect()
}
/// returns a vector of ArrayRefs, where each entry corresponds to either the
@@ -1081,8 +1013,8 @@ fn finalize_aggregation(
) -> Result<Vec<ArrayRef>> {
match mode {
AggregateMode::Partial => {
- // build the vector of states
- let a = accumulators
+ // Build the vector of states
+ accumulators
.iter()
.map(|accumulator| {
accumulator.state().and_then(|e| {
@@ -1091,18 +1023,18 @@ fn finalize_aggregation(
.collect::<Result<Vec<ArrayRef>>>()
})
})
- .collect::<Result<Vec<_>>>()?;
- Ok(a.iter().flatten().cloned().collect::<Vec<_>>())
+ .flatten_ok()
+ .collect()
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
| AggregateMode::Single
| AggregateMode::SinglePartitioned => {
- // merge the state to the final value
+ // Merge the state to the final value
accumulators
.iter()
.map(|accumulator| accumulator.evaluate().and_then(|v| v.to_array()))
- .collect::<Result<Vec<ArrayRef>>>()
+ .collect()
}
}
}
@@ -1125,9 +1057,7 @@ pub(crate) fn evaluate_many(
expr: &[Vec<Arc<dyn PhysicalExpr>>],
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
- expr.iter()
- .map(|expr| evaluate(expr, batch))
- .collect::<Result<Vec<_>>>()
+ expr.iter().map(|expr| evaluate(expr, batch)).collect()
}
fn evaluate_optional(
@@ -1143,7 +1073,7 @@ fn evaluate_optional(
})
.transpose()
})
- .collect::<Result<Vec<_>>>()
+ .collect()
}
/// Evaluate a group by expression against a `RecordBatch`
@@ -1204,9 +1134,7 @@ mod tests {
use std::task::{Context, Poll};
use super::*;
- use crate::aggregates::{
- get_finest_requirement, AggregateExec, AggregateMode, PhysicalGroupBy,
- };
+ use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::coalesce_batches::CoalesceBatchesExec;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::common;
@@ -1228,15 +1156,16 @@ mod tests {
Result, ScalarValue,
};
use datafusion_execution::config::SessionConfig;
+ use datafusion_execution::memory_pool::FairSpillPool;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_physical_expr::expressions::{
- lit, ApproxDistinct, Count, FirstValue, LastValue, Median,
+ lit, ApproxDistinct, Count, FirstValue, LastValue, Median, OrderSensitiveArrayAgg,
};
use datafusion_physical_expr::{
- AggregateExpr, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+ reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr,
+ PhysicalSortExpr,
};
- use datafusion_execution::memory_pool::FairSpillPool;
use futures::{FutureExt, Stream};
// Generate a schema which consists of 5 columns (a, b, c, d, e)
@@ -2093,11 +2022,6 @@ mod tests {
descending: false,
nulls_first: false,
};
- // This is the reverse requirement of options1
- let options2 = SortOptions {
- descending: true,
- nulls_first: true,
- };
let col_a = &col("a", &test_schema)?;
let col_b = &col("b", &test_schema)?;
let col_c = &col("c", &test_schema)?;
@@ -2106,7 +2030,7 @@ mod tests {
eq_properties.add_equal_conditions(col_a, col_b);
// Aggregate requirements are
// [None], [a ASC], [a ASC, b ASC, c ASC], [a ASC, b ASC] respectively
- let mut order_by_exprs = vec![
+ let order_by_exprs = vec![
None,
Some(vec![PhysicalSortExpr {
expr: col_a.clone(),
@@ -2136,14 +2060,8 @@ mod tests {
options: options1,
},
]),
- // Since aggregate expression is reversible (FirstValue), we should be able to resolve below
- // contradictory requirement by reversing it.
- Some(vec![PhysicalSortExpr {
- expr: col_b.clone(),
- options: options2,
- }]),
];
- let common_requirement = Some(vec![
+ let common_requirement = vec![
PhysicalSortExpr {
expr: col_a.clone(),
options: options1,
@@ -2152,17 +2070,28 @@ mod tests {
expr: col_c.clone(),
options: options1,
},
- ]);
- let aggr_expr = Arc::new(FirstValue::new(
- col_a.clone(),
- "first1",
- DataType::Int32,
- vec![],
- vec![],
- )) as _;
- let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()];
- let res =
- get_finest_requirement(&mut aggr_exprs, &mut order_by_exprs, &eq_properties)?;
+ ];
+ let aggr_exprs = order_by_exprs
+ .into_iter()
+ .map(|order_by_expr| {
+ Arc::new(OrderSensitiveArrayAgg::new(
+ col_a.clone(),
+ "array_agg",
+ DataType::Int32,
+ false,
+ vec![],
+ order_by_expr.unwrap_or_default(),
+ )) as _
+ })
+ .collect::<Vec<_>>();
+ let group_by = PhysicalGroupBy::new_single(vec![]);
+ let res = get_aggregate_exprs_requirement(
+ &aggr_exprs,
+ &group_by,
+ &eq_properties,
+ &AggregateMode::Partial,
+ )?;
+ let res = PhysicalSortRequirement::to_sort_exprs(res);
assert_eq!(res, common_requirement);
Ok(())
}
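The `finer_ordering` fold above replaces the old `get_finest_requirement` machinery: the running requirement is refined by each aggregate in turn, and any conflict aborts planning. A conceptual sketch of that fold, simplified to prefix comparison (the real `get_finer_ordering` also consults equivalence properties, which this toy version ignores):

    #[derive(Clone, PartialEq, Debug)]
    struct SortKey {
        column: &'static str,
        descending: bool,
    }

    // Returns the longer ordering if one is a prefix of the other, else None.
    fn finer(lhs: &[SortKey], rhs: &[SortKey]) -> Option<Vec<SortKey>> {
        let (short, long) = if lhs.len() <= rhs.len() { (lhs, rhs) } else { (rhs, lhs) };
        long.starts_with(short).then(|| long.to_vec())
    }

    fn main() {
        let a_asc = SortKey { column: "a", descending: false };
        let b_asc = SortKey { column: "b", descending: false };
        let a_desc = SortKey { column: "a", descending: true };
        // [a ASC] and [a ASC, b ASC] are compatible; the finer one wins.
        assert_eq!(
            finer(&[a_asc.clone()], &[a_asc.clone(), b_asc.clone()]),
            Some(vec![a_asc.clone(), b_asc])
        );
        // [a ASC] vs [a DESC] conflict, mirroring the `not_impl_err!` branch.
        assert_eq!(finer(&[a_asc], &[a_desc]), None);
    }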
diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
index 663bbdd5a3..8e2bbbfe4f 100644
--- a/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
+++ b/datafusion/sqllogictest/src/engines/datafusion_engine/mod.rs
@@ -21,5 +21,4 @@ mod normalize;
mod runner;
pub use error::*;
-pub use normalize::*;
pub use runner::*;
diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt
index 9a7117b69b..3f609e2548 100644
--- a/datafusion/sqllogictest/test_files/distinct_on.slt
+++ b/datafusion/sqllogictest/test_files/distinct_on.slt
@@ -78,7 +78,7 @@ c 4
query I
SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3;
----
-5
+4
4
2
1
@@ -100,10 +100,9 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes
------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted
---------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]
-----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
+------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
# ON expressions are not a sub-set of the ORDER BY expressions
query error SELECT DISTINCT ON expressions must match initial ORDER BY expressions
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index f1b6a57287..bbf21e135f 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2019,17 +2019,16 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0])
---------------SortExec: expr=[col0@3 ASC NULLS LAST]
-----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
-------------------CoalesceBatchesExec: target_batch_size=8192
---------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
-----------------------CoalesceBatchesExec: target_batch_size=8192
-------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
-----------------------CoalesceBatchesExec: target_batch_size=8192
-------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
+------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
+--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
+----------------CoalesceBatchesExec: target_batch_size=8192
+------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
+--------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
+--------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2209,7 +2208,7 @@ ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c)
----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
query III
-SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
+SELECT a, b, LAST_VALUE(c ORDER BY a DESC, c ASC) as last_c
FROM annotated_data_infinite2
GROUP BY a, b
----
@@ -2509,7 +2508,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
----SortExec: expr=[amount@1 DESC]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2540,7 +2539,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2572,7 +2571,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2637,9 +2636,8 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
------TableScan: sales_global projection=[country, ts, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)]
-----SortExec: expr=[ts@1 ASC NULLS LAST]
-------MemoryExec: partitions=1, partition_sizes=[1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
+----MemoryExec: partitions=1, partition_sizes=[1]
query TRRR rowsort
SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
@@ -2672,8 +2670,7 @@ Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sal
physical_plan
ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
-----SortExec: expr=[ts@1 DESC]
-------MemoryExec: partitions=1, partition_sizes=[1]
+----MemoryExec: partitions=1, partition_sizes=[1]
query TRRR rowsort
SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
@@ -2709,12 +2706,11 @@ physical_plan
SortExec: expr=[sn@2 ASC NULLS LAST]
--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)]
-------SortExec: expr=[sn@5 ASC NULLS LAST]
---------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
---------------MemoryExec: partitions=1, partition_sizes=[1]
---------------MemoryExec: partitions=1, partition_sizes=[1]
+------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------MemoryExec: partitions=1, partition_sizes=[1]
query ITIPTR rowsort
SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
@@ -2759,8 +2755,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------------MemoryExec: partitions=1, partition_sizes=[1]
query TRR
SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
@@ -2791,13 +2786,12 @@ physical_plan
SortPreservingMergeExec: [country@0 ASC NULLS LAST]
--SortExec: expr=[country@0 ASC NULLS LAST]
----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2]
-------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
---------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----------------MemoryExec: partitions=1, partition_sizes=[1]
query TRR
SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
@@ -2831,16 +2825,15 @@ ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts
--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
----CoalescePartitionsExec
------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@0 ASC NULLS LAST]
-----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
+--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
query RR
SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
LAST_VALUE(amount ORDER BY ts ASC) AS fv2
FROM sales_global
----
-30 80
+30 100
# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
# contradictory requirements should work in multi partitions.
@@ -2855,12 +2848,11 @@ Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
----TableScan: sales_global projection=[ts, amount]
physical_plan
ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2]
---AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
----CoalescePartitionsExec
-------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@0 ASC NULLS LAST]
-----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
+------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------MemoryExec: partitions=1, partition_sizes=[1]
query RR
SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
@@ -2993,10 +2985,10 @@ physical_plan
SortPreservingMergeExec: [country@0 ASC NULLS LAST]
--SortExec: expr=[country@0 ASC NULLS LAST]
----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
-------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=4
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------------SortExec: expr=[amount@1 DESC]
----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3639,10 +3631,10 @@ Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_tab
----TableScan: multiple_ordered_table projection=[a, c, d]
physical_plan
ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c]
---AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)]
----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
---------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]
+--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)]
----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 9a349f6000..a7146a5a91 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3454,7 +3454,7 @@ SortPreservingMergeExec: [a@0 ASC]
------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)]
--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2
-------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)], ordering_mode=PartiallySorted([0])
+------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)]
--------------CoalesceBatchesExec: target_batch_size=2
----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
------------------CoalesceBatchesExec: target_batch_size=2
@@ -3462,7 +3462,7 @@ SortPreservingMergeExec: [a@0 ASC]
----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
------------------CoalesceBatchesExec: target_batch_size=2
---------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC NULLS LAST
+--------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true