alamb commented on code in PR #14413:
URL: https://github.com/apache/datafusion/pull/14413#discussion_r2009214238
##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -234,6 +253,16 @@ select column1, nth_value(column3, 2 order by column2,
column4 desc) from array_
b [4, 5, 6]
w [9, 5, 2]
+query ?
Review Comment:
Can you please add a test for
```
-- default ordering (and show that desc is respected)
select array_agg(DISTINCT column2 order by column2) from
array_agg_order_list_table;
```
Also a query with a `GROUP BY` as it goes through a different code path
Also the negative case (no order by but with distinct) -- to show the error
message is wired up correctly
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -131,7 +133,32 @@ impl AggregateUDFImpl for ArrayAgg {
let data_type = acc_args.exprs[0].data_type(acc_args.schema)?;
if acc_args.is_distinct {
- return
Ok(Box::new(DistinctArrayAggAccumulator::try_new(&data_type)?));
+ // Limitation similar to Postgres. The aggregation function can
only mix
Review Comment:
Maybe we could add this description (about ORDER / DISTINCT in general) to
https://datafusion.apache.org/user-guide/sql/aggregate_functions.html#aggregate-functions
At the very least we shoudl add it to the documentation (in the `user_doc`
macro on this UDF definition) so it appears in
https://datafusion.apache.org/user-guide/sql/aggregate_functions.html#array-agg
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator {
#[cfg(test)]
mod tests {
use super::*;
-
- use std::collections::VecDeque;
+ use arrow::datatypes::{FieldRef, Schema};
+ use datafusion_common::cast::as_generic_string_array;
+ use datafusion_common::internal_err;
+ use datafusion_physical_expr::expressions::Column;
+ use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
use std::sync::Arc;
- use arrow::array::Int64Array;
- use arrow::compute::SortOptions;
+ #[test]
+ fn no_duplicates_no_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) =
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+ acc1.update_batch(&[data(["a", "b", "c"])])?;
+ acc2.update_batch(&[data(["d", "e", "f"])])?;
+ acc1 = merge(acc1, acc2)?;
- use datafusion_common::utils::get_row_at_idx;
- use datafusion_common::{Result, ScalarValue};
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+ Ok(())
+ }
#[test]
- fn test_merge_asc() -> Result<()> {
- let lhs_arrays: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
- Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
- ];
- let n_row = lhs_arrays[0].len();
- let lhs_orderings = (0..n_row)
- .map(|idx| get_row_at_idx(&lhs_arrays, idx))
- .collect::<Result<VecDeque<_>>>()?;
-
- let rhs_arrays: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
- Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
- ];
- let n_row = rhs_arrays[0].len();
- let rhs_orderings = (0..n_row)
- .map(|idx| get_row_at_idx(&rhs_arrays, idx))
- .collect::<Result<VecDeque<_>>>()?;
- let sort_options = vec![
- SortOptions {
- descending: false,
- nulls_first: false,
- },
- SortOptions {
- descending: false,
- nulls_first: false,
- },
- ];
-
- let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as
ArrayRef;
- let lhs_vals = (0..lhs_vals_arr.len())
- .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
- .collect::<Result<VecDeque<_>>>()?;
-
- let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as
ArrayRef;
- let rhs_vals = (0..rhs_vals_arr.len())
- .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
- .collect::<Result<VecDeque<_>>>()?;
- let expected =
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as
ArrayRef;
- let expected_ts = vec![
- Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as
ArrayRef,
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as
ArrayRef,
- ];
-
- let (merged_vals, merged_ts) = merge_ordered_arrays(
- &mut [lhs_vals, rhs_vals],
- &mut [lhs_orderings, rhs_orderings],
- &sort_options,
- )?;
- let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
- let merged_ts = (0..merged_ts[0].len())
- .map(|col_idx| {
- ScalarValue::iter_to_array(
- (0..merged_ts.len())
- .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
- )
- })
- .collect::<Result<Vec<_>>>()?;
+ fn no_duplicates_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "b", "c"])])?;
+ acc2.update_batch(&[data(["d", "e", "f"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+ result.sort();
- assert_eq!(&merged_vals, &expected);
- assert_eq!(&merged_ts, &expected_ts);
+ assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
Ok(())
}
#[test]
- fn test_merge_desc() -> Result<()> {
- let lhs_arrays: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
- Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
- ];
- let n_row = lhs_arrays[0].len();
- let lhs_orderings = (0..n_row)
- .map(|idx| get_row_at_idx(&lhs_arrays, idx))
- .collect::<Result<VecDeque<_>>>()?;
-
- let rhs_arrays: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
- Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
- ];
- let n_row = rhs_arrays[0].len();
- let rhs_orderings = (0..n_row)
- .map(|idx| get_row_at_idx(&rhs_arrays, idx))
- .collect::<Result<VecDeque<_>>>()?;
- let sort_options = vec![
- SortOptions {
- descending: true,
- nulls_first: false,
- },
- SortOptions {
- descending: true,
- nulls_first: false,
- },
- ];
-
- // Values (which will be merged) doesn't have to be ordered.
- let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as
ArrayRef;
- let lhs_vals = (0..lhs_vals_arr.len())
- .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
- .collect::<Result<VecDeque<_>>>()?;
-
- let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as
ArrayRef;
- let rhs_vals = (0..rhs_vals_arr.len())
- .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
- .collect::<Result<VecDeque<_>>>()?;
- let expected =
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as
ArrayRef;
- let expected_ts = vec![
- Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as
ArrayRef,
- Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as
ArrayRef,
- ];
- let (merged_vals, merged_ts) = merge_ordered_arrays(
- &mut [lhs_vals, rhs_vals],
- &mut [lhs_orderings, rhs_orderings],
- &sort_options,
- )?;
- let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
- let merged_ts = (0..merged_ts[0].len())
- .map(|col_idx| {
- ScalarValue::iter_to_array(
- (0..merged_ts.len())
- .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
- )
- })
- .collect::<Result<Vec<_>>>()?;
+ fn duplicates_no_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) =
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+ acc1.update_batch(&[data(["a", "b", "c"])])?;
+ acc2.update_batch(&[data(["a", "b", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn duplicates_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "b", "c"])])?;
+ acc2.update_batch(&[data(["a", "b", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+ result.sort();
+
+ assert_eq!(result, vec!["a", "b", "c"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn duplicates_on_second_batch_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "c"])])?;
+ acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+ result.sort();
+
+ assert_eq!(result, vec!["a", "b", "c", "d"]);
- assert_eq!(&merged_vals, &expected);
- assert_eq!(&merged_ts, &expected_ts);
Ok(())
}
+
+ #[test]
+ fn no_duplicates_distinct_sort_asc() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(false, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data(["e", "b", "d"])])?;
+ acc2.update_batch(&[data(["f", "a", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_desc() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(true, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data(["e", "b", "d"])])?;
+ acc2.update_batch(&[data(["f", "a", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn duplicates_distinct_sort_asc() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(false, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "c", "b"])])?;
+ acc2.update_batch(&[data(["b", "c", "a"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "c"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn duplicates_distinct_sort_desc() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(true, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "c", "b"])])?;
+ acc2.update_batch(&[data(["b", "c", "a"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["c", "b", "a"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(false, true))
+ .build_two()?;
+
+ acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+ acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(false, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+ acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(true, true))
+ .build_two()?;
+
+ acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+ acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(true, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+ acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+ acc2.update_batch(&[data([Some("a"), None, None, None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+ result.sort();
+ assert_eq!(result, vec!["NULL", "a"]);
+ Ok(())
+ }
+
+ #[test]
+ fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+ acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None,
None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+ assert_eq!(result, vec!["NULL"]);
+ Ok(())
+ }
+
+ struct ArrayAggAccumulatorBuilder {
Review Comment:
This looks very similar to
https://docs.rs/datafusion/latest/datafusion/physical_expr/aggregate/struct.AggregateExprBuilder.html#method.distinct
Though it seems like that structure has no good example, perhaps you could
make a PR to add an doc example of how to use it
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator {
#[cfg(test)]
mod tests {
use super::*;
-
- use std::collections::VecDeque;
+ use arrow::datatypes::{FieldRef, Schema};
+ use datafusion_common::cast::as_generic_string_array;
+ use datafusion_common::internal_err;
+ use datafusion_physical_expr::expressions::Column;
+ use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
use std::sync::Arc;
- use arrow::array::Int64Array;
- use arrow::compute::SortOptions;
+ #[test]
+ fn no_duplicates_no_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) =
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+ acc1.update_batch(&[data(["a", "b", "c"])])?;
+ acc2.update_batch(&[data(["d", "e", "f"])])?;
+ acc1 = merge(acc1, acc2)?;
- use datafusion_common::utils::get_row_at_idx;
- use datafusion_common::{Result, ScalarValue};
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+ Ok(())
+ }
#[test]
- fn test_merge_asc() -> Result<()> {
- let lhs_arrays: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
- Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
- ];
- let n_row = lhs_arrays[0].len();
- let lhs_orderings = (0..n_row)
- .map(|idx| get_row_at_idx(&lhs_arrays, idx))
- .collect::<Result<VecDeque<_>>>()?;
-
- let rhs_arrays: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
- Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
- ];
- let n_row = rhs_arrays[0].len();
- let rhs_orderings = (0..n_row)
- .map(|idx| get_row_at_idx(&rhs_arrays, idx))
- .collect::<Result<VecDeque<_>>>()?;
- let sort_options = vec![
- SortOptions {
- descending: false,
- nulls_first: false,
- },
- SortOptions {
- descending: false,
- nulls_first: false,
- },
- ];
-
- let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as
ArrayRef;
- let lhs_vals = (0..lhs_vals_arr.len())
- .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
- .collect::<Result<VecDeque<_>>>()?;
-
- let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as
ArrayRef;
- let rhs_vals = (0..rhs_vals_arr.len())
- .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
- .collect::<Result<VecDeque<_>>>()?;
- let expected =
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as
ArrayRef;
- let expected_ts = vec![
- Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as
ArrayRef,
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as
ArrayRef,
- ];
-
- let (merged_vals, merged_ts) = merge_ordered_arrays(
- &mut [lhs_vals, rhs_vals],
- &mut [lhs_orderings, rhs_orderings],
- &sort_options,
- )?;
- let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
- let merged_ts = (0..merged_ts[0].len())
- .map(|col_idx| {
- ScalarValue::iter_to_array(
- (0..merged_ts.len())
- .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
- )
- })
- .collect::<Result<Vec<_>>>()?;
+ fn no_duplicates_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "b", "c"])])?;
+ acc2.update_batch(&[data(["d", "e", "f"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+ result.sort();
- assert_eq!(&merged_vals, &expected);
- assert_eq!(&merged_ts, &expected_ts);
+ assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
Ok(())
}
#[test]
- fn test_merge_desc() -> Result<()> {
- let lhs_arrays: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
- Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
- ];
- let n_row = lhs_arrays[0].len();
- let lhs_orderings = (0..n_row)
- .map(|idx| get_row_at_idx(&lhs_arrays, idx))
- .collect::<Result<VecDeque<_>>>()?;
-
- let rhs_arrays: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
- Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
- ];
- let n_row = rhs_arrays[0].len();
- let rhs_orderings = (0..n_row)
- .map(|idx| get_row_at_idx(&rhs_arrays, idx))
- .collect::<Result<VecDeque<_>>>()?;
- let sort_options = vec![
- SortOptions {
- descending: true,
- nulls_first: false,
- },
- SortOptions {
- descending: true,
- nulls_first: false,
- },
- ];
-
- // Values (which will be merged) doesn't have to be ordered.
- let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as
ArrayRef;
- let lhs_vals = (0..lhs_vals_arr.len())
- .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
- .collect::<Result<VecDeque<_>>>()?;
-
- let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as
ArrayRef;
- let rhs_vals = (0..rhs_vals_arr.len())
- .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
- .collect::<Result<VecDeque<_>>>()?;
- let expected =
- Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as
ArrayRef;
- let expected_ts = vec![
- Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as
ArrayRef,
- Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as
ArrayRef,
- ];
- let (merged_vals, merged_ts) = merge_ordered_arrays(
- &mut [lhs_vals, rhs_vals],
- &mut [lhs_orderings, rhs_orderings],
- &sort_options,
- )?;
- let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
- let merged_ts = (0..merged_ts[0].len())
- .map(|col_idx| {
- ScalarValue::iter_to_array(
- (0..merged_ts.len())
- .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
- )
- })
- .collect::<Result<Vec<_>>>()?;
+ fn duplicates_no_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) =
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+ acc1.update_batch(&[data(["a", "b", "c"])])?;
+ acc2.update_batch(&[data(["a", "b", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn duplicates_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "b", "c"])])?;
+ acc2.update_batch(&[data(["a", "b", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+ result.sort();
+
+ assert_eq!(result, vec!["a", "b", "c"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn duplicates_on_second_batch_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "c"])])?;
+ acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+ result.sort();
+
+ assert_eq!(result, vec!["a", "b", "c", "d"]);
- assert_eq!(&merged_vals, &expected);
- assert_eq!(&merged_ts, &expected_ts);
Ok(())
}
+
+ #[test]
+ fn no_duplicates_distinct_sort_asc() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(false, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data(["e", "b", "d"])])?;
+ acc2.update_batch(&[data(["f", "a", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_desc() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(true, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data(["e", "b", "d"])])?;
+ acc2.update_batch(&[data(["f", "a", "c"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn duplicates_distinct_sort_asc() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(false, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "c", "b"])])?;
+ acc2.update_batch(&[data(["b", "c", "a"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "c"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn duplicates_distinct_sort_desc() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(true, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data(["a", "c", "b"])])?;
+ acc2.update_batch(&[data(["b", "c", "a"])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["c", "b", "a"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(false, true))
+ .build_two()?;
+
+ acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+ acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(false, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+ acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(true, true))
+ .build_two()?;
+
+ acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+ acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .order_by_col("col", SortOptions::new(true, false))
+ .build_two()?;
+
+ acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+ acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+ assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+ acc2.update_batch(&[data([Some("a"), None, None, None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+ result.sort();
+ assert_eq!(result, vec!["NULL", "a"]);
+ Ok(())
+ }
+
+ #[test]
+ fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
+ let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+ .distinct()
+ .build_two()?;
+
+ acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+ acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None,
None])])?;
+ acc1 = merge(acc1, acc2)?;
+
+ let result = print_nulls(str_arr(acc1.evaluate()?)?);
+ assert_eq!(result, vec!["NULL"]);
+ Ok(())
+ }
+
+ struct ArrayAggAccumulatorBuilder {
+ data_type: DataType,
+ distinct: bool,
+ ordering: LexOrdering,
+ schema: Schema,
+ }
+
+ impl ArrayAggAccumulatorBuilder {
+ fn string() -> Self {
+ Self::new(DataType::Utf8)
+ }
+
+ fn new(data_type: DataType) -> Self {
+ Self {
+ data_type: data_type.clone(),
+ distinct: Default::default(),
+ ordering: Default::default(),
+ schema: Schema {
+ fields: Fields::from(vec![Field::new(
+ "col",
+ DataType::List(FieldRef::new(Field::new(
+ "item", data_type, true,
+ ))),
+ true,
+ )]),
+ metadata: Default::default(),
+ },
+ }
+ }
+
+ fn distinct(mut self) -> Self {
+ self.distinct = true;
+ self
+ }
+
+ fn order_by_col(mut self, col: &str, sort_options: SortOptions) ->
Self {
+ self.ordering.extend([PhysicalSortExpr::new(
+ Arc::new(
+ Column::new_with_schema(col, &self.schema)
+ .expect("column not available in schema"),
+ ),
+ sort_options,
+ )]);
+ self
+ }
+
+ fn build(&self) -> Result<Box<dyn Accumulator>> {
+ ArrayAgg::default().accumulator(AccumulatorArgs {
+ return_type: &self.data_type,
+ schema: &self.schema,
+ ignore_nulls: false,
+ ordering_req: &self.ordering,
+ is_reversed: false,
+ name: "",
+ is_distinct: self.distinct,
+ exprs: &[Arc::new(Column::new("col", 0))],
+ })
+ }
+
+ fn build_two(&self) -> Result<(Box<dyn Accumulator>, Box<dyn
Accumulator>)> {
+ Ok((self.build()?, self.build()?))
+ }
+ }
+
+ fn str_arr(value: ScalarValue) -> Result<Vec<Option<String>>> {
Review Comment:
I think we could make these tests easier to update using the pre-existing
formatter:
https://docs.rs/arrow/latest/arrow/util/pretty/fn.pretty_format_batches.html
Recently (after this PR was made) we have been migrating to `insta` which
might be helpful for this kind of test -- see for example
https://github.com/apache/datafusion/pull/15364
You could create a RecordBatch and then do something like
```rust
assert_snapshot!(batches_to_sort_string(&batches), @r#"
+----+----+----+
| a1 | b1 | c1 |
+----+----+----+
| 5 | 5 | 50 |
+----+----+----+
"#);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]