jayzhan211 commented on code in PR #7057:
URL: https://github.com/apache/arrow-datafusion/pull/7057#discussion_r1272855021


##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -902,176 +902,394 @@ pub fn array_positions(args: &[ArrayRef]) -> 
Result<ArrayRef> {
     Ok(res)
 }
 
-macro_rules! remove {
-    ($ARRAY:expr, $ELEMENT:expr, $ARRAY_TYPE:ident, $BUILDER_TYPE:ident) => {{
-        let child_array =
-            downcast_arg!(downcast_arg!($ARRAY, ListArray).values(), 
$ARRAY_TYPE);
-        let element = downcast_arg!($ELEMENT, $ARRAY_TYPE).value(0);
-        let mut builder = new_builder!($BUILDER_TYPE, child_array.len());
+macro_rules! general_remove {
+    ($ARRAY:expr, $ELEMENT:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{
+        let mut offsets: Vec<i32> = vec![0];
+        let mut values =
+            downcast_arg!(new_empty_array($ELEMENT.data_type()), 
$ARRAY_TYPE).clone();
 
-        for x in child_array {
-            match x {
-                Some(x) => {
-                    if x != element {
-                        builder.append_value(x);
-                    }
+        let element = downcast_arg!($ELEMENT, $ARRAY_TYPE);
+        for ((arr, el), max) in 
$ARRAY.iter().zip(element.iter()).zip($MAX.iter()) {
+            let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
+                DataFusionError::Internal(format!("offsets should not be 
empty"))
+            })?;
+            match arr {
+                Some(arr) => {
+                    let child_array = downcast_arg!(arr, $ARRAY_TYPE);
+                    let mut counter = 0;
+                    let max = if max < Some(1) { 1 } else { max.unwrap() };
+
+                    let filter_array = child_array
+                        .iter()
+                        .map(|element| {
+                            if counter != max && element == el {
+                                counter += 1;
+                                Some(false)
+                            } else {
+                                Some(true)
+                            }
+                        })
+                        .collect::<BooleanArray>();
+
+                    let filtered_array = compute::filter(&child_array, 
&filter_array)?;
+                    values = downcast_arg!(
+                        compute::concat(&[&values, &filtered_array,])?.clone(),
+                        $ARRAY_TYPE
+                    )
+                    .clone();
+                    offsets.push(last_offset + filtered_array.len() as i32);
                 }
-                None => builder.append_null(),
+                None => offsets.push(last_offset),
             }
         }
-        let arr = builder.finish();
 
-        let mut scalars = vec![];
-        for i in 0..arr.len() {
-            
scalars.push(ColumnarValue::Scalar(ScalarValue::try_from_array(&arr, i)?));
-        }
-        scalars
+        let field = Arc::new(Field::new("item", $ELEMENT.data_type().clone(), 
true));
+
+        Arc::new(ListArray::try_new(
+            field,
+            OffsetBuffer::new(offsets.into()),
+            Arc::new(values),
+            None,
+        )?)
     }};
 }
 
 /// Array_remove SQL function
-pub fn array_remove(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    let arr = match &args[0] {
-        ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
-        ColumnarValue::Array(arr) => arr.clone(),
-    };
+pub fn array_remove(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let arr = as_list_array(&args[0])?;
+    let element = &args[1];
 
-    let element = match &args[1] {
-        ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
-        _ => {
-            return Err(DataFusionError::Internal(
-                "Array_remove function requires scalar element".to_string(),
-            ))
-        }
+    let max = if args.len() == 3 {
+        as_int64_array(&args[2])?.clone()
+    } else {
+        Int64Array::from_value(1, arr.len())
     };
 
-    let data_type = arr.data_type();
-    let res = match data_type {
-        DataType::List(field) => {
-            match (field.data_type(), element.data_type()) {
-                (DataType::Utf8, DataType::Utf8) => remove!(arr, element, 
StringArray, StringBuilder),
-                (DataType::LargeUtf8, DataType::LargeUtf8) => remove!(arr, 
element, LargeStringArray, LargeStringBuilder),
-                (DataType::Boolean, DataType::Boolean) => remove!(arr, 
element, BooleanArray, BooleanBuilder),
-                (DataType::Float32, DataType::Float32) => remove!(arr, 
element, Float32Array, Float32Builder),
-                (DataType::Float64, DataType::Float64) => remove!(arr, 
element, Float64Array, Float64Builder),
-                (DataType::Int8, DataType::Int8) => remove!(arr, element, 
Int8Array, Int8Builder),
-                (DataType::Int16, DataType::Int16) => remove!(arr, element, 
Int16Array, Int16Builder),
-                (DataType::Int32, DataType::Int32) => remove!(arr, element, 
Int32Array, Int32Builder),
-                (DataType::Int64, DataType::Int64) => remove!(arr, element, 
Int64Array, Int64Builder),
-                (DataType::UInt8, DataType::UInt8) => remove!(arr, element, 
UInt8Array, UInt8Builder),
-                (DataType::UInt16, DataType::UInt16) => remove!(arr, element, 
UInt16Array, UInt16Builder),
-                (DataType::UInt32, DataType::UInt32) => remove!(arr, element, 
UInt32Array, UInt32Builder),
-                (DataType::UInt64, DataType::UInt64) => remove!(arr, element, 
UInt64Array, UInt64Builder),
+    let res = match (arr.value_type(), element.data_type()) {
+                (DataType::List(_), DataType::List(_)) => general_remove!(arr, 
element, max, ListArray),
+                (DataType::Utf8, DataType::Utf8) => general_remove!(arr, 
element, max, StringArray),
+                (DataType::LargeUtf8, DataType::LargeUtf8) => 
general_remove!(arr, element, max, LargeStringArray),
+                (DataType::Boolean, DataType::Boolean) => general_remove!(arr, 
element, max, BooleanArray),
+                (DataType::Float32, DataType::Float32) => general_remove!(arr, 
element, max, Float32Array),
+                (DataType::Float64, DataType::Float64) => general_remove!(arr, 
element, max, Float64Array),
+                (DataType::Int8, DataType::Int8) => general_remove!(arr, 
element, max, Int8Array),
+                (DataType::Int16, DataType::Int16) => general_remove!(arr, 
element, max, Int16Array),
+                (DataType::Int32, DataType::Int32) => general_remove!(arr, 
element, max, Int32Array),
+                (DataType::Int64, DataType::Int64) => general_remove!(arr, 
element, max, Int64Array),
+                (DataType::UInt8, DataType::UInt8) => general_remove!(arr, 
element, max, UInt8Array),
+                (DataType::UInt16, DataType::UInt16) => general_remove!(arr, 
element, max, UInt16Array),
+                (DataType::UInt32, DataType::UInt32) => general_remove!(arr, 
element, max, UInt32Array),
+                (DataType::UInt64, DataType::UInt64) => general_remove!(arr, 
element, max, UInt64Array),
                 (array_data_type, element_data_type) => {
                     return Err(DataFusionError::NotImplemented(format!(
                         "Array_remove is not implemented for types 
'{array_data_type:?}' and '{element_data_type:?}'."
                     )))
                 }
+    };
+
+    Ok(res)
+}
+
+/// Array_removes SQL function
+pub fn array_removes(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let arr = as_list_array(&args[0])?;
+    let element = &args[1];
+
+    let max = Int64Array::from_value(i64::MAX, arr.len());
+
+    let res = match (arr.value_type(), element.data_type()) {
+                (DataType::List(_), DataType::List(_)) => general_remove!(arr, 
element, max, ListArray),
+                (DataType::Utf8, DataType::Utf8) => general_remove!(arr, 
element, max, StringArray),
+                (DataType::LargeUtf8, DataType::LargeUtf8) => 
general_remove!(arr, element, max, LargeStringArray),
+                (DataType::Boolean, DataType::Boolean) => general_remove!(arr, 
element, max, BooleanArray),
+                (DataType::Float32, DataType::Float32) => general_remove!(arr, 
element, max, Float32Array),
+                (DataType::Float64, DataType::Float64) => general_remove!(arr, 
element, max, Float64Array),
+                (DataType::Int8, DataType::Int8) => general_remove!(arr, 
element, max, Int8Array),
+                (DataType::Int16, DataType::Int16) => general_remove!(arr, 
element, max, Int16Array),
+                (DataType::Int32, DataType::Int32) => general_remove!(arr, 
element, max, Int32Array),
+                (DataType::Int64, DataType::Int64) => general_remove!(arr, 
element, max, Int64Array),
+                (DataType::UInt8, DataType::UInt8) => general_remove!(arr, 
element, max, UInt8Array),
+                (DataType::UInt16, DataType::UInt16) => general_remove!(arr, 
element, max, UInt16Array),
+                (DataType::UInt32, DataType::UInt32) => general_remove!(arr, 
element, max, UInt32Array),
+                (DataType::UInt64, DataType::UInt64) => general_remove!(arr, 
element, max, UInt64Array),
+                (array_data_type, element_data_type) => {
+                    return Err(DataFusionError::NotImplemented(format!(
+                        "Array_removes is not implemented for types 
'{array_data_type:?}' and '{element_data_type:?}'."
+                    )))
+                }
+    };
+
+    Ok(res)
+}
+
+macro_rules! general_replace {
+    ($ARRAY:expr, $FROM:expr, $TO:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{
+        let mut offsets: Vec<i32> = vec![0];
+        let mut values =
+            downcast_arg!(new_empty_array($FROM.data_type()), 
$ARRAY_TYPE).clone();
+
+        let from_array = downcast_arg!($FROM, $ARRAY_TYPE);
+        let to_array = downcast_arg!($TO, $ARRAY_TYPE);
+        for (((arr, from), to), max) in $ARRAY
+            .iter()
+            .zip(from_array.iter())
+            .zip(to_array.iter())
+            .zip($MAX.iter())
+        {
+            let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
+                DataFusionError::Internal(format!("offsets should not be 
empty"))
+            })?;
+            match arr {
+                Some(arr) => {
+                    let child_array = downcast_arg!(arr, $ARRAY_TYPE);
+                    let mut counter = 0;
+                    let max = if max < Some(1) { 1 } else { max.unwrap() };
+
+                    let replaced_array = child_array
+                        .iter()
+                        .map(|el| {
+                            if counter != max && el == from {
+                                counter += 1;
+                                to
+                            } else {
+                                el
+                            }
+                        })
+                        .collect::<$ARRAY_TYPE>();
+
+                    values = downcast_arg!(
+                        compute::concat(&[&values, &replaced_array])?.clone(),
+                        $ARRAY_TYPE
+                    )
+                    .clone();
+                    offsets.push(last_offset + replaced_array.len() as i32);
+                }
+                None => {
+                    offsets.push(last_offset);
+                }
             }
         }
-        data_type => {
-            return Err(DataFusionError::Internal(format!(
-                "Array is not type '{data_type:?}'."
-            )))
-        }
-    };
 
-    array(res.as_slice())
+        let field = Arc::new(Field::new("item", $FROM.data_type().clone(), 
true));
+
+        Arc::new(ListArray::try_new(
+            field,
+            OffsetBuffer::new(offsets.into()),
+            Arc::new(values),
+            None,
+        )?)
+    }};
 }
 
-macro_rules! replace {
-    ($ARRAY:expr, $FROM:expr, $TO:expr, $ARRAY_TYPE:ident, 
$BUILDER_TYPE:ident) => {{
-        let child_array =
-            downcast_arg!(downcast_arg!($ARRAY, ListArray).values(), 
$ARRAY_TYPE);
-        let from = downcast_arg!($FROM, $ARRAY_TYPE).value(0);
-        let to = downcast_arg!($TO, $ARRAY_TYPE).value(0);
-        let mut builder = new_builder!($BUILDER_TYPE, child_array.len());
+macro_rules! general_replace_list {
+    ($ARRAY:expr, $FROM:expr, $TO:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{
+        let mut offsets: Vec<i32> = vec![0];
+        let mut values =
+            downcast_arg!(new_empty_array($FROM.data_type()), 
ListArray).clone();
 
-        for x in child_array {
-            match x {
-                Some(x) => {
-                    if x == from {
-                        builder.append_value(to);
-                    } else {
-                        builder.append_value(x);
+        let from_array = downcast_arg!($FROM, ListArray);
+        let to_array = downcast_arg!($TO, ListArray);
+        for (((arr, from), to), max) in $ARRAY
+            .iter()
+            .zip(from_array.iter())
+            .zip(to_array.iter())
+            .zip($MAX.iter())
+        {
+            let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
+                DataFusionError::Internal(format!("offsets should not be 
empty"))
+            })?;
+            match arr {
+                Some(arr) => {
+                    let child_array = downcast_arg!(arr, ListArray);
+                    let mut counter = 0;
+                    let max = if max < Some(1) { 1 } else { max.unwrap() };
+
+                    let replaced_vec = child_array
+                        .iter()
+                        .map(|el| {
+                            if counter != max && el == from {
+                                counter += 1;
+                                to.clone().unwrap()
+                            } else {
+                                el.clone().unwrap()
+                            }
+                        })
+                        .collect::<Vec<_>>();
+
+                    let mut i: i32 = 0;
+                    let mut replaced_offsets = vec![i];
+                    replaced_offsets.extend(
+                        replaced_vec
+                            .clone()
+                            .into_iter()
+                            .map(|a| {
+                                i += a.len() as i32;
+                                i
+                            })
+                            .collect::<Vec<_>>(),
+                    );
+
+                    let mut replaced_values = downcast_arg!(
+                        new_empty_array(&from_array.value_type()),
+                        $ARRAY_TYPE
+                    )
+                    .clone();
+                    for replaced_list in replaced_vec {
+                        replaced_values = downcast_arg!(
+                            compute::concat(&[&replaced_values, 
&replaced_list])?,
+                            $ARRAY_TYPE
+                        )
+                        .clone();
                     }
+
+                    let field = Arc::new(Field::new(
+                        "item",
+                        from_array.value_type().clone(),
+                        true,
+                    ));
+                    let replaced_array = ListArray::try_new(
+                        field,
+                        OffsetBuffer::new(replaced_offsets.clone().into()),
+                        Arc::new(replaced_values),
+                        None,
+                    )?;
+
+                    values = downcast_arg!(
+                        compute::concat(&[&values, &replaced_array,])?.clone(),
+                        ListArray
+                    )
+                    .clone();
+                    offsets.push(last_offset + replaced_array.len() as i32);
+                }
+                None => {
+                    offsets.push(last_offset);
                 }
-                None => builder.append_null(),
             }
         }
-        let arr = builder.finish();
 
-        let mut scalars = vec![];
-        for i in 0..arr.len() {
-            
scalars.push(ColumnarValue::Scalar(ScalarValue::try_from_array(&arr, i)?));
-        }
-        scalars
+        let field = Arc::new(Field::new("item", $FROM.data_type().clone(), 
true));
+
+        Arc::new(ListArray::try_new(
+            field,
+            OffsetBuffer::new(offsets.into()),
+            Arc::new(values),
+            None,
+        )?)
     }};
 }
 
 /// Array_replace SQL function
-pub fn array_replace(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    let arr = match &args[0] {
-        ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
-        ColumnarValue::Array(arr) => arr.clone(),
-    };
+pub fn array_replace(args: &[ArrayRef]) -> Result<ArrayRef> {

Review Comment:
   I think `array_replace(array, from, to, max)` should be named `array_replaces`,
since it can replace more than one element. Likewise, `array_remove(array, element, max)`
should take `elements` instead of `element` (i.e. `array_remove(array, elements, max)`),
since it can remove more than one element, the same as `array_removes(array, elements)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to