jayzhan211 commented on code in PR #8081:
URL: https://github.com/apache/arrow-datafusion/pull/8081#discussion_r1387915399


##########
datafusion/expr/src/built_in_function.rs:
##########
@@ -543,6 +546,34 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::ArrayReplaceN => 
Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArrayReplaceAll => 
Ok(input_expr_types[0].clone()),
             BuiltinScalarFunction::ArrayToString => Ok(Utf8),
+            BuiltinScalarFunction::ArrayIntersect => {
+                if input_expr_types.len() < 2 || input_expr_types.len() > 2 {
+                    Err(DataFusionError::Internal(format!(

Review Comment:
   Consider using the `internal_error!` macro here.
   
   Also, I think the data types can be checked in `array_expressions.rs`
   instead. `return_type()` should only compute the return data type, so type
   checking belongs elsewhere. `array_expressions.rs` might not be the ideal
   place for type checking either, but it definitely does not belong in
   `return_type`.
   You can just return `Ok(input_expr_types[0].clone())` here.



##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1820,6 +1822,66 @@ pub fn array_has_all(args: &[ArrayRef]) -> 
Result<ArrayRef> {
     Ok(Arc::new(boolean_builder.finish()))
 }
 
+/// array_intersect SQL function
+pub fn array_intersect(args: &[ArrayRef]) -> Result<ArrayRef> {
+    assert_eq!(args.len(), 2);
+
+    let first_array = as_list_array(&args[0])?;
+    let second_array = as_list_array(&args[1])?;
+
+    if first_array.value_type() != second_array.value_type() {
+        return Err(DataFusionError::NotImplemented(format!(
+            "array_intersect is not implemented for '{first_array:?}' and 
'{second_array:?}'",
+        )));
+    }
+    let dt = first_array.value_type().clone();
+
+    let mut offsets = vec![0];
+    let mut tmp_values = vec![];
+
+    let mut converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
+    for (first_arr, second_arr) in first_array.iter().zip(second_array.iter()) 
{
+        if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) {
+            let l_values = converter.convert_columns(&[first_arr])?;
+            let r_values = converter.convert_columns(&[second_arr])?;
+
+            let mut values_set = HashSet::with_capacity(l_values.num_rows());
+            for l_val in l_values.iter() {
+                values_set.insert(l_val);
+            }
+            let mut rows = Vec::with_capacity(r_values.num_rows());
+            for r_val in r_values.iter().sorted().dedup() {
+                if values_set.contains(&r_val) {
+                    rows.push(r_val);
+                }
+            }
+
+            let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
+                DataFusionError::Internal(format!("offsets should not be 
empty"))

Review Comment:
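   Consider using the error macro here as well (e.g. `internal_error!`) instead of building `DataFusionError::Internal(format!(...))` by hand.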



##########
datafusion/physical-expr/src/array_expressions.rs:
##########
@@ -1820,6 +1822,66 @@ pub fn array_has_all(args: &[ArrayRef]) -> 
Result<ArrayRef> {
     Ok(Arc::new(boolean_builder.finish()))
 }
 
+/// array_intersect SQL function
+pub fn array_intersect(args: &[ArrayRef]) -> Result<ArrayRef> {
+    assert_eq!(args.len(), 2);
+
+    let first_array = as_list_array(&args[0])?;
+    let second_array = as_list_array(&args[1])?;
+
+    if first_array.value_type() != second_array.value_type() {
+        return Err(DataFusionError::NotImplemented(format!(
+            "array_intersect is not implemented for '{first_array:?}' and 
'{second_array:?}'",
+        )));
+    }
+    let dt = first_array.value_type().clone();
+
+    let mut offsets = vec![0];
+    let mut tmp_values = vec![];
+
+    let mut converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
+    for (first_arr, second_arr) in first_array.iter().zip(second_array.iter()) 
{
+        if let (Some(first_arr), Some(second_arr)) = (first_arr, second_arr) {
+            let l_values = converter.convert_columns(&[first_arr])?;
+            let r_values = converter.convert_columns(&[second_arr])?;
+
+            let mut values_set = HashSet::with_capacity(l_values.num_rows());
+            for l_val in l_values.iter() {
+                values_set.insert(l_val);
+            }
+            let mut rows = Vec::with_capacity(r_values.num_rows());
+            for r_val in r_values.iter().sorted().dedup() {
+                if values_set.contains(&r_val) {
+                    rows.push(r_val);
+                }
+            }
+
+            let last_offset: i32 = offsets.last().copied().ok_or_else(|| {
+                DataFusionError::Internal(format!("offsets should not be 
empty"))
+            })?;
+            offsets.push(last_offset + rows.len() as i32);
+            let tmp_value = converter.convert_rows(rows)?;
+            tmp_values.push(
+                tmp_value
+                    .get(0)
+                    .ok_or_else(|| {
+                        DataFusionError::Internal(format!(
+                            "array_intersect: failed to get value from rows"
+                        ))
+                    })?
+                    .clone(),
+            );
+        }
+    }
+
+    let field = Arc::new(Field::new("item", dt, true));
+    let offsets = OffsetBuffer::new(offsets.into());
+    let tmp_values_ref = tmp_values.iter().map(|v| 
v.as_ref()).collect::<Vec<_>>();
+    let values = concat(&tmp_values_ref)?;

Review Comment:
   I'd prefer the fully qualified `arrow::compute::concat` here: this file also has a `concat_internal` function, so a bare `concat` is easy to confuse with it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to