jayzhan211 commented on code in PR #7057:
URL: https://github.com/apache/arrow-datafusion/pull/7057#discussion_r1272855021
##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -902,176 +902,394 @@ pub fn array_positions(args: &[ArrayRef]) ->
Result<ArrayRef> {
Ok(res)
}
-macro_rules! remove {
- ($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident) => {{
- let child_array =
- downcast_arg!(downcast_arg!($ARRAY, ListArray).values(),
$ARRAY_TYPE);
- let element = downcast_arg!($ELEMENT, $ARRAY_TYPE).value(0);
- let mut builder = new_builder!($BUILDER_TYPE, child_array.len());
+macro_rules! general_remove {
+ ($ARRAY:expr, $ELEMENT:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{
+ let mut offsets: Vec<i32> = vec![0];
+ let mut values =
+ downcast_arg!(new_empty_array($ELEMENT.data_type()),
$ARRAY_TYPE).clone();
- for x in child_array {
- match x {
- Some(x) => {
- if x != element {
- builder.append_value(x);
- }
+ let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
+ for ((arr, el), max) in
$ARRAY.iter().zip(element.iter()).zip($MAX.iter()) {
+ let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
+ DataFusionError::Internal(format!("offsets should not be
empty"))
+ })?;
+ match arr {
+ Some(arr) => {
+ let child_array = downcast_arg!(arr, $ARRAY_TYPE);
+ let mut counter = 0;
+ let max = if max < Some(1) { 1 } else { max.unwrap() };
+
+ let filter_array = child_array
+ .iter()
+ .map(|element| {
+ if counter != max && element == el {
+ counter += 1;
+ Some(false)
+ } else {
+ Some(true)
+ }
+ })
+ .collect::<BooleanArray>();
+
+ let filtered_array = compute::filter(&child_array,
&filter_array)?;
+ values = downcast_arg!(
+ compute::concat(&[&values, &filtered_array,])?.clone(),
+ $ARRAY_TYPE
+ )
+ .clone();
+ offsets.push(last_offset + filtered_array.len() as i32);
}
- None => builder.append_null(),
+ None => offsets.push(last_offset),
}
}
- let arr = builder.finish();
- let mut scalars = vec![];
- for i in 0..arr.len() {
-
scalars.push(ColumnarValue::Scalar(ScalarValue::try_from_array(&arr, i)?));
- }
- scalars
+ let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(),
true));
+
+ Arc::new(ListArray::try_new(
+ field,
+ OffsetBuffer::new(offsets.into()),
+ Arc::new(values),
+ None,
+ )?)
}};
}
/// Array_remove SQL function
-pub fn array_remove(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let arr = match &args[0] {
- ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
- ColumnarValue::Array(arr) => arr.clone(),
- };
+pub fn array_remove(args: &[ArrayRef]) -> Result<ArrayRef> {
+ let arr = as_list_array(&args[0])?;
+ let element = &args[1];
- let element = match &args[1] {
- ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
- _ => {
- return Err(DataFusionError::Internal(
- "Array_remove function requires scalar element".to_string(),
- ))
- }
+ let max = if args.len() == 3 {
+ as_int64_array(&args[2])?.clone()
+ } else {
+ Int64Array::from_value(1, arr.len())
};
- let data_type = arr.data_type();
- let res = match data_type {
- DataType::List(field) => {
- match (field.data_type(), element.data_type()) {
- (DataType::Utf8, DataType::Utf8) => remove!(arr, element,
StringArray, StringBuilder),
- (DataType::LargeUtf8, DataType::LargeUtf8) => remove!(arr,
element, LargeStringArray, LargeStringBuilder),
- (DataType::Boolean, DataType::Boolean) => remove!(arr,
element, BooleanArray, BooleanBuilder),
- (DataType::Float32, DataType::Float32) => remove!(arr,
element, Float32Array, Float32Builder),
- (DataType::Float64, DataType::Float64) => remove!(arr,
element, Float64Array, Float64Builder),
- (DataType::Int8, DataType::Int8) => remove!(arr, element,
Int8Array, Int8Builder),
- (DataType::Int16, DataType::Int16) => remove!(arr, element,
Int16Array, Int16Builder),
- (DataType::Int32, DataType::Int32) => remove!(arr, element,
Int32Array, Int32Builder),
- (DataType::Int64, DataType::Int64) => remove!(arr, element,
Int64Array, Int64Builder),
- (DataType::UInt8, DataType::UInt8) => remove!(arr, element,
UInt8Array, UInt8Builder),
- (DataType::UInt16, DataType::UInt16) => remove!(arr, element,
UInt16Array, UInt16Builder),
- (DataType::UInt32, DataType::UInt32) => remove!(arr, element,
UInt32Array, UInt32Builder),
- (DataType::UInt64, DataType::UInt64) => remove!(arr, element,
UInt64Array, UInt64Builder),
+ let res = match (arr.value_type(), element.data_type()) {
+ (DataType::List(_), DataType::List(_)) => general_remove!(arr,
element, max, ListArray),
+ (DataType::Utf8, DataType::Utf8) => general_remove!(arr,
element, max, StringArray),
+ (DataType::LargeUtf8, DataType::LargeUtf8) =>
general_remove!(arr, element, max, LargeStringArray),
+ (DataType::Boolean, DataType::Boolean) => general_remove!(arr,
element, max, BooleanArray),
+ (DataType::Float32, DataType::Float32) => general_remove!(arr,
element, max, Float32Array),
+ (DataType::Float64, DataType::Float64) => general_remove!(arr,
element, max, Float64Array),
+ (DataType::Int8, DataType::Int8) => general_remove!(arr,
element, max, Int8Array),
+ (DataType::Int16, DataType::Int16) => general_remove!(arr,
element, max, Int16Array),
+ (DataType::Int32, DataType::Int32) => general_remove!(arr,
element, max, Int32Array),
+ (DataType::Int64, DataType::Int64) => general_remove!(arr,
element, max, Int64Array),
+ (DataType::UInt8, DataType::UInt8) => general_remove!(arr,
element, max, UInt8Array),
+ (DataType::UInt16, DataType::UInt16) => general_remove!(arr,
element, max, UInt16Array),
+ (DataType::UInt32, DataType::UInt32) => general_remove!(arr,
element, max, UInt32Array),
+ (DataType::UInt64, DataType::UInt64) => general_remove!(arr,
element, max, UInt64Array),
(array_data_type, element_data_type) => {
return Err(DataFusionError::NotImplemented(format!(
"Array_remove is not implemented for types
'{array_data_type:?}' and '{element_data_type:?}'."
)))
}
+ };
+
+ Ok(res)
+}
+
+/// Array_removes SQL function
+pub fn array_removes(args: &[ArrayRef]) -> Result<ArrayRef> {
+ let arr = as_list_array(&args[0])?;
+ let element = &args[1];
+
+ let max = Int64Array::from_value(i64::MAX, arr.len());
+
+ let res = match (arr.value_type(), element.data_type()) {
+ (DataType::List(_), DataType::List(_)) => general_remove!(arr,
element, max, ListArray),
+ (DataType::Utf8, DataType::Utf8) => general_remove!(arr,
element, max, StringArray),
+ (DataType::LargeUtf8, DataType::LargeUtf8) =>
general_remove!(arr, element, max, LargeStringArray),
+ (DataType::Boolean, DataType::Boolean) => general_remove!(arr,
element, max, BooleanArray),
+ (DataType::Float32, DataType::Float32) => general_remove!(arr,
element, max, Float32Array),
+ (DataType::Float64, DataType::Float64) => general_remove!(arr,
element, max, Float64Array),
+ (DataType::Int8, DataType::Int8) => general_remove!(arr,
element, max, Int8Array),
+ (DataType::Int16, DataType::Int16) => general_remove!(arr,
element, max, Int16Array),
+ (DataType::Int32, DataType::Int32) => general_remove!(arr,
element, max, Int32Array),
+ (DataType::Int64, DataType::Int64) => general_remove!(arr,
element, max, Int64Array),
+ (DataType::UInt8, DataType::UInt8) => general_remove!(arr,
element, max, UInt8Array),
+ (DataType::UInt16, DataType::UInt16) => general_remove!(arr,
element, max, UInt16Array),
+ (DataType::UInt32, DataType::UInt32) => general_remove!(arr,
element, max, UInt32Array),
+ (DataType::UInt64, DataType::UInt64) => general_remove!(arr,
element, max, UInt64Array),
+ (array_data_type, element_data_type) => {
+ return Err(DataFusionError::NotImplemented(format!(
+ "Array_removes is not implemented for types
'{array_data_type:?}' and '{element_data_type:?}'."
+ )))
+ }
+ };
+
+ Ok(res)
+}
+
+macro_rules! general_replace {
+ ($ARRAY:expr, $FROM:expr, $TO:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{
+ let mut offsets: Vec<i32> = vec![0];
+ let mut values =
+ downcast_arg!(new_empty_array($FROM.data_type()),
$ARRAY_TYPE).clone();
+
+ let from_array = downcast_arg!($FROM, $ARRAY_TYPE);
+ let to_array = downcast_arg!($TO, $ARRAY_TYPE);
+ for (((arr, from), to), max) in $ARRAY
+ .iter()
+ .zip(from_array.iter())
+ .zip(to_array.iter())
+ .zip($MAX.iter())
+ {
+ let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
+ DataFusionError::Internal(format!("offsets should not be
empty"))
+ })?;
+ match arr {
+ Some(arr) => {
+ let child_array = downcast_arg!(arr, $ARRAY_TYPE);
+ let mut counter = 0;
+ let max = if max < Some(1) { 1 } else { max.unwrap() };
+
+ let replaced_array = child_array
+ .iter()
+ .map(|el| {
+ if counter != max && el == from {
+ counter += 1;
+ to
+ } else {
+ el
+ }
+ })
+ .collect::<$ARRAY_TYPE>();
+
+ values = downcast_arg!(
+ compute::concat(&[&values, &replaced_array])?.clone(),
+ $ARRAY_TYPE
+ )
+ .clone();
+ offsets.push(last_offset + replaced_array.len() as i32);
+ }
+ None => {
+ offsets.push(last_offset);
+ }
}
}
- data_type => {
- return Err(DataFusionError::Internal(format!(
- "Array is not type '{data_type:?}'."
- )))
- }
- };
- array(res.as_slice())
+ let field = Arc::new(Field::new("item", $FROM.data_type().clone(),
true));
+
+ Arc::new(ListArray::try_new(
+ field,
+ OffsetBuffer::new(offsets.into()),
+ Arc::new(values),
+ None,
+ )?)
+ }};
}
-macro_rules! replace {
- ($ARRAY:expr, $FROM:expr, $TO:expr, $ARRAY_TYPE:ident,
$BUILDER_TYPE:ident) => {{
- let child_array =
- downcast_arg!(downcast_arg!($ARRAY, ListArray).values(),
$ARRAY_TYPE);
- let from = downcast_arg!($FROM, $ARRAY_TYPE).value(0);
- let to = downcast_arg!($TO, $ARRAY_TYPE).value(0);
- let mut builder = new_builder!($BUILDER_TYPE, child_array.len());
+macro_rules! general_replace_list {
+ ($ARRAY:expr, $FROM:expr, $TO:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{
+ let mut offsets: Vec<i32> = vec![0];
+ let mut values =
+ downcast_arg!(new_empty_array($FROM.data_type()),
ListArray).clone();
- for x in child_array {
- match x {
- Some(x) => {
- if x == from {
- builder.append_value(to);
- } else {
- builder.append_value(x);
+ let from_array = downcast_arg!($FROM, ListArray);
+ let to_array = downcast_arg!($TO, ListArray);
+ for (((arr, from), to), max) in $ARRAY
+ .iter()
+ .zip(from_array.iter())
+ .zip(to_array.iter())
+ .zip($MAX.iter())
+ {
+ let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
+ DataFusionError::Internal(format!("offsets should not be
empty"))
+ })?;
+ match arr {
+ Some(arr) => {
+ let child_array = downcast_arg!(arr, ListArray);
+ let mut counter = 0;
+ let max = if max < Some(1) { 1 } else { max.unwrap() };
+
+ let replaced_vec = child_array
+ .iter()
+ .map(|el| {
+ if counter != max && el == from {
+ counter += 1;
+ to.clone().unwrap()
+ } else {
+ el.clone().unwrap()
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let mut i: i32 = 0;
+ let mut replaced_offsets = vec![i];
+ replaced_offsets.extend(
+ replaced_vec
+ .clone()
+ .into_iter()
+ .map(|a| {
+ i += a.len() as i32;
+ i
+ })
+ .collect::<Vec<_>>(),
+ );
+
+ let mut replaced_values = downcast_arg!(
+ new_empty_array(&from_array.value_type()),
+ $ARRAY_TYPE
+ )
+ .clone();
+ for replaced_list in replaced_vec {
+ replaced_values = downcast_arg!(
+ compute::concat(&[&replaced_values,
&replaced_list])?,
+ $ARRAY_TYPE
+ )
+ .clone();
}
+
+ let field = Arc::new(Field::new(
+ "item",
+ from_array.value_type().clone(),
+ true,
+ ));
+ let replaced_array = ListArray::try_new(
+ field,
+ OffsetBuffer::new(replaced_offsets.clone().into()),
+ Arc::new(replaced_values),
+ None,
+ )?;
+
+ values = downcast_arg!(
+ compute::concat(&[&values, &replaced_array,])?.clone(),
+ ListArray
+ )
+ .clone();
+ offsets.push(last_offset + replaced_array.len() as i32);
+ }
+ None => {
+ offsets.push(last_offset);
}
- None => builder.append_null(),
}
}
- let arr = builder.finish();
- let mut scalars = vec![];
- for i in 0..arr.len() {
-
scalars.push(ColumnarValue::Scalar(ScalarValue::try_from_array(&arr, i)?));
- }
- scalars
+ let field = Arc::new(Field::new("item", $FROM.data_type().clone(),
true));
+
+ Arc::new(ListArray::try_new(
+ field,
+ OffsetBuffer::new(offsets.into()),
+ Arc::new(values),
+ None,
+ )?)
}};
}
/// Array_replace SQL function
-pub fn array_replace(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- let arr = match &args[0] {
- ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
- ColumnarValue::Array(arr) => arr.clone(),
- };
+pub fn array_replace(args: &[ArrayRef]) -> Result<ArrayRef> {
Review Comment:
I think `array_replace(array, from, to, max)` should be named `array_replaces`,
since it does not replace only one element. Similarly,
`array_remove(array, element, max)` should be
`array_remove(array, elements, max)`, since it may remove multiple elements,
the same as `array_removes(array, elements)`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]