This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ff65dee3ff add multi ordering test case (#8439)
ff65dee3ff is described below
commit ff65dee3ff4318da13f5f89bafddf446ffbf8803
Author: Jay Zhan <[email protected]>
AuthorDate: Mon Dec 11 21:38:23 2023 +0800
add multi ordering test case (#8439)
Signed-off-by: jayzhan211 <[email protected]>
---
.../core/tests/data/aggregate_agg_multi_order.csv | 11 +++++
.../src/aggregate/array_agg_ordered.rs | 49 +++++++++-------------
datafusion/sqllogictest/test_files/aggregate.slt | 30 +++++++++++++
3 files changed, 60 insertions(+), 30 deletions(-)
diff --git a/datafusion/core/tests/data/aggregate_agg_multi_order.csv b/datafusion/core/tests/data/aggregate_agg_multi_order.csv
new file mode 100644
index 0000000000..e9a65ceee4
--- /dev/null
+++ b/datafusion/core/tests/data/aggregate_agg_multi_order.csv
@@ -0,0 +1,11 @@
+c1,c2,c3
+1,20,0
+2,20,1
+3,10,2
+4,10,3
+5,30,4
+6,30,5
+7,30,6
+8,30,7
+9,30,8
+10,10,9
\ No newline at end of file
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index 9ca83a781a..eb5ae8b0b0 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -30,9 +30,9 @@ use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
+use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_schema::{Fields, SortOptions};
-use datafusion_common::cast::as_list_array;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
@@ -214,7 +214,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
// values received from its ordering requirement expression. (This information is necessary for during merging).
let agg_orderings = &states[1];
- if as_list_array(agg_orderings).is_ok() {
+ if let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() {
// Stores ARRAY_AGG results coming from each partition
let mut partition_values = vec![];
// Stores ordering requirement expression results coming from each partition
@@ -232,10 +232,21 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
}
let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
- // Ordering requirement expression values for each entry in the ARRAY_AGG list
- let other_ordering_values = self.convert_array_agg_to_orderings(orderings)?;
- for v in other_ordering_values.into_iter() {
- partition_ordering_values.push(v);
+
+ for partition_ordering_rows in orderings.into_iter() {
+ // Extract value from struct to ordering_rows for each group/partition
+ let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
+ if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
+ Ok(ordering_columns_per_row)
+ } else {
+ exec_err!(
+ "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
+ ordering_row.data_type()
+ )
+ }
+ }).collect::<Result<Vec<_>>>()?;
+
+ partition_ordering_values.push(ordering_value);
}
let sort_options = self
@@ -293,33 +304,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
}
impl OrderSensitiveArrayAggAccumulator {
- /// Inner Vec\<ScalarValue> in the ordering_values can be thought as ordering information for the each ScalarValue in the values array.
- /// See [`merge_ordered_arrays`] for more information.
- fn convert_array_agg_to_orderings(
- &self,
- array_agg: Vec<Vec<ScalarValue>>,
- ) -> Result<Vec<Vec<Vec<ScalarValue>>>> {
- let mut orderings = vec![];
- // in_data is Vec<ScalarValue> where ScalarValue does not include ScalarValue::List
- for in_data in array_agg.into_iter() {
- let ordering = in_data.into_iter().map(|struct_vals| {
- if let ScalarValue::Struct(Some(orderings), _) = struct_vals {
- Ok(orderings)
- } else {
- exec_err!(
- "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
- struct_vals.data_type()
- )
- }
- }).collect::<Result<Vec<_>>>()?;
- orderings.push(ordering);
- }
- Ok(orderings)
- }
-
fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields.clone());
+
let orderings: Vec<ScalarValue> = self
.ordering_values
.iter()
@@ -329,6 +317,7 @@ impl OrderSensitiveArrayAggAccumulator {
.collect();
let struct_type = DataType::Struct(Fields::from(fields));
+ // Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
let arr = ScalarValue::new_list(&orderings, &struct_type);
Ok(ScalarValue::List(arr))
}
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 7cfc9c707d..bcda3464f4 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -106,6 +106,36 @@ FROM
----
[0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 2T3wSlHdEmASmO0xcXHnndkKEt6bz8]
+statement ok
+CREATE EXTERNAL TABLE agg_order (
+c1 INT NOT NULL,
+c2 INT NOT NULL,
+c3 INT NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../core/tests/data/aggregate_agg_multi_order.csv';
+
+# test array_agg with order by multiple columns
+query ?
+select array_agg(c1 order by c2 desc, c3) from agg_order;
+----
+[5, 6, 7, 8, 9, 1, 2, 3, 4, 10]
+
+query TT
+explain select array_agg(c1 order by c2 desc, c3) from agg_order;
+----
+logical_plan
+Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
+--TableScan: agg_order projection=[c1, c2, c3]
+physical_plan
+AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
+--CoalescePartitionsExec
+----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
+------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true
+
statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1
SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100