This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 06f2475b30 address comment (#7993)
06f2475b30 is described below
commit 06f2475b304ad173fe7dfe31656928747460588e
Author: Jay Zhan <[email protected]>
AuthorDate: Thu Nov 2 03:04:38 2023 +0800
address comment (#7993)
Signed-off-by: jayzhan211 <[email protected]>
---
datafusion/common/src/scalar.rs | 20 +++++-----
datafusion/common/src/utils.rs | 46 +++++++++++++++++++++-
.../physical-expr/src/aggregate/array_agg.rs | 4 +-
.../src/aggregate/array_agg_distinct.rs | 4 +-
datafusion/physical-expr/src/array_expressions.rs | 4 +-
5 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index b3c11740ab..0d701eaad2 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -30,7 +30,7 @@ use crate::cast::{
};
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use crate::hash_utils::create_hashes;
-use crate::utils::wrap_into_list_array;
+use crate::utils::array_into_list_array;
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute::kernels::numeric::*;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
@@ -1667,7 +1667,7 @@ impl ScalarValue {
} else {
Self::iter_to_array(values.iter().cloned()).unwrap()
};
- Arc::new(wrap_into_list_array(values))
+ Arc::new(array_into_list_array(values))
}
/// Converts a scalar value into an array of `size` rows.
@@ -2058,7 +2058,7 @@ impl ScalarValue {
let list_array = as_list_array(array);
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
- let arr = Arc::new(wrap_into_list_array(nested_array));
+ let arr = Arc::new(array_into_list_array(nested_array));
ScalarValue::List(arr)
}
@@ -2067,7 +2067,7 @@ impl ScalarValue {
let list_array = as_fixed_size_list_array(array)?;
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
- let arr = Arc::new(wrap_into_list_array(nested_array));
+ let arr = Arc::new(array_into_list_array(nested_array));
ScalarValue::List(arr)
}
@@ -3052,7 +3052,7 @@ mod tests {
let array = ScalarValue::new_list(scalars.as_slice(), &DataType::Utf8);
- let expected = wrap_into_list_array(Arc::new(StringArray::from(vec![
+ let expected = array_into_list_array(Arc::new(StringArray::from(vec![
"rust",
"arrow",
"data-fusion",
@@ -3091,9 +3091,9 @@ mod tests {
#[test]
fn iter_to_array_string_test() {
let arr1 =
- wrap_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
+ array_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
let arr2 =
- wrap_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));
+ array_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));
let scalars = vec![
ScalarValue::List(Arc::new(arr1)),
@@ -4335,13 +4335,13 @@ mod tests {
// Define list-of-structs scalars
let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap();
- let nl0 = ScalarValue::List(Arc::new(wrap_into_list_array(nl0_array)));
+ let nl0 = ScalarValue::List(Arc::new(array_into_list_array(nl0_array)));
let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap();
- let nl1 = ScalarValue::List(Arc::new(wrap_into_list_array(nl1_array)));
+ let nl1 = ScalarValue::List(Arc::new(array_into_list_array(nl1_array)));
let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap();
- let nl2 = ScalarValue::List(Arc::new(wrap_into_list_array(nl2_array)));
+ let nl2 = ScalarValue::List(Arc::new(array_into_list_array(nl2_array)));
// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index b2f71e86f2..f031f78804 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -17,6 +17,7 @@
//! This module provides the bisect function, which implements binary search.
+use crate::error::_internal_err;
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
use arrow::buffer::OffsetBuffer;
@@ -24,7 +25,7 @@ use arrow::compute;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
-use arrow_array::ListArray;
+use arrow_array::{Array, ListArray};
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
@@ -338,7 +339,7 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
-pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray {
+pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
@@ -348,6 +349,47 @@ pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray {
)
}
+/// Wrap arrays into a single element `ListArray`.
+///
+/// Example:
+/// ```
+/// use arrow::array::{Int32Array, ListArray, ArrayRef};
+/// use arrow::datatypes::{Int32Type, Field};
+/// use std::sync::Arc;
+///
+/// let arr1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+/// let arr2 = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
+///
+/// let list_arr = datafusion_common::utils::arrays_into_list_array([arr1, arr2]).unwrap();
+///
+/// let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(
+/// vec![
+/// Some(vec![Some(1), Some(2), Some(3)]),
+/// Some(vec![Some(4), Some(5), Some(6)]),
+/// ]
+/// );
+///
+/// assert_eq!(list_arr, expected);
+pub fn arrays_into_list_array(
+ arr: impl IntoIterator<Item = ArrayRef>,
+) -> Result<ListArray> {
+ let arr = arr.into_iter().collect::<Vec<_>>();
+ if arr.is_empty() {
+ return _internal_err!("Cannot wrap empty array into list array");
+ }
+
+ let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
+ // Assume data type is consistent
+ let data_type = arr[0].data_type().to_owned();
+ let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
+ Ok(ListArray::new(
+ Arc::new(Field::new("item", data_type, true)),
+ OffsetBuffer::from_lengths(lens),
+ arrow::compute::concat(values.as_slice())?,
+ None,
+ ))
+}
+
/// An extension trait for smart pointers. Provides an interface to get a
/// raw pointer to the data (with metadata stripped away).
///
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs
index 834925b8d5..4dccbfef07 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -24,7 +24,7 @@ use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
use arrow_array::Array;
use datafusion_common::cast::as_list_array;
-use datafusion_common::utils::wrap_into_list_array;
+use datafusion_common::utils::array_into_list_array;
use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_expr::Accumulator;
@@ -161,7 +161,7 @@ impl Accumulator for ArrayAggAccumulator {
}
let concated_array = arrow::compute::concat(&element_arrays)?;
- let list_array = wrap_into_list_array(concated_array);
+ let list_array = array_into_list_array(concated_array);
Ok(ScalarValue::List(Arc::new(list_array)))
}
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
index 21143ce54a..9b391b0c42 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -185,7 +185,7 @@ mod tests {
use arrow_array::types::Int32Type;
use arrow_array::{Array, ListArray};
use arrow_buffer::OffsetBuffer;
- use datafusion_common::utils::wrap_into_list_array;
+ use datafusion_common::utils::array_into_list_array;
use datafusion_common::{internal_err, DataFusionError};
// arrow::compute::sort cann't sort ListArray directly, so we need to sort the inner primitive array and wrap it back into ListArray.
@@ -201,7 +201,7 @@ mod tests {
};
let arr = arrow::compute::sort(&arr, None).unwrap();
- let list_arr = wrap_into_list_array(arr);
+ let list_arr = array_into_list_array(arr);
ScalarValue::List(Arc::new(list_arr))
}
diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 84fd301b84..18d8c60fe7 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -29,7 +29,7 @@ use arrow_buffer::NullBuffer;
use datafusion_common::cast::{
as_generic_string_array, as_int64_array, as_list_array, as_string_array,
};
-use datafusion_common::utils::wrap_into_list_array;
+use datafusion_common::utils::array_into_list_array;
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result,
};
@@ -412,7 +412,7 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
// Either an empty array or all nulls:
DataType::Null => {
let array = new_null_array(&DataType::Null, arrays.len());
- Ok(Arc::new(wrap_into_list_array(array)))
+ Ok(Arc::new(array_into_list_array(array)))
}
data_type => array_array(arrays, data_type),
}