alamb commented on code in PR #8054:
URL: https://github.com/apache/arrow-datafusion/pull/8054#discussion_r1382548740
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1222,119 +1222,144 @@ array_removement_function!(
"Array_remove_all SQL function"
);
-fn general_replace(args: &[ArrayRef], arr_n: Vec<i64>) -> Result<ArrayRef> {
- let list_array = as_list_array(&args[0])?;
- let from_array = &args[1];
- let to_array = &args[2];
-
+/// For each element of `list_array[i]`, replaces up to `arr_n[i]` occurences
Review Comment:
In general, having a clear description of what the function does, especially
when it is mind bending like `general_replace`, helps a lot. I also find that
writing such a description often helps improve the code as well as results in
better naming.
For example, I struggled to explain how this function worked when it took
`&[ArrayRef]` because then the explanations were in terms of `args[0]`,
`args[1]`, and `args[2]`. Giving the parameters names made things much clearer.
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1222,119 +1222,144 @@ array_removement_function!(
"Array_remove_all SQL function"
);
-fn general_replace(args: &[ArrayRef], arr_n: Vec<i64>) -> Result<ArrayRef> {
- let list_array = as_list_array(&args[0])?;
- let from_array = &args[1];
- let to_array = &args[2];
-
+/// For each element of `list_array[i]`, replaces up to `arr_n[i]` occurences
+/// of `from_array[i]`, `to_array[i]`.
+///
+/// The type of each **element** in `list_array` must be the same as the type
of
+/// `from_array` and `to_array`. This function also handles nested arrays
+/// ([`ListArray`] of [`ListArray`]s)
+///
+/// For example, whn called to replace a list array (where each element is a
+/// list of int32s, the second and third argument are int32 arrays, and the
+/// fourth argument is the number of occurrences to replace
+///
+/// ```
+/// general_replace(
+/// [1, 2, 3, 2], 2, 10, 1 ==> [1, 10, 3, 2] (only the first 2 is
replaced)
+/// [4, 5, 6, 5], 5, 20, 2 ==> [4, 20, 6, 20] (both 5s are replaced)
+/// )
+/// ```
+///
+fn general_replace(
+ list_array: &ListArray,
+ from_array: &ArrayRef,
+ to_array: &ArrayRef,
+ arr_n: Vec<i64>,
+) -> Result<ArrayRef> {
+ // Build up the offsets for the final output array
let mut offsets: Vec<i32> = vec![0];
let data_type = list_array.value_type();
- let mut values = new_empty_array(&data_type);
+ let mut new_values = vec![];
- for (row_index, (arr, n)) in
list_array.iter().zip(arr_n.iter()).enumerate() {
+ // n is the number of elements to replace in this row
+ for (row_index, (list_array_row, n)) in
+ list_array.iter().zip(arr_n.iter()).enumerate()
+ {
let last_offset: i32 = offsets
.last()
.copied()
.ok_or_else(|| internal_datafusion_err!("offsets should not be
empty"))?;
- match arr {
- Some(arr) => {
- let indices = UInt32Array::from(vec![row_index as u32]);
- let from_arr = arrow::compute::take(from_array, &indices,
None)?;
- let eq_array = match from_arr.data_type() {
- // arrow_ord::cmp_eq does not support ListArray, so we
need to compare it by loop
+ match list_array_row {
+ Some(list_array_row) => {
+ let indices = UInt32Array::from(vec![row_index as u32]);
+ let from_array_row = arrow::compute::take(from_array,
&indices, None)?;
+ // Compute all positions in list_row_array (that is itself an
+ // array) that are equal to `from_array_row`
+ let eq_array = match from_array_row.data_type() {
+ // arrow_ord::cmp::eq does not support ListArray, so we
need to compare it by loop
DataType::List(_) => {
- let from_a = as_list_array(&from_arr)?.value(0);
- let list_arr = as_list_array(&arr)?;
+ // compare each element of the from array
+ let from_array_row_inner =
+ as_list_array(&from_array_row)?.value(0);
+ let list_array_row_inner =
as_list_array(&list_array_row)?;
- let mut bool_values = vec![];
- for arr in list_arr.iter() {
- if let Some(a) = arr {
- bool_values.push(Some(a.eq(&from_a)));
- } else {
- return internal_err!(
- "Null value is not supported in
array_replace"
Review Comment:
I don't know why nulls aren't supported -- all of the kernels support them
well, and the code is more concise without the special case
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1222,119 +1222,144 @@ array_removement_function!(
"Array_remove_all SQL function"
);
-fn general_replace(args: &[ArrayRef], arr_n: Vec<i64>) -> Result<ArrayRef> {
- let list_array = as_list_array(&args[0])?;
- let from_array = &args[1];
- let to_array = &args[2];
-
+/// For each element of `list_array[i]`, replaces up to `arr_n[i]` occurences
+/// of `from_array[i]`, `to_array[i]`.
+///
+/// The type of each **element** in `list_array` must be the same as the type
of
+/// `from_array` and `to_array`. This function also handles nested arrays
+/// ([`ListArray`] of [`ListArray`]s)
+///
+/// For example, whn called to replace a list array (where each element is a
+/// list of int32s, the second and third argument are int32 arrays, and the
+/// fourth argument is the number of occurrences to replace
+///
+/// ```
+/// general_replace(
+/// [1, 2, 3, 2], 2, 10, 1 ==> [1, 10, 3, 2] (only the first 2 is
replaced)
+/// [4, 5, 6, 5], 5, 20, 2 ==> [4, 20, 6, 20] (both 5s are replaced)
+/// )
+/// ```
+///
+fn general_replace(
+ list_array: &ListArray,
+ from_array: &ArrayRef,
+ to_array: &ArrayRef,
+ arr_n: Vec<i64>,
+) -> Result<ArrayRef> {
+ // Build up the offsets for the final output array
let mut offsets: Vec<i32> = vec![0];
let data_type = list_array.value_type();
- let mut values = new_empty_array(&data_type);
+ let mut new_values = vec![];
- for (row_index, (arr, n)) in
list_array.iter().zip(arr_n.iter()).enumerate() {
+ // n is the number of elements to replace in this row
+ for (row_index, (list_array_row, n)) in
+ list_array.iter().zip(arr_n.iter()).enumerate()
+ {
let last_offset: i32 = offsets
.last()
.copied()
.ok_or_else(|| internal_datafusion_err!("offsets should not be
empty"))?;
- match arr {
- Some(arr) => {
- let indices = UInt32Array::from(vec![row_index as u32]);
- let from_arr = arrow::compute::take(from_array, &indices,
None)?;
- let eq_array = match from_arr.data_type() {
- // arrow_ord::cmp_eq does not support ListArray, so we
need to compare it by loop
+ match list_array_row {
+ Some(list_array_row) => {
+ let indices = UInt32Array::from(vec![row_index as u32]);
+ let from_array_row = arrow::compute::take(from_array,
&indices, None)?;
+ // Compute all positions in list_row_array (that is itself an
+ // array) that are equal to `from_array_row`
+ let eq_array = match from_array_row.data_type() {
+ // arrow_ord::cmp::eq does not support ListArray, so we
need to compare it by loop
DataType::List(_) => {
- let from_a = as_list_array(&from_arr)?.value(0);
- let list_arr = as_list_array(&arr)?;
+ // compare each element of the from array
+ let from_array_row_inner =
+ as_list_array(&from_array_row)?.value(0);
+ let list_array_row_inner =
as_list_array(&list_array_row)?;
- let mut bool_values = vec![];
- for arr in list_arr.iter() {
- if let Some(a) = arr {
- bool_values.push(Some(a.eq(&from_a)));
- } else {
- return internal_err!(
- "Null value is not supported in
array_replace"
- );
- }
- }
- BooleanArray::from(bool_values)
+ list_array_row_inner
+ .iter()
+ // compare element by element the current row of
list_array
+ .map(|row| row.map(|row|
row.eq(&from_array_row_inner)))
+ .collect::<BooleanArray>()
}
_ => {
- let from_arr = Scalar::new(from_arr);
- arrow_ord::cmp::eq(&arr, &from_arr)?
+ let from_arr = Scalar::new(from_array_row);
+ arrow_ord::cmp::eq(&list_array_row, &from_arr)?
}
};
// Use MutableArrayData to build the replaced array
+ let original_data = list_array_row.to_data();
+ let to_data = to_array.to_data();
+ let capacity = Capacities::Array(original_data.len() +
to_data.len());
+
// First array is the original array, second array is the
element to replace with.
- let arrays = vec![arr, to_array.clone()];
- let arrays_data = arrays
- .iter()
- .map(|a| a.to_data())
- .collect::<Vec<ArrayData>>();
- let arrays_data =
arrays_data.iter().collect::<Vec<&ArrayData>>();
-
- let arrays = arrays
- .iter()
- .map(|arr| arr.as_ref())
- .collect::<Vec<&dyn Array>>();
- let capacity = Capacities::Array(arrays.iter().map(|a|
a.len()).sum());
-
- let mut mutable =
- MutableArrayData::with_capacities(arrays_data, false,
capacity);
+ let mut mutable = MutableArrayData::with_capacities(
+ vec![&original_data, &to_data],
+ false,
+ capacity,
+ );
+ let original_idx = 0;
+ let replace_idx = 1;
let mut counter = 0;
for (i, to_replace) in eq_array.iter().enumerate() {
- if let Some(to_replace) = to_replace {
- if to_replace {
- mutable.extend(1, row_index, row_index + 1);
- counter += 1;
- if counter == *n {
- // extend the rest of the array
- mutable.extend(0, i + 1, eq_array.len());
- break;
- }
- } else {
- mutable.extend(0, i, i + 1);
+ if let Some(true) = to_replace {
+ mutable.extend(replace_idx, row_index, row_index + 1);
+ counter += 1;
+ if counter == *n {
+ // copy original data for any matches past n
+ mutable.extend(original_idx, i + 1,
eq_array.len());
+ break;
}
} else {
- return internal_err!("eq_array should not contain
None");
+ // copy original data for false / null matches
+ mutable.extend(original_idx, i, i + 1);
}
}
let data = mutable.freeze();
let replaced_array = arrow_array::make_array(data);
- let v = arrow::compute::concat(&[&values, &replaced_array])?;
- values = v;
offsets.push(last_offset + replaced_array.len() as i32);
+ new_values.push(replaced_array);
}
None => {
+ // Null element results in a null row (no new offsets)
offsets.push(last_offset);
}
}
}
+ let values = if new_values.is_empty() {
+ new_empty_array(&data_type)
+ } else {
+ let new_values: Vec<_> = new_values.iter().map(|a|
a.as_ref()).collect();
+ arrow::compute::concat(&new_values)?
Review Comment:
The old logic called `concat` for each row, which I think is `O(N^2)` as it
keeps copying the same values over and over again. I changed it to call
`concat` just once
I think it is probably possible to avoid calling `concat` entirely (use
`MutableArrayData` to build up values directly). We can do that as a follow on
PR perhaps.
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1222,119 +1222,144 @@ array_removement_function!(
"Array_remove_all SQL function"
);
-fn general_replace(args: &[ArrayRef], arr_n: Vec<i64>) -> Result<ArrayRef> {
- let list_array = as_list_array(&args[0])?;
- let from_array = &args[1];
- let to_array = &args[2];
-
+/// For each element of `list_array[i]`, replaces up to `arr_n[i]` occurences
+/// of `from_array[i]`, `to_array[i]`.
+///
+/// The type of each **element** in `list_array` must be the same as the type
of
+/// `from_array` and `to_array`. This function also handles nested arrays
+/// ([`ListArray`] of [`ListArray`]s)
+///
+/// For example, whn called to replace a list array (where each element is a
+/// list of int32s, the second and third argument are int32 arrays, and the
+/// fourth argument is the number of occurrences to replace
+///
+/// ```
+/// general_replace(
+/// [1, 2, 3, 2], 2, 10, 1 ==> [1, 10, 3, 2] (only the first 2 is
replaced)
+/// [4, 5, 6, 5], 5, 20, 2 ==> [4, 20, 6, 20] (both 5s are replaced)
+/// )
+/// ```
+///
+fn general_replace(
+ list_array: &ListArray,
+ from_array: &ArrayRef,
+ to_array: &ArrayRef,
+ arr_n: Vec<i64>,
+) -> Result<ArrayRef> {
+ // Build up the offsets for the final output array
let mut offsets: Vec<i32> = vec![0];
let data_type = list_array.value_type();
- let mut values = new_empty_array(&data_type);
+ let mut new_values = vec![];
- for (row_index, (arr, n)) in
list_array.iter().zip(arr_n.iter()).enumerate() {
+ // n is the number of elements to replace in this row
+ for (row_index, (list_array_row, n)) in
+ list_array.iter().zip(arr_n.iter()).enumerate()
+ {
let last_offset: i32 = offsets
.last()
.copied()
.ok_or_else(|| internal_datafusion_err!("offsets should not be
empty"))?;
- match arr {
- Some(arr) => {
- let indices = UInt32Array::from(vec![row_index as u32]);
- let from_arr = arrow::compute::take(from_array, &indices,
None)?;
- let eq_array = match from_arr.data_type() {
- // arrow_ord::cmp_eq does not support ListArray, so we
need to compare it by loop
+ match list_array_row {
+ Some(list_array_row) => {
+ let indices = UInt32Array::from(vec![row_index as u32]);
+ let from_array_row = arrow::compute::take(from_array,
&indices, None)?;
+ // Compute all positions in list_row_array (that is itself an
+ // array) that are equal to `from_array_row`
+ let eq_array = match from_array_row.data_type() {
+ // arrow_ord::cmp::eq does not support ListArray, so we
need to compare it by loop
DataType::List(_) => {
- let from_a = as_list_array(&from_arr)?.value(0);
- let list_arr = as_list_array(&arr)?;
+ // compare each element of the from array
+ let from_array_row_inner =
+ as_list_array(&from_array_row)?.value(0);
+ let list_array_row_inner =
as_list_array(&list_array_row)?;
- let mut bool_values = vec![];
- for arr in list_arr.iter() {
- if let Some(a) = arr {
- bool_values.push(Some(a.eq(&from_a)));
- } else {
- return internal_err!(
- "Null value is not supported in
array_replace"
- );
- }
- }
- BooleanArray::from(bool_values)
+ list_array_row_inner
+ .iter()
+ // compare element by element the current row of
list_array
+ .map(|row| row.map(|row|
row.eq(&from_array_row_inner)))
+ .collect::<BooleanArray>()
}
_ => {
- let from_arr = Scalar::new(from_arr);
- arrow_ord::cmp::eq(&arr, &from_arr)?
+ let from_arr = Scalar::new(from_array_row);
+ arrow_ord::cmp::eq(&list_array_row, &from_arr)?
}
};
// Use MutableArrayData to build the replaced array
+ let original_data = list_array_row.to_data();
Review Comment:
Rather than using a new array with two indexes, I found naming them helped a
lot
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -2763,6 +2790,52 @@ mod tests {
);
}
+ #[test]
+ fn test_array_replace_with_null() {
Review Comment:
Since the null cases were not covered, I added new coverage. Maybe this
style of test could help as we clean up the other array functions.
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1222,119 +1222,144 @@ array_removement_function!(
"Array_remove_all SQL function"
);
-fn general_replace(args: &[ArrayRef], arr_n: Vec<i64>) -> Result<ArrayRef> {
- let list_array = as_list_array(&args[0])?;
- let from_array = &args[1];
- let to_array = &args[2];
-
+/// For each element of `list_array[i]`, replaces up to `arr_n[i]` occurences
+/// of `from_array[i]`, `to_array[i]`.
+///
+/// The type of each **element** in `list_array` must be the same as the type
of
+/// `from_array` and `to_array`. This function also handles nested arrays
+/// ([`ListArray`] of [`ListArray`]s)
+///
+/// For example, whn called to replace a list array (where each element is a
+/// list of int32s, the second and third argument are int32 arrays, and the
+/// fourth argument is the number of occurrences to replace
+///
+/// ```
+/// general_replace(
+/// [1, 2, 3, 2], 2, 10, 1 ==> [1, 10, 3, 2] (only the first 2 is
replaced)
+/// [4, 5, 6, 5], 5, 20, 2 ==> [4, 20, 6, 20] (both 5s are replaced)
+/// )
+/// ```
+///
+fn general_replace(
+ list_array: &ListArray,
+ from_array: &ArrayRef,
+ to_array: &ArrayRef,
+ arr_n: Vec<i64>,
+) -> Result<ArrayRef> {
+ // Build up the offsets for the final output array
let mut offsets: Vec<i32> = vec![0];
let data_type = list_array.value_type();
- let mut values = new_empty_array(&data_type);
+ let mut new_values = vec![];
- for (row_index, (arr, n)) in
list_array.iter().zip(arr_n.iter()).enumerate() {
+ // n is the number of elements to replace in this row
+ for (row_index, (list_array_row, n)) in
Review Comment:
I renamed these variables to better explain what they are (rather than `arr`,
a shorthand for `array`, which is already overloaded in this function).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]