This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 182a37eaf4 Remove `define_array_slice` and reuse `array_slice` for
`array_pop_front/back` (#8401)
182a37eaf4 is described below
commit 182a37eaf48e89a84dcd241880bd970c8b0b9363
Author: Jay Zhan <[email protected]>
AuthorDate: Sat Dec 9 19:57:56 2023 +0800
Remove `define_array_slice` and reuse `array_slice` for
`array_pop_front/back` (#8401)
* array_element done
Signed-off-by: jayzhan211 <[email protected]>
* clippy
Signed-off-by: jayzhan211 <[email protected]>
* replace array_slice
Signed-off-by: jayzhan211 <[email protected]>
* fix get_indexed_field_empty_list
Signed-off-by: jayzhan211 <[email protected]>
* replace pop front and pop back
Signed-off-by: jayzhan211 <[email protected]>
* clippy
Signed-off-by: jayzhan211 <[email protected]>
* add doc and comment
Signed-off-by: jayzhan211 <[email protected]>
* fmt
Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
---
datafusion/physical-expr/src/array_expressions.rs | 337 +++++++++++----------
.../src/expressions/get_indexed_field.rs | 2 +-
2 files changed, 179 insertions(+), 160 deletions(-)
diff --git a/datafusion/physical-expr/src/array_expressions.rs
b/datafusion/physical-expr/src/array_expressions.rs
index ae04869458..c2dc88b107 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -18,7 +18,6 @@
//! Array expressions
use std::any::type_name;
-use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::Arc;
@@ -370,135 +369,64 @@ pub fn make_array(arrays: &[ArrayRef]) ->
Result<ArrayRef> {
}
}
-fn return_empty(return_null: bool, data_type: DataType) -> Arc<dyn Array> {
- if return_null {
- new_null_array(&data_type, 1)
- } else {
- new_empty_array(&data_type)
- }
-}
-
-fn list_slice<T: Array + 'static>(
- array: &dyn Array,
- i: i64,
- j: i64,
- return_element: bool,
-) -> ArrayRef {
- let array = array.as_any().downcast_ref::<T>().unwrap();
-
- let array_type = array.data_type().clone();
+/// array_element SQL function
+///
+/// Takes two arguments: the list array and a 1-based index into each list.
+/// `array_element(array, index)`
+///
+/// A negative index counts from the end of the list (`-1` is the last
+/// element). An index of `0`, an index past either end of the list, or a
+/// zero-length list entry produces a null for that row.
+///
+/// For example:
+/// > array_element(\[1, 2, 3], 2) -> 2
+pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
+ let list_array = as_list_array(&args[0])?;
+ let indexes = as_int64_array(&args[1])?;
- if i == 0 && j == 0 || array.is_empty() {
- return return_empty(return_element, array_type);
- }
+ let values = list_array.values();
+ let original_data = values.to_data();
+ let capacity = Capacities::Array(original_data.len());
- let i = match i.cmp(&0) {
- Ordering::Less => {
- if i.unsigned_abs() > array.len() as u64 {
- return return_empty(true, array_type);
- }
+ // use_nulls = true: the result is the flat element array (not a List), so
+ // rows without a value must be emitted as explicit nulls via `extend_nulls`.
+ let mut mutable =
+ MutableArrayData::with_capacities(vec![&original_data], true,
capacity);
- (array.len() as i64 + i + 1) as usize
- }
- Ordering::Equal => 1,
- Ordering::Greater => i as usize,
- };
+ fn adjusted_array_index(index: i64, len: usize) -> Option<i64> {
+ // Convert a 1-based index (or a negative index counting from the end)
+ // into a 0-based offset; returns None when it falls outside 0..len.
+ let adjusted_zero_index = if index < 0 {
+ index + len as i64
+ } else {
+ index - 1
+ };
- let j = match j.cmp(&0) {
- Ordering::Less => {
- if j.unsigned_abs() as usize > array.len() {
- return return_empty(true, array_type);
- }
- if return_element {
- (array.len() as i64 + j + 1) as usize
- } else {
- (array.len() as i64 + j) as usize
- }
+ if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
+ Some(adjusted_zero_index)
+ } else {
+ // Out of bounds
+ None
}
- Ordering::Equal => 1,
- Ordering::Greater => j.min(array.len() as i64) as usize,
- };
-
- if i > j || i > array.len() {
- return_empty(return_element, array_type)
- } else {
- Arc::new(array.slice(i - 1, j + 1 - i))
}
-}
-fn slice<T: Array + 'static>(
- array: &ListArray,
- key: &Int64Array,
- extra_key: &Int64Array,
- return_element: bool,
-) -> Result<Arc<dyn Array>> {
- let sliced_array: Vec<Arc<dyn Array>> = array
- .iter()
- .zip(key.iter())
- .zip(extra_key.iter())
- .map(|((arr, i), j)| match (arr, i, j) {
- (Some(arr), Some(i), Some(j)) => list_slice::<T>(&arr, i, j,
return_element),
- (Some(arr), None, Some(j)) => list_slice::<T>(&arr, 1i64, j,
return_element),
- (Some(arr), Some(i), None) => {
- list_slice::<T>(&arr, i, arr.len() as i64, return_element)
- }
- (Some(arr), None, None) if !return_element => arr.clone(),
- _ => return_empty(return_element, array.value_type()),
- })
- .collect();
+ for (row_index, offset_window) in
list_array.offsets().windows(2).enumerate() {
+ let start = offset_window[0] as usize;
+ let end = offset_window[1] as usize;
+ let len = end - start;
- // concat requires input of at least one array
- if sliced_array.is_empty() {
- Ok(return_empty(return_element, array.value_type()))
- } else {
- let vec = sliced_array
- .iter()
- .map(|a| a.as_ref())
- .collect::<Vec<&dyn Array>>();
- let mut i: i32 = 0;
- let mut offsets = vec![i];
- offsets.extend(
- vec.iter()
- .map(|a| {
- i += a.len() as i32;
- i
- })
- .collect::<Vec<_>>(),
- );
- let values = compute::concat(vec.as_slice()).unwrap();
+ // Zero-length entry (an empty list, or a null list stored with an empty
+ // offset range): there is no element to return, so emit null.
+ // NOTE(review): the list array's validity bitmap is never consulted, so a
+ // null entry whose offsets still span values would be indexed as if it
+ // were valid; confirm this cannot occur for inputs reaching this function.
+ if len == 0 {
+ mutable.extend_nulls(1);
+ continue;
+ }
+
+ // NOTE(review): the index is read with `value()` without checking
+ // `indexes.is_null(row_index)`, so a null index is not handled explicitly.
+ let index = adjusted_array_index(indexes.value(row_index), len);
- if return_element {
- Ok(values)
+ if let Some(index) = index {
+ mutable.extend(0, start + index as usize, start + index as usize +
1);
} else {
- let field = Arc::new(Field::new("item", array.value_type(), true));
- Ok(Arc::new(ListArray::try_new(
- field,
- OffsetBuffer::new(offsets.into()),
- values,
- None,
- )?))
+ // Index out of bounds
+ mutable.extend_nulls(1);
}
}
-}
-
-fn define_array_slice(
- list_array: &ListArray,
- key: &Int64Array,
- extra_key: &Int64Array,
- return_element: bool,
-) -> Result<ArrayRef> {
- macro_rules! array_function {
- ($ARRAY_TYPE:ident) => {
- slice::<$ARRAY_TYPE>(list_array, key, extra_key, return_element)
- };
- }
- call_array_function!(list_array.value_type(), true)
-}
-pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
- let list_array = as_list_array(&args[0])?;
- let key = as_int64_array(&args[1])?;
- define_array_slice(list_array, key, key, true)
+ let data = mutable.freeze();
+ Ok(arrow_array::make_array(data))
}
fn general_except<OffsetSize: OffsetSizeTrait>(
@@ -579,47 +507,136 @@ pub fn array_except(args: &[ArrayRef]) ->
Result<ArrayRef> {
}
}
+/// array_slice SQL function
+///
+/// We follow the behavior of array_slice in DuckDB.
+/// Note that array_slice is 1-indexed and takes two additional arguments,
+/// `from` and `to`:
+///
+/// > array_slice(array, from, to)
+///
+/// A positive index counts from the start of the array. A `from` index smaller
+/// than 1 is treated as 1, and a `to` index larger than the length of the
+/// array is treated as the length of the array. A positive `to` index is
+/// inclusive.
+///
+/// A negative index counts from the end of the array. A negative index that
+/// reaches past the start of the array is out of range, in either `from` or
+/// `to`. A negative `to` index is exclusive, like Python slice syntax
+/// (e.g. `to = -1` stops before the last element).
+///
+/// A null `from` starts at the first element and a null `to` ends at the last
+/// element. Rows whose list has zero length, or whose range is out of range or
+/// empty, produce an empty list; the result carries no null bitmap.
+///
+/// See test cases in `array.slt` for more details.
pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
- let key = as_int64_array(&args[1])?;
- let extra_key = as_int64_array(&args[2])?;
- define_array_slice(list_array, key, extra_key, false)
-}
-
-fn general_array_pop(
- list_array: &GenericListArray<i32>,
- from_back: bool,
-) -> Result<(Vec<i64>, Vec<i64>)> {
- if from_back {
- let key = vec![0; list_array.len()];
- // Attention: `arr.len() - 1` in extra key defines the last element
position (position = index + 1, not inclusive) we want in the new array.
- let extra_key: Vec<_> = list_array
- .iter()
- .map(|x| x.map_or(0, |arr| arr.len() as i64 - 1))
- .collect();
- Ok((key, extra_key))
- } else {
- // Attention: 2 in the `key`` defines the first element position
(position = index + 1) we want in the new array.
- // We only handle two cases of the first element index: if the old
array has any elements, starts from 2 (index + 1), or starts from initial.
- let key: Vec<_> = list_array.iter().map(|x| x.map_or(0, |_|
2)).collect();
- let extra_key: Vec<_> = list_array
- .iter()
- .map(|x| x.map_or(0, |arr| arr.len() as i64))
- .collect();
- Ok((key, extra_key))
+ let from_array = as_int64_array(&args[1])?;
+ let to_array = as_int64_array(&args[2])?;
+
+ let values = list_array.values();
+ let original_data = values.to_data();
+ let capacity = Capacities::Array(original_data.len());
+
+ // use_nulls = false: rows that select nothing are represented by repeating
+ // the previous offset (an empty list), not by null slots.
+ let mut mutable =
+ MutableArrayData::with_capacities(vec![&original_data], false,
capacity);
+
+ // The slice syntax is compatible with DuckDB v0.8.1; `adjusted_from_index`
+ // and `adjusted_to_index` implement DuckDB's array_slice index rules.
+
+ fn adjusted_from_index(index: i64, len: usize) -> Option<i64> {
+ // Map a 1-based (or negative, from-the-end) `from` to a 0-based offset.
+ // Positive values below 1 clamp to 0. Returns None outside 0..len.
+ let adjusted_zero_index = if index < 0 {
+ index + len as i64
+ } else {
+ // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
+ std::cmp::max(index - 1, 0)
+ };
+
+ if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
+ Some(adjusted_zero_index)
+ } else {
+ // Out of bounds
+ None
+ }
+ }
+
+ fn adjusted_to_index(index: i64, len: usize) -> Option<i64> {
+ // Map a 1-based inclusive (or negative, exclusive) `to` to a 0-based
+ // inclusive offset. Positive values beyond len clamp to len - 1.
+ // Returns None outside 0..len.
+ let adjusted_zero_index = if index < 0 {
+ // A negative `to` is Python-like: the position it names is excluded.
+ index + len as i64 - 1
+ } else {
+ // array_slice(arr, from, len + 1) is the same as
+ // array_slice(arr, from, len)
+ std::cmp::min(index - 1, len as i64 - 1)
+ };
+
+ if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
+ Some(adjusted_zero_index)
+ } else {
+ // Out of bounds
+ None
+ }
+ }
+
+ let mut offsets = vec![0];
+
+ for (row_index, offset_window) in
list_array.offsets().windows(2).enumerate() {
+ let start = offset_window[0] as usize;
+ let end = offset_window[1] as usize;
+ let len = end - start;
+
+ // A zero-length entry (empty list, or null list with an empty offset
+ // range) yields an empty list in this row.
+ if len == 0 {
+ offsets.push(offsets[row_index]);
+ continue;
+ }
+
+ // A null `from` / `to` selects from the first / up to the last element.
+ let from_index = if from_array.is_null(row_index) {
+ Some(0)
+ } else {
+ adjusted_from_index(from_array.value(row_index), len)
+ };
+
+ let to_index = if to_array.is_null(row_index) {
+ Some(len as i64 - 1)
+ } else {
+ adjusted_to_index(to_array.value(row_index), len)
+ };
+
+ if let (Some(from), Some(to)) = (from_index, to_index) {
+ if from <= to {
+ // `adjusted_to_index` bounds `to` to at most len - 1, so this holds.
+ assert!(start + to as usize <= end);
+ mutable.extend(0, start + from as usize, start + to as usize +
1);
+ offsets.push(offsets[row_index] + (to - from + 1) as i32);
+ } else {
+ // invalid range, return empty array
+ offsets.push(offsets[row_index]);
+ }
+ } else {
+ // invalid range, return empty array
+ offsets.push(offsets[row_index]);
+ }
}
+
+ let data = mutable.freeze();
+
+ // NOTE(review): the output list field is rebuilt as a nullable "item" field
+ // instead of reusing the input's field, so a custom field name or metadata
+ // would not be preserved; confirm intended.
+ Ok(Arc::new(ListArray::try_new(
+ Arc::new(Field::new("item", list_array.value_type(), true)),
+ OffsetBuffer::new(offsets.into()),
+ arrow_array::make_array(data),
+ None,
+ )?))
}
+/// array_pop_back SQL function
+///
+/// Returns each list without its last element, expressed as
+/// `array_slice(array, 1, len - 1)`. Empty and single-element lists produce
+/// an empty list.
pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
- let (key, extra_key) = general_array_pop(list_array, true)?;
-
- define_array_slice(
- list_array,
- &Int64Array::from(key),
- &Int64Array::from(extra_key),
- false,
- )
+ let from_array = Int64Array::from(vec![1; list_array.len()]);
+ // Null lists map to `to = 0`, which array_slice treats as out of range,
+ // so those rows become empty lists.
+ let to_array = Int64Array::from(
+ list_array
+ .iter()
+ .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
+ .collect::<Vec<i64>>(),
+ );
+ let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
+ array_slice(args.as_slice())
}
/// Appends or prepends elements to a ListArray.
@@ -743,16 +760,18 @@ pub fn gen_range(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(arr)
}
+/// array_pop_front SQL function
+///
+/// Returns each list without its first element, expressed as
+/// `array_slice(array, 2, len)`. Empty and single-element lists produce an
+/// empty list.
pub fn array_pop_front(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
- let (key, extra_key) = general_array_pop(list_array, false)?;
-
- define_array_slice(
- list_array,
- &Int64Array::from(key),
- &Int64Array::from(extra_key),
- false,
- )
+ let from_array = Int64Array::from(vec![2; list_array.len()]);
+ // Null lists map to `to = 0`, which array_slice treats as out of range,
+ // so those rows become empty lists.
+ let to_array = Int64Array::from(
+ list_array
+ .iter()
+ .map(|arr| arr.map_or(0, |arr| arr.len() as i64))
+ .collect::<Vec<i64>>(),
+ );
+ let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
+ array_slice(args.as_slice())
}
/// Array_append SQL function
diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
index 9c2a64723d..43fd5a812a 100644
--- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs
+++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs
@@ -453,7 +453,7 @@ mod tests {
.evaluate(&batch)?
.into_array(batch.num_rows())
.expect("Failed to convert to array");
- assert!(result.is_null(0));
+ assert!(result.is_empty());
Ok(())
}