This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ff65dee3ff add multi ordering test case (#8439)
ff65dee3ff is described below

commit ff65dee3ff4318da13f5f89bafddf446ffbf8803
Author: Jay Zhan <[email protected]>
AuthorDate: Mon Dec 11 21:38:23 2023 +0800

    add multi ordering test case (#8439)
    
    Signed-off-by: jayzhan211 <[email protected]>
---
 .../core/tests/data/aggregate_agg_multi_order.csv  | 11 +++++
 .../src/aggregate/array_agg_ordered.rs             | 49 +++++++++-------------
 datafusion/sqllogictest/test_files/aggregate.slt   | 30 +++++++++++++
 3 files changed, 60 insertions(+), 30 deletions(-)

diff --git a/datafusion/core/tests/data/aggregate_agg_multi_order.csv 
b/datafusion/core/tests/data/aggregate_agg_multi_order.csv
new file mode 100644
index 0000000000..e9a65ceee4
--- /dev/null
+++ b/datafusion/core/tests/data/aggregate_agg_multi_order.csv
@@ -0,0 +1,11 @@
+c1,c2,c3
+1,20,0
+2,20,1
+3,10,2
+4,10,3
+5,30,4
+6,30,5
+7,30,6
+8,30,7
+9,30,8
+10,10,9
\ No newline at end of file
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs 
b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index 9ca83a781a..eb5ae8b0b0 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -30,9 +30,9 @@ use crate::{AggregateExpr, LexOrdering, PhysicalExpr, 
PhysicalSortExpr};
 
 use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType, Field};
+use arrow_array::cast::AsArray;
 use arrow_array::Array;
 use arrow_schema::{Fields, SortOptions};
-use datafusion_common::cast::as_list_array;
 use datafusion_common::utils::{compare_rows, get_row_at_idx};
 use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
@@ -214,7 +214,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
         // values received from its ordering requirement expression. (This 
information is necessary during merging).
         let agg_orderings = &states[1];
 
-        if as_list_array(agg_orderings).is_ok() {
+        if let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() {
             // Stores ARRAY_AGG results coming from each partition
             let mut partition_values = vec![];
             // Stores ordering requirement expression results coming from each 
partition
@@ -232,10 +232,21 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
             }
 
             let orderings = 
ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
-            // Ordering requirement expression values for each entry in the 
ARRAY_AGG list
-            let other_ordering_values = 
self.convert_array_agg_to_orderings(orderings)?;
-            for v in other_ordering_values.into_iter() {
-                partition_ordering_values.push(v);
+
+            for partition_ordering_rows in orderings.into_iter() {
+                // Extract value from struct to ordering_rows for each 
group/partition
+                let ordering_value = 
partition_ordering_rows.into_iter().map(|ordering_row| {
+                        if let 
ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
+                            Ok(ordering_columns_per_row)
+                        } else {
+                            exec_err!(
+                                "Expects to receive 
ScalarValue::Struct(Some(..), _) but got:{:?}",
+                                ordering_row.data_type()
+                            )
+                        }
+                    }).collect::<Result<Vec<_>>>()?;
+
+                partition_ordering_values.push(ordering_value);
             }
 
             let sort_options = self
@@ -293,33 +304,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
 }
 
 impl OrderSensitiveArrayAggAccumulator {
-    /// Inner Vec\<ScalarValue> in the ordering_values can be thought as 
ordering information for the each ScalarValue in the values array.
-    /// See [`merge_ordered_arrays`] for more information.
-    fn convert_array_agg_to_orderings(
-        &self,
-        array_agg: Vec<Vec<ScalarValue>>,
-    ) -> Result<Vec<Vec<Vec<ScalarValue>>>> {
-        let mut orderings = vec![];
-        // in_data is Vec<ScalarValue> where ScalarValue does not include 
ScalarValue::List
-        for in_data in array_agg.into_iter() {
-            let ordering = in_data.into_iter().map(|struct_vals| {
-                    if let ScalarValue::Struct(Some(orderings), _) = 
struct_vals {
-                        Ok(orderings)
-                    } else {
-                        exec_err!(
-                            "Expects to receive ScalarValue::Struct(Some(..), 
_) but got:{:?}",
-                            struct_vals.data_type()
-                        )
-                    }
-                }).collect::<Result<Vec<_>>>()?;
-            orderings.push(ordering);
-        }
-        Ok(orderings)
-    }
-
     fn evaluate_orderings(&self) -> Result<ScalarValue> {
         let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
         let struct_field = Fields::from(fields.clone());
+
         let orderings: Vec<ScalarValue> = self
             .ordering_values
             .iter()
@@ -329,6 +317,7 @@ impl OrderSensitiveArrayAggAccumulator {
             .collect();
         let struct_type = DataType::Struct(Fields::from(fields));
 
+        // Wrap in List, so we have the same data structure 
ListArray(StructArray..) for group by cases
         let arr = ScalarValue::new_list(&orderings, &struct_type);
         Ok(ScalarValue::List(arr))
     }
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index 7cfc9c707d..bcda3464f4 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -106,6 +106,36 @@ FROM
 ----
 [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 
0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 
2T3wSlHdEmASmO0xcXHnndkKEt6bz8]
 
+statement ok
+CREATE EXTERNAL TABLE agg_order (
+c1 INT NOT NULL,
+c2 INT NOT NULL,
+c3 INT NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../core/tests/data/aggregate_agg_multi_order.csv';
+
+# test array_agg with order by multiple columns
+query ?
+select array_agg(c1 order by c2 desc, c3) from agg_order;
+----
+[5, 6, 7, 8, 9, 1, 2, 3, 4, 10]
+
+query TT
+explain select array_agg(c1 order by c2 desc, c3) from agg_order;
+----
+logical_plan
+Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 
DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
+--TableScan: agg_order projection=[c1, c2, c3]
+physical_plan
+AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
+--CoalescePartitionsExec
+----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
+------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, 
projection=[c1, c2, c3], has_header=true
+
 statement error This feature is not implemented: LIMIT not supported in 
ARRAY_AGG: 1
 SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100
 

Reply via email to