This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 341db1d43 feat: rpad support column for second arg instead of just literal (#2099)
341db1d43 is described below

commit 341db1d4325f07124e466154fa1cb9c5a75c7aa6
Author: B Vadlamani <11091419+coderfen...@users.noreply.github.com>
AuthorDate: Tue Sep 16 10:46:39 2025 -0700

    feat: rpad support column for second arg instead of just literal (#2099)
---
 .../char_varchar_utils/read_side_padding.rs        | 141 +++++++++++++++------
 .../org/apache/comet/CometExpressionSuite.scala    |   7 +
 2 files changed, 111 insertions(+), 37 deletions(-)

diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
index 6e56d9d86..40735de7a 100644
--- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
+++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
@@ -18,12 +18,11 @@
 use arrow::array::builder::GenericStringBuilder;
 use arrow::array::cast::as_dictionary_array;
 use arrow::array::types::Int32Type;
-use arrow::array::{make_array, Array, DictionaryArray};
+use arrow::array::{make_array, Array, AsArray, DictionaryArray};
 use arrow::array::{ArrayRef, OffsetSizeTrait};
 use arrow::datatypes::DataType;
 use datafusion::common::{cast::as_generic_string_array, DataFusionError, ScalarValue};
 use datafusion::physical_plan::ColumnarValue;
-use std::fmt::Write;
 use std::sync::Arc;
 
 /// Similar to DataFusion `rpad`, but not to truncate when the string is already longer than length
@@ -43,17 +42,31 @@ fn spark_read_side_padding2(
     match args {
         [ColumnarValue::Array(array), ColumnarValue::Scalar(ScalarValue::Int32(Some(length)))] => {
             match array.data_type() {
-                DataType::Utf8 => spark_read_side_padding_internal::<i32>(array, *length, truncate),
-                DataType::LargeUtf8 => {
-                    spark_read_side_padding_internal::<i64>(array, *length, truncate)
-                }
+                DataType::Utf8 => spark_read_side_padding_internal::<i32>(
+                    array,
+                    truncate,
+                    ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+                ),
+                DataType::LargeUtf8 => spark_read_side_padding_internal::<i64>(
+                    array,
+                    truncate,
+                    ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+                ),
                 // Dictionary support required for SPARK-48498
                 DataType::Dictionary(_, value_type) => {
                     let dict = as_dictionary_array::<Int32Type>(array);
                     let col = if value_type.as_ref() == &DataType::Utf8 {
-                        spark_read_side_padding_internal::<i32>(dict.values(), *length, truncate)?
+                        spark_read_side_padding_internal::<i32>(
+                            dict.values(),
+                            truncate,
+                            ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+                        )?
                     } else {
-                        spark_read_side_padding_internal::<i64>(dict.values(), *length, truncate)?
+                        spark_read_side_padding_internal::<i64>(
+                            dict.values(),
+                            truncate,
+                            ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+                        )?
                     };
                     // col consists of an array, so arg of to_array() is not used. Can be anything
                     let values = col.to_array(0)?;
@@ -65,6 +78,21 @@ fn spark_read_side_padding2(
                 ))),
             }
         }
+        [ColumnarValue::Array(array), ColumnarValue::Array(array_int)] => match array.data_type() {
+            DataType::Utf8 => spark_read_side_padding_internal::<i32>(
+                array,
+                truncate,
+                ColumnarValue::Array(Arc::<dyn Array>::clone(array_int)),
+            ),
+            DataType::LargeUtf8 => spark_read_side_padding_internal::<i64>(
+                array,
+                truncate,
+                ColumnarValue::Array(Arc::<dyn Array>::clone(array_int)),
+            ),
+            other => Err(DataFusionError::Internal(format!(
+                "Unsupported data type {other:?} for function rpad/read_side_padding",
+            ))),
+        },
         other => Err(DataFusionError::Internal(format!(
             "Unsupported arguments {other:?} for function rpad/read_side_padding",
         ))),
@@ -73,42 +101,81 @@ fn spark_read_side_padding2(
 
 fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
     array: &ArrayRef,
-    length: i32,
     truncate: bool,
+    pad_type: ColumnarValue,
 ) -> Result<ColumnarValue, DataFusionError> {
     let string_array = as_generic_string_array::<T>(array)?;
-    let length = 0.max(length) as usize;
-    let space_string = " ".repeat(length);
+    match pad_type {
+        ColumnarValue::Array(array_int) => {
+            let int_pad_array = array_int.as_primitive::<Int32Type>();
 
-    let mut builder =
-        GenericStringBuilder::<T>::with_capacity(string_array.len(), string_array.len() * length);
+            let mut builder = GenericStringBuilder::<T>::with_capacity(
+                string_array.len(),
+                string_array.len() * int_pad_array.len(),
+            );
 
-    for string in string_array.iter() {
-        match string {
-            Some(string) => {
-                // It looks Spark's UTF8String is closer to chars rather than graphemes
-                // https://stackoverflow.com/a/46290728
-                let char_len = string.chars().count();
-                if length <= char_len {
-                    if truncate {
-                        let idx = string
-                            .char_indices()
-                            .nth(length)
-                            .map(|(i, _)| i)
-                            .unwrap_or(string.len());
-                        builder.append_value(&string[..idx]);
-                    } else {
-                        builder.append_value(string);
-                    }
-                } else {
-                    // write_str updates only the value buffer, not null nor offset buffer
-                    // This is convenient for concatenating str(s)
-                    builder.write_str(string)?;
-                    builder.append_value(&space_string[char_len..]);
+            for (string, length) in string_array.iter().zip(int_pad_array) {
+                match string {
+                    Some(string) => builder.append_value(add_padding_string(
+                        string.parse().unwrap(),
+                        length.unwrap() as usize,
+                        truncate,
+                    )?),
+                    _ => builder.append_null(),
+                }
+            }
+            Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+        }
+        ColumnarValue::Scalar(const_pad_length) => {
+            let length = 0.max(i32::try_from(const_pad_length)?) as usize;
+
+            let mut builder = GenericStringBuilder::<T>::with_capacity(
+                string_array.len(),
+                string_array.len() * length,
+            );
+
+            for string in string_array.iter() {
+                match string {
+                    Some(string) => builder.append_value(add_padding_string(
+                        string.parse().unwrap(),
+                        length,
+                        truncate,
+                    )?),
+                    _ => builder.append_null(),
                 }
             }
-            _ => builder.append_null(),
+            Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+        }
+    }
+}
+
+fn add_padding_string(
+    string: String,
+    length: usize,
+    truncate: bool,
+) -> Result<String, DataFusionError> {
+    // It looks Spark's UTF8String is closer to chars rather than graphemes
+    // https://stackoverflow.com/a/46290728
+    let space_string = " ".repeat(length);
+    let char_len = string.chars().count();
+    if length <= char_len {
+        if truncate {
+            let idx = string
+                .char_indices()
+                .nth(length)
+                .map(|(i, _)| i)
+                .unwrap_or(string.len());
+            match string[..idx].parse() {
+                Ok(string) => Ok(string),
+                Err(err) => Err(DataFusionError::Internal(format!(
+                    "Failed adding padding string {} error {:}",
+                    string, err
+                ))),
+            }
+        } else {
+            Ok(string)
         }
+    } else {
+        Ok(string + &space_string[char_len..])
     }
-    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
 }
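
For readers skimming the diff: the char-based (not byte-based) padding that the new
add_padding_string helper performs can be sketched as a small standalone function.
This is a simplified, hypothetical illustration rather than the committed code; it
elides the DataFusionError plumbing and the GenericStringBuilder reuse shown above:

    // Sketch of Spark-style read-side padding over chars, not bytes.
    // Hypothetical standalone variant of add_padding_string; error handling elided.
    fn pad_or_truncate(s: &str, length: usize, truncate: bool) -> String {
        let char_len = s.chars().count();
        if length <= char_len {
            if truncate {
                // Cut at a char boundary rather than a byte offset, so multibyte
                // strings such as "తెలుగు" are never split mid-character.
                let idx = s
                    .char_indices()
                    .nth(length)
                    .map(|(i, _)| i)
                    .unwrap_or(s.len());
                s[..idx].to_string()
            } else {
                // Read-side padding never truncates an already-long string.
                s.to_string()
            }
        } else {
            // Right-pad with spaces up to `length` chars.
            let mut out = String::from(s);
            out.extend(std::iter::repeat(' ').take(length - char_len));
            out
        }
    }

    fn main() {
        assert_eq!(pad_or_truncate("ab", 4, false), "ab  ");
        assert_eq!(pad_or_truncate("abcdef", 4, false), "abcdef"); // longer input kept whole
        assert_eq!(pad_or_truncate("abcdef", 4, true), "abcd");    // CHAR-style truncation
        assert_eq!(pad_or_truncate("తెలుగు", 2, true).chars().count(), 2);
    }
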
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 0e1d4fc24..6d7a5de7d 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -407,6 +407,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
+  test("Verify rpad expr support for second arg instead of just literal") {
+    val data = Seq(("IfIWasARoadIWouldBeBent", 10), ("తెలుగు", 2))
+    withParquetTable(data, "t1") {
+      val res = sql("select rpad(_1,_2) , rpad(_1,2) from t1 order by _1")
+      checkSparkAnswerAndOperator(res)
+    }
+  }
 
   test("dictionary arithmetic") {
     // TODO: test ANSI mode
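
The new test above drives the array code path, where the pad length comes from column
_2 instead of a literal. Row by row, that path zips the string column with the Int32
length column. A rough standalone sketch of the per-row behavior against the public
arrow API (a hypothetical helper, not Comet's internal entry point; like the patch,
it unwraps each row's length and so assumes the length column contains no nulls):

    use arrow::array::{Int32Array, StringArray};

    // Per-row rpad over (string, length) pairs, mirroring the new
    // ColumnarValue::Array branch: null strings stay null, lengths are
    // assumed non-null, and longer strings are not truncated.
    fn rpad_rows(strings: &StringArray, lengths: &Int32Array) -> Vec<Option<String>> {
        strings
            .iter()
            .zip(lengths.iter())
            .map(|(s, len)| {
                s.map(|s| {
                    let len = len.unwrap().max(0) as usize; // assumes non-null length
                    let char_len = s.chars().count();
                    if char_len >= len {
                        s.to_string() // already long enough; kept as-is
                    } else {
                        format!("{s}{}", " ".repeat(len - char_len))
                    }
                })
            })
            .collect()
    }

    fn main() {
        let strings = StringArray::from(vec![Some("IfIWasARoadIWouldBeBent"), Some("ab"), None]);
        let lengths = Int32Array::from(vec![10, 4, 3]);
        let padded = rpad_rows(&strings, &lengths);
        assert_eq!(padded[0].as_deref(), Some("IfIWasARoadIWouldBeBent")); // > 10 chars, untouched
        assert_eq!(padded[1].as_deref(), Some("ab  "));
        assert_eq!(padded[2], None);
    }
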
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org