Jefffrey commented on code in PR #18137:
URL: https://github.com/apache/datafusion/pull/18137#discussion_r2460851707


##########
datafusion/functions/src/string/concat.rs:
##########
@@ -65,14 +75,223 @@ impl Default for ConcatFunc {
 
 impl ConcatFunc {
     pub fn new() -> Self {
-        use DataType::*;
         Self {
-            signature: Signature::variadic(
-                vec![Utf8View, Utf8, LargeUtf8],
-                Volatility::Immutable,
-            ),
+            signature: Signature::user_defined(Volatility::Immutable),
         }
     }
+
+    fn concat_arrays(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        if args.is_empty() {
+            return plan_err!("concat requires at least one argument");
+        }
+
+        // Simple case: single row - use fast path
+        let num_rows = args
+            .iter()
+            .find_map(|arg| match arg {
+                ColumnarValue::Array(array) => Some(array.len()),
+                _ => None,
+            })
+            .unwrap_or(1);

Review Comment:
   I think this row count could be obtained from the original `ScalarFunctionArgs` and
passed through, instead of having this logic (which doesn't account for scalars).



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -65,14 +75,223 @@ impl Default for ConcatFunc {
 
 impl ConcatFunc {
     pub fn new() -> Self {
-        use DataType::*;
         Self {
-            signature: Signature::variadic(
-                vec![Utf8View, Utf8, LargeUtf8],
-                Volatility::Immutable,
-            ),
+            signature: Signature::user_defined(Volatility::Immutable),
         }
     }
+
+    fn concat_arrays(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        if args.is_empty() {
+            return plan_err!("concat requires at least one argument");
+        }
+
+        // Simple case: single row - use fast path
+        let num_rows = args
+            .iter()
+            .find_map(|arg| match arg {
+                ColumnarValue::Array(array) => Some(array.len()),
+                _ => None,
+            })
+            .unwrap_or(1);
+
+        if num_rows == 1 {
+            return self.concat_arrays_single_row(args);
+        }
+
+        // Multi-row case: process more carefully to avoid blocking
+        let arrays: Result<Vec<Arc<dyn Array>>> = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(Arc::clone(array)),
+                ColumnarValue::Scalar(scalar) => 
scalar.to_array_of_size(num_rows),
+            })
+            .collect();
+
+        let arrays = arrays?;
+
+        // Build result using efficient batched operations
+        self.concat_arrays_multi_row(&arrays, num_rows)
+    }
+
+    /// Fast path for single-row array concatenation
+    fn concat_arrays_single_row(&self, args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
+        let mut all_elements = Vec::new();
+
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(array) => {
+                    if !array.is_null(0) {
+                        let elements = 
self.extract_row_elements(array.as_ref(), 0)?;
+                        all_elements.extend(elements);
+                    }
+                }
+                ColumnarValue::Scalar(scalar) => {
+                    let array = scalar.to_array_of_size(1)?;

Review Comment:
   Is it possible to avoid this conversion to array?



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -258,18 +535,11 @@ impl ScalarUDFImpl for ConcatFunc {
                 let string_array = builder.finish(None);
                 Ok(ColumnarValue::Array(Arc::new(string_array)))
             }
-            _ => unreachable!(),
+            _ => plan_err!("Unsupported return datatype: {return_datatype}"),
         }
     }
 
-    /// Simplify the `concat` function by
-    /// 1. filtering out all `null` literals
-    /// 2. concatenating contiguous literal arguments
-    ///
-    /// For example:
-    /// `concat(col(a), 'hello ', 'world', col(b), null)`
-    /// will be optimized to
-    /// `concat(col(a), 'hello world', col(b))`
+    /// Simplify the `concat` function by concatenating literals and filtering 
nulls

Review Comment:
   The old comment had useful details, such as the example; why are we removing them now?



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -139,10 +414,14 @@ impl ScalarUDFImpl for ConcatFunc {
                 match scalar.try_as_str() {
                     Some(Some(v)) => result.push_str(v),
                     Some(None) => {} // null literal
-                    None => plan_err!(
-                        "Concat function does not support scalar type {}",
-                        scalar
-                    )?,
+                    None => {
+                        // For non-string types, convert to string 
representation
+                        if scalar.is_null() {
+                            // Skip null values
+                        } else {
+                            result.push_str(&format!("{scalar}"));
+                        }
+                    }

Review Comment:
   Why is this change necessary?



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -88,26 +307,82 @@ impl ScalarUDFImpl for ConcatFunc {
         &self.signature
     }
 
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        use DataType::*;
+
+        if arg_types.is_empty() {
+            return plan_err!("concat requires at least one argument");
+        }
+
+        // Arrays need no coercion
+        for dt in arg_types {
+            if matches!(dt, List(_) | LargeList(_) | FixedSizeList(_, _)) {
+                return Ok(arg_types.to_vec());
+            }
+        }

Review Comment:
   If we find one list type, we exit early. What happens if we have a `List, String`
input? Shouldn't we catch that error here?



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -65,14 +75,223 @@ impl Default for ConcatFunc {
 
 impl ConcatFunc {
     pub fn new() -> Self {
-        use DataType::*;
         Self {
-            signature: Signature::variadic(
-                vec![Utf8View, Utf8, LargeUtf8],
-                Volatility::Immutable,
-            ),
+            signature: Signature::user_defined(Volatility::Immutable),
         }
     }
+
+    fn concat_arrays(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        if args.is_empty() {
+            return plan_err!("concat requires at least one argument");
+        }
+
+        // Simple case: single row - use fast path
+        let num_rows = args
+            .iter()
+            .find_map(|arg| match arg {
+                ColumnarValue::Array(array) => Some(array.len()),
+                _ => None,
+            })
+            .unwrap_or(1);
+
+        if num_rows == 1 {
+            return self.concat_arrays_single_row(args);
+        }
+
+        // Multi-row case: process more carefully to avoid blocking
+        let arrays: Result<Vec<Arc<dyn Array>>> = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(Arc::clone(array)),
+                ColumnarValue::Scalar(scalar) => 
scalar.to_array_of_size(num_rows),
+            })
+            .collect();
+
+        let arrays = arrays?;
+
+        // Build result using efficient batched operations
+        self.concat_arrays_multi_row(&arrays, num_rows)
+    }
+
+    /// Fast path for single-row array concatenation
+    fn concat_arrays_single_row(&self, args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
+        let mut all_elements = Vec::new();
+
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(array) => {
+                    if !array.is_null(0) {
+                        let elements = 
self.extract_row_elements(array.as_ref(), 0)?;
+                        all_elements.extend(elements);
+                    }
+                }
+                ColumnarValue::Scalar(scalar) => {
+                    let array = scalar.to_array_of_size(1)?;
+                    if !array.is_null(0) {
+                        let elements = 
self.extract_row_elements(array.as_ref(), 0)?;
+                        all_elements.extend(elements);
+                    }
+                }
+            }
+        }
+
+        if all_elements.is_empty() {
+            return plan_err!("No elements to concatenate");
+        }

Review Comment:
   So an input like `concat([null], [null])` would return an error, if I understand
this correctly?



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -65,14 +75,223 @@ impl Default for ConcatFunc {
 
 impl ConcatFunc {
     pub fn new() -> Self {
-        use DataType::*;
         Self {
-            signature: Signature::variadic(
-                vec![Utf8View, Utf8, LargeUtf8],
-                Volatility::Immutable,
-            ),
+            signature: Signature::user_defined(Volatility::Immutable),
         }
     }
+
+    fn concat_arrays(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        if args.is_empty() {
+            return plan_err!("concat requires at least one argument");
+        }
+
+        // Simple case: single row - use fast path
+        let num_rows = args
+            .iter()
+            .find_map(|arg| match arg {
+                ColumnarValue::Array(array) => Some(array.len()),
+                _ => None,
+            })
+            .unwrap_or(1);
+
+        if num_rows == 1 {
+            return self.concat_arrays_single_row(args);
+        }
+
+        // Multi-row case: process more carefully to avoid blocking
+        let arrays: Result<Vec<Arc<dyn Array>>> = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(Arc::clone(array)),
+                ColumnarValue::Scalar(scalar) => 
scalar.to_array_of_size(num_rows),
+            })
+            .collect();
+
+        let arrays = arrays?;
+
+        // Build result using efficient batched operations
+        self.concat_arrays_multi_row(&arrays, num_rows)
+    }
+
+    /// Fast path for single-row array concatenation
+    fn concat_arrays_single_row(&self, args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
+        let mut all_elements = Vec::new();
+
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(array) => {
+                    if !array.is_null(0) {
+                        let elements = 
self.extract_row_elements(array.as_ref(), 0)?;
+                        all_elements.extend(elements);
+                    }
+                }
+                ColumnarValue::Scalar(scalar) => {
+                    let array = scalar.to_array_of_size(1)?;
+                    if !array.is_null(0) {
+                        let elements = 
self.extract_row_elements(array.as_ref(), 0)?;
+                        all_elements.extend(elements);
+                    }
+                }
+            }
+        }
+
+        if all_elements.is_empty() {
+            return plan_err!("No elements to concatenate");
+        }
+
+        let element_refs: Vec<&dyn Array> =
+            all_elements.iter().map(|a| a.as_ref()).collect();
+        let concatenated = compute::concat(&element_refs)?;
+
+        // Build single-element ListArray
+        let field = Arc::new(arrow::datatypes::Field::new_list_field(
+            concatenated.data_type().clone(),
+            true,
+        ));
+        let offsets = OffsetBuffer::from_lengths([concatenated.len()]);
+        let result = ListArray::new(field, offsets, concatenated, None);
+
+        Ok(ColumnarValue::Array(Arc::new(result)))
+    }
+
+    /// Extract elements from a specific row of an array, optimized for 
performance
+    fn extract_row_elements(
+        &self,
+        array: &dyn Array,
+        row_idx: usize,
+    ) -> Result<Vec<Arc<dyn Array>>> {
+        if array.is_null(row_idx) {
+            return Ok(Vec::new());
+        }
+
+        let list_value = match array.data_type() {
+            DataType::List(_) => {
+                let list_array =
+                    array.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "Failed to downcast to ListArray".to_string(),
+                        )
+                    })?;
+                list_array.value(row_idx)
+            }
+            DataType::LargeList(_) => {
+                let list_array = array
+                    .as_any()
+                    .downcast_ref::<LargeListArray>()
+                    .ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "Failed to downcast to LargeListArray".to_string(),
+                        )
+                    })?;
+                list_array.value(row_idx)
+            }
+            DataType::FixedSizeList(_, _) => {
+                let list_array = array
+                    .as_any()
+                    .downcast_ref::<FixedSizeListArray>()
+                    .ok_or_else(|| {

Review Comment:
   I think we should just unwrap here, since the match arm already guarantees the array type.



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -88,26 +307,82 @@ impl ScalarUDFImpl for ConcatFunc {
         &self.signature
     }
 
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        use DataType::*;
+
+        if arg_types.is_empty() {
+            return plan_err!("concat requires at least one argument");
+        }
+
+        // Arrays need no coercion
+        for dt in arg_types {
+            if matches!(dt, List(_) | LargeList(_) | FixedSizeList(_, _)) {
+                return Ok(arg_types.to_vec());
+            }
+        }
+
+        let coerced_types = arg_types
+            .iter()
+            .map(|data_type| match data_type {
+                Utf8View | Utf8 | LargeUtf8 => data_type.clone(),
+                _ => Utf8,
+            })
+            .collect();
+        Ok(coerced_types)
+    }
+
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
         use DataType::*;
-        let mut dt = &Utf8;
-        arg_types.iter().for_each(|data_type| {
-            if data_type == &Utf8View {
-                dt = data_type;
+
+        // Arrays return list type
+        for data_type in arg_types {
+            if let List(field) | LargeList(field) | FixedSizeList(field, _) = 
data_type {
+                return Ok(List(Arc::new(arrow::datatypes::Field::new(
+                    "item",
+                    field.data_type().clone(),
+                    true,
+                ))));
             }
-            if data_type == &LargeUtf8 && dt != &Utf8View {
+        }
+
+        let mut dt = &Utf8;
+        for data_type in arg_types.iter() {
+            if data_type == &Utf8View || (data_type == &LargeUtf8 && dt != 
&Utf8View) {
                 dt = data_type;
             }
-        });
-
-        Ok(dt.to_owned())
+        }
+        Ok(dt.clone())
     }
 
-    /// Concatenates the text representations of all the arguments. NULL 
arguments are ignored.
-    /// concat('abcde', 2, NULL, 22) = 'abcde222'
+    /// Concatenates strings or arrays

Review Comment:
   Why are we removing details from the existing comments?



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -156,9 +435,7 @@ impl ScalarUDFImpl for ConcatFunc {
                 DataType::LargeUtf8 => {
                     
Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(result))))
                 }
-                other => {
-                    plan_err!("Concat function does not support datatype of 
{other}")
-                }
+                other => plan_err!("Unsupported datatype: {other}"),

Review Comment:
   Aren't we losing the existing details in this error message?



##########
datafusion/sqllogictest/test_files/information_schema.slt:
##########
@@ -770,8 +770,6 @@ datafusion public string_agg 1 OUT NULL String NULL false 1
 query TTTBI rowsort
 select specific_name, data_type, parameter_mode, is_variadic, rid from 
information_schema.parameters where specific_name = 'concat';
 ----

Review Comment:
   This test should be fixed so that it has an expected result, not just an empty
return.



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -211,13 +490,11 @@ impl ScalarUDFImpl for ConcatFunc {
                                 
ColumnarValueRef::NonNullableStringViewArray(string_array)
                             };
                             columns.push(column);
-                        },
-                        other => {
-                            return plan_err!("Input was {other} which is not a 
supported datatype for concat function")
                         }
+                        other => return plan_err!("Unsupported datatype: 
{other}"),

Review Comment:
   Again, aren't we losing details in this error message?



##########
datafusion/functions/src/string/concat.rs:
##########
@@ -65,14 +75,223 @@ impl Default for ConcatFunc {
 
 impl ConcatFunc {
     pub fn new() -> Self {
-        use DataType::*;
         Self {
-            signature: Signature::variadic(
-                vec![Utf8View, Utf8, LargeUtf8],
-                Volatility::Immutable,
-            ),
+            signature: Signature::user_defined(Volatility::Immutable),
         }
     }
+
+    fn concat_arrays(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        if args.is_empty() {
+            return plan_err!("concat requires at least one argument");
+        }
+
+        // Simple case: single row - use fast path
+        let num_rows = args
+            .iter()
+            .find_map(|arg| match arg {
+                ColumnarValue::Array(array) => Some(array.len()),
+                _ => None,
+            })
+            .unwrap_or(1);
+
+        if num_rows == 1 {
+            return self.concat_arrays_single_row(args);
+        }
+
+        // Multi-row case: process more carefully to avoid blocking
+        let arrays: Result<Vec<Arc<dyn Array>>> = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(Arc::clone(array)),
+                ColumnarValue::Scalar(scalar) => 
scalar.to_array_of_size(num_rows),
+            })
+            .collect();
+
+        let arrays = arrays?;
+
+        // Build result using efficient batched operations
+        self.concat_arrays_multi_row(&arrays, num_rows)
+    }
+
+    /// Fast path for single-row array concatenation
+    fn concat_arrays_single_row(&self, args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
+        let mut all_elements = Vec::new();
+
+        for arg in args {
+            match arg {
+                ColumnarValue::Array(array) => {
+                    if !array.is_null(0) {
+                        let elements = 
self.extract_row_elements(array.as_ref(), 0)?;
+                        all_elements.extend(elements);
+                    }
+                }
+                ColumnarValue::Scalar(scalar) => {
+                    let array = scalar.to_array_of_size(1)?;
+                    if !array.is_null(0) {
+                        let elements = 
self.extract_row_elements(array.as_ref(), 0)?;
+                        all_elements.extend(elements);
+                    }
+                }
+            }
+        }
+
+        if all_elements.is_empty() {
+            return plan_err!("No elements to concatenate");
+        }
+
+        let element_refs: Vec<&dyn Array> =
+            all_elements.iter().map(|a| a.as_ref()).collect();
+        let concatenated = compute::concat(&element_refs)?;
+
+        // Build single-element ListArray
+        let field = Arc::new(arrow::datatypes::Field::new_list_field(
+            concatenated.data_type().clone(),
+            true,
+        ));
+        let offsets = OffsetBuffer::from_lengths([concatenated.len()]);
+        let result = ListArray::new(field, offsets, concatenated, None);
+
+        Ok(ColumnarValue::Array(Arc::new(result)))
+    }
+
+    /// Extract elements from a specific row of an array, optimized for 
performance
+    fn extract_row_elements(
+        &self,
+        array: &dyn Array,
+        row_idx: usize,
+    ) -> Result<Vec<Arc<dyn Array>>> {
+        if array.is_null(row_idx) {
+            return Ok(Vec::new());
+        }
+
+        let list_value = match array.data_type() {
+            DataType::List(_) => {
+                let list_array =
+                    array.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "Failed to downcast to ListArray".to_string(),
+                        )
+                    })?;
+                list_array.value(row_idx)
+            }
+            DataType::LargeList(_) => {
+                let list_array = array
+                    .as_any()
+                    .downcast_ref::<LargeListArray>()
+                    .ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "Failed to downcast to LargeListArray".to_string(),
+                        )
+                    })?;
+                list_array.value(row_idx)
+            }
+            DataType::FixedSizeList(_, _) => {
+                let list_array = array
+                    .as_any()
+                    .downcast_ref::<FixedSizeListArray>()
+                    .ok_or_else(|| {
+                        datafusion_common::DataFusionError::Plan(
+                            "Failed to downcast to 
FixedSizeListArray".to_string(),
+                        )
+                    })?;
+                list_array.value(row_idx)
+            }
+            _ => return plan_err!("Expected array type, got {}", 
array.data_type()),
+        };
+
+        // Extract non-null elements efficiently
+        Ok((0..list_value.len())
+            .filter(|&i| !list_value.is_null(i))
+            .map(|i| list_value.slice(i, 1))
+            .collect())
+    }
+
+    /// Multi-row array concatenation with efficient batching
+    fn concat_arrays_multi_row(
+        &self,
+        arrays: &[Arc<dyn Array>],
+        num_rows: usize,
+    ) -> Result<ColumnarValue> {
+        let mut result_arrays = Vec::with_capacity(num_rows);
+
+        for row_idx in 0..num_rows {
+            let mut row_elements = Vec::new();
+
+            // Collect elements from this row across all arrays
+            for array in arrays {
+                let elements = self.extract_row_elements(array.as_ref(), 
row_idx)?;
+                row_elements.extend(elements);
+            }
+
+            if row_elements.is_empty() {
+                result_arrays.push(None);
+            } else {
+                let element_refs: Vec<&dyn Array> =
+                    row_elements.iter().map(|a| a.as_ref()).collect();
+                let concatenated = compute::concat(&element_refs)?;
+                result_arrays.push(Some(concatenated));
+            }
+        }
+
+        // Build the final result array
+        self.build_list_array_result(result_arrays, &arrays[0], num_rows)
+    }
+
+    /// Build a ListArray result from concatenated elements
+    fn build_list_array_result(
+        &self,
+        result_arrays: Vec<Option<Arc<dyn Array>>>,
+        sample_array: &dyn Array,
+        _num_rows: usize,

Review Comment:
   Why is this argument here if it's unused?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to