This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new e1a5cdf6db Support multiple ordered `array_agg` aggregations (#16625) e1a5cdf6db is described below commit e1a5cdf6db791ea50b4028d6eb11ffafd940b29b Author: Piotr Findeisen <piotr.findei...@gmail.com> AuthorDate: Mon Jul 28 21:14:32 2025 +0200 Support multiple ordered `array_agg` aggregations (#16625) * Validate states shape in merge_batch Due to `..` in the pattern, the `OrderSensitiveArrayAggAccumulator::merge_batch` did not validate it's not receiving additional states columns it ignores. Update the code to check number of inputs. * Support multiple ordered array_agg Before the change, `array_agg` with ordering would depend on input being ordered. As a result, it was impossible to do two or more `array_agg(x ORDER BY ...)` with incompatible ordering. This change moves ordering responsibility into `OrderSensitiveArrayAggAccumulator`. When input is pre-ordered (beneficial ordering), no additional work is done. However, when it's not, `array_agg` accumulator will order the data on its own. * Generate sorts based on aggregations soft requirements The sorting consideration before aggregations did respect only ordered aggregation functions with `AggregateOrderSensitivity::HardRequirement`. This change includes sorting expectations from `AggregateOrderSensitivity::Beneficial` functions. When beneficial ordered function requirements are not satisfied, no error is raised, they are considered in the second pass only. * Fix reversing first_value, last_value Upon reversing, a schema and field mismatch would happen. * Revert "Fix reversing first_value, last_value" This reverts commit 9b7e94d9d2f3a2003c34b03a30dddaddfc84f015. 
* sort array_agg input the old way whenever possible * revert some now unnecessary change * Improve doc for SoftRequiement Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> * Add comment for include_soft_requirement Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> * Document include_soft_requirement param * fmt * doc fix --------- Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- datafusion/ffi/src/udaf/mod.rs | 4 + datafusion/functions-aggregate-common/src/order.rs | 15 ++- datafusion/functions-aggregate/src/array_agg.rs | 70 +++++++++-- datafusion/physical-plan/src/aggregates/mod.rs | 139 +++++++++++++-------- datafusion/sqllogictest/test_files/aggregate.slt | 50 ++++++++ datafusion/sqllogictest/test_files/group_by.slt | 6 +- 6 files changed, 222 insertions(+), 62 deletions(-) diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index 63d44110a6..17116e2461 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -589,6 +589,7 @@ impl AggregateUDFImpl for ForeignAggregateUDF { pub enum FFI_AggregateOrderSensitivity { Insensitive, HardRequirement, + SoftRequirement, Beneficial, } @@ -597,6 +598,7 @@ impl From<FFI_AggregateOrderSensitivity> for AggregateOrderSensitivity { match value { FFI_AggregateOrderSensitivity::Insensitive => Self::Insensitive, FFI_AggregateOrderSensitivity::HardRequirement => Self::HardRequirement, + FFI_AggregateOrderSensitivity::SoftRequirement => Self::SoftRequirement, FFI_AggregateOrderSensitivity::Beneficial => Self::Beneficial, } } @@ -607,6 +609,7 @@ impl From<AggregateOrderSensitivity> for FFI_AggregateOrderSensitivity { match value { AggregateOrderSensitivity::Insensitive => Self::Insensitive, AggregateOrderSensitivity::HardRequirement => Self::HardRequirement, + AggregateOrderSensitivity::SoftRequirement => Self::SoftRequirement, AggregateOrderSensitivity::Beneficial => Self::Beneficial, } } @@ -748,6 +751,7 @@ mod tests { fn 
test_round_trip_all_order_sensitivities() { test_round_trip_order_sensitivity(AggregateOrderSensitivity::Insensitive); test_round_trip_order_sensitivity(AggregateOrderSensitivity::HardRequirement); + test_round_trip_order_sensitivity(AggregateOrderSensitivity::SoftRequirement); test_round_trip_order_sensitivity(AggregateOrderSensitivity::Beneficial); } } diff --git a/datafusion/functions-aggregate-common/src/order.rs b/datafusion/functions-aggregate-common/src/order.rs index bfa6e39138..0908396d78 100644 --- a/datafusion/functions-aggregate-common/src/order.rs +++ b/datafusion/functions-aggregate-common/src/order.rs @@ -22,9 +22,20 @@ pub enum AggregateOrderSensitivity { /// Ordering at the input is not important for the result of the aggregator. Insensitive, /// Indicates that the aggregate expression has a hard requirement on ordering. - /// The aggregator can not produce a correct result unless its ordering + /// The aggregator cannot produce a correct result unless its ordering /// requirement is satisfied. HardRequirement, + /// Indicates that the aggregator is more efficient when the input is ordered + /// but can still produce its result correctly regardless of the input ordering. + /// This is similar to, but stronger than, [`Self::Beneficial`]. + /// + /// Similarly to [`Self::HardRequirement`], when possible DataFusion will insert + /// a `SortExec`, to reorder the input to match the SoftRequirement. However, + /// when such a `SortExec` cannot be inserted, (for example, due to conflicting + /// [`Self::HardRequirement`] with other ordered aggregates in the query), + /// the aggregate function will still execute, without the preferred order, unlike + /// with [`Self::HardRequirement`] + SoftRequirement, /// Indicates that ordering is beneficial for the aggregate expression in terms /// of evaluation efficiency. 
The aggregator can produce its result efficiently /// when its required ordering is satisfied; however, it can still produce the @@ -38,7 +49,7 @@ impl AggregateOrderSensitivity { } pub fn is_beneficial(&self) -> bool { - self.eq(&AggregateOrderSensitivity::Beneficial) + matches!(self, Self::SoftRequirement | Self::Beneficial) } pub fn hard_requires(&self) -> bool { diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 0aa346e91a..5f5738d153 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -19,7 +19,7 @@ use std::cmp::Ordering; use std::collections::{HashSet, VecDeque}; -use std::mem::{size_of, size_of_val}; +use std::mem::{size_of, size_of_val, take}; use std::sync::Arc; use arrow::array::{ @@ -31,7 +31,9 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields}; use datafusion_common::cast::as_list_array; use datafusion_common::scalar::copy_array_data; -use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder}; +use datafusion_common::utils::{ + compare_rows, get_row_at_idx, take_function_args, SingleRowListArrayBuilder, +}; use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; @@ -39,6 +41,7 @@ use datafusion_expr::{ Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility, }; use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays; +use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; use datafusion_functions_aggregate_common::utils::ordering_fields; use datafusion_macros::user_doc; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; @@ -78,12 +81,14 @@ This aggregation function can only mix DISTINCT and ORDER BY if the ordering exp /// ARRAY_AGG aggregate expression pub struct ArrayAgg { signature: 
Signature, + is_input_pre_ordered: bool, } impl Default for ArrayAgg { fn default() -> Self { Self { signature: Signature::any(1, Volatility::Immutable), + is_input_pre_ordered: false, } } } @@ -144,6 +149,20 @@ impl AggregateUDFImpl for ArrayAgg { Ok(fields) } + fn order_sensitivity(&self) -> AggregateOrderSensitivity { + AggregateOrderSensitivity::SoftRequirement + } + + fn with_beneficial_ordering( + self: Arc<Self>, + beneficial_ordering: bool, + ) -> Result<Option<Arc<dyn AggregateUDFImpl>>> { + Ok(Some(Arc::new(Self { + signature: self.signature.clone(), + is_input_pre_ordered: beneficial_ordering, + }))) + } + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; let ignore_nulls = @@ -196,6 +215,7 @@ impl AggregateUDFImpl for ArrayAgg { &data_type, &ordering_dtypes, ordering, + self.is_input_pre_ordered, acc_args.is_reversed, ignore_nulls, ) @@ -518,6 +538,8 @@ pub(crate) struct OrderSensitiveArrayAggAccumulator { datatypes: Vec<DataType>, /// Stores the ordering requirement of the `Accumulator`. ordering_req: LexOrdering, + /// Whether the input is known to be pre-ordered + is_input_pre_ordered: bool, /// Whether the aggregation is running in reverse. reverse: bool, /// Whether the aggregation should ignore null values. 
@@ -531,6 +553,7 @@ impl OrderSensitiveArrayAggAccumulator { datatype: &DataType, ordering_dtypes: &[DataType], ordering_req: LexOrdering, + is_input_pre_ordered: bool, reverse: bool, ignore_nulls: bool, ) -> Result<Self> { @@ -541,11 +564,34 @@ impl OrderSensitiveArrayAggAccumulator { ordering_values: vec![], datatypes, ordering_req, + is_input_pre_ordered, reverse, ignore_nulls, }) } + fn sort(&mut self) { + let sort_options = self + .ordering_req + .iter() + .map(|sort_expr| sort_expr.options) + .collect::<Vec<_>>(); + let mut values = take(&mut self.values) + .into_iter() + .zip(take(&mut self.ordering_values)) + .collect::<Vec<_>>(); + let mut delayed_cmp_err = Ok(()); + values.sort_by(|(_, left_ordering), (_, right_ordering)| { + compare_rows(left_ordering, right_ordering, &sort_options).unwrap_or_else( + |err| { + delayed_cmp_err = Err(err); + Ordering::Equal + }, + ) + }); + (self.values, self.ordering_values) = values.into_iter().unzip(); + } + fn evaluate_orderings(&self) -> Result<ScalarValue> { let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]); @@ -616,9 +662,8 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { // inside `ARRAY_AGG` list, we will receive an `Array` that stores values // received from its ordering requirement expression. (This information // is necessary for during merging). - let [array_agg_values, agg_orderings, ..] = &states else { - return exec_err!("State should have two elements"); - }; + let [array_agg_values, agg_orderings] = + take_function_args("OrderSensitiveArrayAggAccumulator::merge_batch", states)?; let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() else { return exec_err!("Expects to receive a list array"); }; @@ -629,8 +674,11 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { let mut partition_ordering_values = vec![]; // Existing values should be merged also. 
- partition_values.push(self.values.clone().into()); - partition_ordering_values.push(self.ordering_values.clone().into()); + if !self.is_input_pre_ordered { + self.sort(); + } + partition_values.push(take(&mut self.values).into()); + partition_ordering_values.push(take(&mut self.ordering_values).into()); // Convert array to Scalars to sort them easily. Convert back to array at evaluation. let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; @@ -679,6 +727,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { } fn state(&mut self) -> Result<Vec<ScalarValue>> { + if !self.is_input_pre_ordered { + self.sort(); + } + let mut result = vec![self.evaluate()?]; result.push(self.evaluate_orderings()?); @@ -686,6 +738,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { } fn evaluate(&mut self) -> Result<ScalarValue> { + if !self.is_input_pre_ordered { + self.sort(); + } + if self.values.is_empty() { return Ok(ScalarValue::new_null_list( self.datatypes[0].clone(), diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 784b7db893..878bccc1d1 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -52,6 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortRequirement, }; +use datafusion_expr::utils::AggregateOrderSensitivity; use itertools::Itertools; pub mod group_values; @@ -1069,6 +1070,11 @@ fn create_schema( /// physical GROUP BY expression. /// - `agg_mode`: A reference to an `AggregateMode` instance representing the /// mode of aggregation. +/// - `include_soft_requirement`: When `false`, only hard requirements are +/// considered, as indicated by [`AggregateFunctionExpr::order_sensitivity`] +/// returning [`AggregateOrderSensitivity::HardRequirement`]. 
+/// Otherwise, also soft requirements ([`AggregateOrderSensitivity::SoftRequirement`]) +/// are considered. /// /// # Returns /// @@ -1078,13 +1084,26 @@ fn get_aggregate_expr_req( aggr_expr: &AggregateFunctionExpr, group_by: &PhysicalGroupBy, agg_mode: &AggregateMode, + include_soft_requirement: bool, ) -> Option<LexOrdering> { - // If the aggregation function is ordering requirement is not absolutely - // necessary, or the aggregation is performing a "second stage" calculation, - // then ignore the ordering requirement. - if !aggr_expr.order_sensitivity().hard_requires() || !agg_mode.is_first_stage() { + // If the aggregation is performing a "second stage" calculation, + // then ignore the ordering requirement. Ordering requirement applies + // only to the aggregation input data. + if !agg_mode.is_first_stage() { return None; } + + match aggr_expr.order_sensitivity() { + AggregateOrderSensitivity::Insensitive => return None, + AggregateOrderSensitivity::HardRequirement => {} + AggregateOrderSensitivity::SoftRequirement => { + if !include_soft_requirement { + return None; + } + } + AggregateOrderSensitivity::Beneficial => return None, + } + let mut sort_exprs = aggr_expr.order_bys().to_vec(); // In non-first stage modes, we accumulate data (using `merge_batch`) from // different partitions (i.e. merge partial results). During this merge, we @@ -1149,60 +1168,76 @@ pub fn get_finer_aggregate_exprs_requirement( agg_mode: &AggregateMode, ) -> Result<Vec<PhysicalSortRequirement>> { let mut requirement = None; - for aggr_expr in aggr_exprs.iter_mut() { - let Some(aggr_req) = get_aggregate_expr_req(aggr_expr, group_by, agg_mode) - .and_then(|o| eq_properties.normalize_sort_exprs(o)) - else { - // There is no aggregate ordering requirement, or it is trivially - // satisfied -- we can skip this expression. - continue; - }; - // If the common requirement is finer than the current expression's, - // we can skip this expression. 
If the latter is finer than the former, - // adopt it if it is satisfied by the equivalence properties. Otherwise, - // defer the analysis to the reverse expression. - let forward_finer = determine_finer(&requirement, &aggr_req); - if let Some(finer) = forward_finer { - if !finer { - continue; - } else if eq_properties.ordering_satisfy(aggr_req.clone())? { - requirement = Some(aggr_req); - continue; - } - } - if let Some(reverse_aggr_expr) = aggr_expr.reverse_expr() { - let Some(rev_aggr_req) = - get_aggregate_expr_req(&reverse_aggr_expr, group_by, agg_mode) - .and_then(|o| eq_properties.normalize_sort_exprs(o)) - else { - // The reverse requirement is trivially satisfied -- just reverse - // the expression and continue with the next one: - *aggr_expr = Arc::new(reverse_aggr_expr); + + // First pass: consider only hard requirements and fail on conflicts. + // Second pass: also consider soft requirements, adopting them only + // when they are compatible with the requirement found so far. + for include_soft_requirement in [false, true] { + for aggr_expr in aggr_exprs.iter_mut() { + let Some(aggr_req) = get_aggregate_expr_req( + aggr_expr, + group_by, + agg_mode, + include_soft_requirement, + ) + .and_then(|o| eq_properties.normalize_sort_exprs(o)) else { + // There is no aggregate ordering requirement, or it is trivially + // satisfied -- we can skip this expression. continue; }; - // If the common requirement is finer than the reverse expression's, - // just reverse it and continue the loop with the next aggregate - // expression. If the latter is finer than the former, adopt it if - // it is satisfied by the equivalence properties. Otherwise, adopt - // the forward expression. - if let Some(finer) = determine_finer(&requirement, &rev_aggr_req) { + // If the common requirement is finer than the current expression's, + // we can skip this expression. If the latter is finer than the former, + // adopt it if it is satisfied by the equivalence properties. Otherwise, + // defer the analysis to the reverse expression.
+ let forward_finer = determine_finer(&requirement, &aggr_req); + if let Some(finer) = forward_finer { if !finer { + continue; + } else if eq_properties.ordering_satisfy(aggr_req.clone())? { + requirement = Some(aggr_req); + continue; + } + } + if let Some(reverse_aggr_expr) = aggr_expr.reverse_expr() { + let Some(rev_aggr_req) = get_aggregate_expr_req( + &reverse_aggr_expr, + group_by, + agg_mode, + include_soft_requirement, + ) + .and_then(|o| eq_properties.normalize_sort_exprs(o)) else { + // The reverse requirement is trivially satisfied -- just reverse + // the expression and continue with the next one: *aggr_expr = Arc::new(reverse_aggr_expr); - } else if eq_properties.ordering_satisfy(rev_aggr_req.clone())? { - *aggr_expr = Arc::new(reverse_aggr_expr); - requirement = Some(rev_aggr_req); - } else { + continue; + }; + // If the common requirement is finer than the reverse expression's, + // just reverse it and continue the loop with the next aggregate + // expression. If the latter is finer than the former, adopt it if + // it is satisfied by the equivalence properties. Otherwise, adopt + // the forward expression. + if let Some(finer) = determine_finer(&requirement, &rev_aggr_req) { + if !finer { + *aggr_expr = Arc::new(reverse_aggr_expr); + } else if eq_properties.ordering_satisfy(rev_aggr_req.clone())? { + *aggr_expr = Arc::new(reverse_aggr_expr); + requirement = Some(rev_aggr_req); + } else { + requirement = Some(aggr_req); + } + } else if forward_finer.is_some() { requirement = Some(aggr_req); + } else { + // Neither the existing requirement nor the current aggregate + // requirement satisfy the other (forward or reverse), this + // means they are conflicting. This is a problem only for hard + // requirements. Unsatisfied soft requirements can be ignored. 
+ if !include_soft_requirement { + return not_impl_err!( + "Conflicting ordering requirements in aggregate functions is not supported" + ); + } } - } else if forward_finer.is_some() { - requirement = Some(aggr_req); - } else { - // Neither the existing requirement nor the current aggregate - // requirement satisfy the other (forward or reverse), this - // means they are conflicting. - return not_impl_err!( - "Conflicting ordering requirements in aggregate functions is not supported" - ); } } } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index bdf327c982..37a074bda4 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -206,6 +206,56 @@ query error Execution error: In an aggregate with DISTINCT, ORDER BY expressions SELECT array_agg(DISTINCT c13 ORDER BY c13, c12) FROM aggregate_test_100 +query ?? rowsort +with tbl as (SELECT * FROM (VALUES ('xxx', 'yyy'), ('xxx', 'yyy'), ('xxx2', 'yyy2')) AS t(x, y)) +select + array_agg(x order by x) as x_agg, + array_agg(y order by y) as y_agg +from tbl +group by all +---- +[xxx, xxx, xxx2] [yyy, yyy, yyy2] + +query ?? +SELECT + (SELECT array_agg(c12 ORDER BY c12) FROM aggregate_test_100), + (SELECT array_agg(c13 ORDER BY c13) FROM aggregate_test_100) +---- +[0.01479305307777301, 0.02182578039211991, 0.03968347085780355, 0.04429073092078406, 0.047343434291126085, 0.04893135681998029, 0.0494924465469434, 0.05573662213439634, 0.05636955101974106, 0.061029375346466685, 0.07260475960924484, 0.09465635123783445, 0.12357539988406441, 0.152498292971736, 0.16301110515739792, 0.1640882545084913, 0.1754261586710173, 0.17592486905979987, 0.17909035118828576, 0.18628859265874176, 0.19113293583306745, 0.2145232647388039, 0.21535402343780985, 0.2489979431 [...] + +query ?? 
+SELECT + array_agg(c12 ORDER BY c12), + array_agg(c13 ORDER BY c13) +FROM aggregate_test_100 +---- +[0.01479305307777301, 0.02182578039211991, 0.03968347085780355, 0.04429073092078406, 0.047343434291126085, 0.04893135681998029, 0.0494924465469434, 0.05573662213439634, 0.05636955101974106, 0.061029375346466685, 0.07260475960924484, 0.09465635123783445, 0.12357539988406441, 0.152498292971736, 0.16301110515739792, 0.1640882545084913, 0.1754261586710173, 0.17592486905979987, 0.17909035118828576, 0.18628859265874176, 0.19113293583306745, 0.2145232647388039, 0.21535402343780985, 0.2489979431 [...] + +query ?? rowsort +with tbl as (SELECT * FROM (VALUES ('xxx', 'yyy'), ('xxx', 'yyy'), ('xxx2', 'yyy2')) AS t(x, y)) +select + array_agg(distinct x order by x) as x_agg, + array_agg(distinct y order by y) as y_agg +from tbl +group by all +---- +[xxx, xxx2] [yyy, yyy2] + +query ?? +SELECT + (SELECT array_agg(DISTINCT c12 ORDER BY c12) FROM aggregate_test_100), + (SELECT array_agg(DISTINCT c13 ORDER BY c13) FROM aggregate_test_100) +---- +[0.01479305307777301, 0.02182578039211991, 0.03968347085780355, 0.04429073092078406, 0.047343434291126085, 0.04893135681998029, 0.0494924465469434, 0.05573662213439634, 0.05636955101974106, 0.061029375346466685, 0.07260475960924484, 0.09465635123783445, 0.12357539988406441, 0.152498292971736, 0.16301110515739792, 0.1640882545084913, 0.1754261586710173, 0.17592486905979987, 0.17909035118828576, 0.18628859265874176, 0.19113293583306745, 0.2145232647388039, 0.21535402343780985, 0.2489979431 [...] + +query ?? 
+SELECT + array_agg(DISTINCT c12 ORDER BY c12), + array_agg(DISTINCT c13 ORDER BY c13) +FROM aggregate_test_100 +---- +[0.01479305307777301, 0.02182578039211991, 0.03968347085780355, 0.04429073092078406, 0.047343434291126085, 0.04893135681998029, 0.0494924465469434, 0.05573662213439634, 0.05636955101974106, 0.061029375346466685, 0.07260475960924484, 0.09465635123783445, 0.12357539988406441, 0.152498292971736, 0.16301110515739792, 0.1640882545084913, 0.1754261586710173, 0.17592486905979987, 0.17909035118828576, 0.18628859265874176, 0.19113293583306745, 0.2145232647388039, 0.21535402343780985, 0.2489979431 [...] + statement ok CREATE EXTERNAL TABLE agg_order ( c1 INT NOT NULL, diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 6dc4bd8404..1b5ea3df2c 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2506,12 +2506,16 @@ TUR [100.0, 75.0] 175 # test_ordering_sensitive_aggregation3 # When different aggregators have conflicting requirements, we cannot satisfy all of them in current implementation. # test below should raise Plan Error. -statement error DataFusion error: This feature is not implemented: Conflicting ordering requirements in aggregate functions is not supported +query ??? 
rowsort SELECT ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts, ARRAY_AGG(s.amount ORDER BY s.amount ASC) AS amounts2, ARRAY_AGG(s.amount ORDER BY s.sn ASC) AS amounts3 FROM sales_global AS s GROUP BY s.country +---- +[100.0, 75.0] [75.0, 100.0] [75.0, 100.0] +[200.0, 50.0] [50.0, 200.0] [50.0, 200.0] +[80.0, 30.0] [30.0, 80.0] [30.0, 80.0] # test_ordering_sensitive_aggregation4 # If aggregators can work with bounded memory (Sorted or PartiallySorted mode), we should append requirement to --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org