This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 341db1d43 feat: rpad support column for second arg instead of just literal (#2099)
341db1d43 is described below

commit 341db1d4325f07124e466154fa1cb9c5a75c7aa6
Author: B Vadlamani <11091419+coderfen...@users.noreply.github.com>
AuthorDate: Tue Sep 16 10:46:39 2025 -0700

    feat: rpad support column for second arg instead of just literal (#2099)
---
 .../char_varchar_utils/read_side_padding.rs        | 141 +++++++++++++++------
 .../org/apache/comet/CometExpressionSuite.scala    |   7 +
 2 files changed, 111 insertions(+), 37 deletions(-)
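
For readers skimming the diff: rpad's second argument may now be a per-row Int32 column rather than only a literal. A minimal sketch of the per-row case using the arrow crate directly (array contents and variable names are illustrative, not taken from the commit):

    use arrow::array::{Int32Array, StringArray};

    fn main() {
        // Each row carries its own target width, as in rpad(_1, _2).
        let strings = StringArray::from(vec![Some("ab"), Some("hello"), None]);
        let widths = Int32Array::from(vec![Some(4), Some(3), Some(5)]);

        let padded: Vec<Option<String>> = strings
            .iter()
            .zip(widths.iter())
            .map(|(s, w)| {
                s.map(|s| {
                    // Null or negative widths treated as 0 in this sketch.
                    let w = w.unwrap_or(0).max(0) as usize;
                    let char_len = s.chars().count();
                    if char_len >= w {
                        s.to_string() // already wide enough: unchanged
                    } else {
                        format!("{s}{}", " ".repeat(w - char_len))
                    }
                })
            })
            .collect();

        assert_eq!(padded[0].as_deref(), Some("ab  "));
        assert_eq!(padded[1].as_deref(), Some("hello"));
        assert_eq!(padded[2], None); // null input stays null
    }

The committed code performs essentially the same zip but writes into a GenericStringBuilder, so offsets and nulls are tracked by the builder rather than collected into Strings.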

diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
index 6e56d9d86..40735de7a 100644
--- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
+++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
@@ -18,12 +18,11 @@
 use arrow::array::builder::GenericStringBuilder;
 use arrow::array::cast::as_dictionary_array;
 use arrow::array::types::Int32Type;
-use arrow::array::{make_array, Array, DictionaryArray};
+use arrow::array::{make_array, Array, AsArray, DictionaryArray};
 use arrow::array::{ArrayRef, OffsetSizeTrait};
 use arrow::datatypes::DataType;
 use datafusion::common::{cast::as_generic_string_array, DataFusionError, ScalarValue};
 use datafusion::physical_plan::ColumnarValue;
-use std::fmt::Write;
 use std::sync::Arc;
 
 /// Similar to DataFusion `rpad`, but not to truncate when the string is already longer than length
@@ -43,17 +42,31 @@ fn spark_read_side_padding2(
     match args {
         [ColumnarValue::Array(array), ColumnarValue::Scalar(ScalarValue::Int32(Some(length)))] => {
             match array.data_type() {
-                DataType::Utf8 => spark_read_side_padding_internal::<i32>(array, *length, truncate),
-                DataType::LargeUtf8 => {
-                    spark_read_side_padding_internal::<i64>(array, *length, truncate)
-                }
+                DataType::Utf8 => spark_read_side_padding_internal::<i32>(
+                    array,
+                    truncate,
+                    ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+                ),
+                DataType::LargeUtf8 => spark_read_side_padding_internal::<i64>(
+                    array,
+                    truncate,
+                    ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+                ),
                 // Dictionary support required for SPARK-48498
                 DataType::Dictionary(_, value_type) => {
                     let dict = as_dictionary_array::<Int32Type>(array);
                     let col = if value_type.as_ref() == &DataType::Utf8 {
-                        spark_read_side_padding_internal::<i32>(dict.values(), *length, truncate)?
+                        spark_read_side_padding_internal::<i32>(
+                            dict.values(),
+                            truncate,
+                            ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+                        )?
                     } else {
-                        spark_read_side_padding_internal::<i64>(dict.values(), *length, truncate)?
+                        spark_read_side_padding_internal::<i64>(
+                            dict.values(),
+                            truncate,
+                            ColumnarValue::Scalar(ScalarValue::Int32(Some(*length))),
+                        )?
                     };
                     // col consists of an array, so arg of to_array() is not used. Can be anything
                     let values = col.to_array(0)?;
@@ -65,6 +78,21 @@ fn spark_read_side_padding2(
                 ))),
             }
         }
+        [ColumnarValue::Array(array), ColumnarValue::Array(array_int)] => match array.data_type() {
+            DataType::Utf8 => spark_read_side_padding_internal::<i32>(
+                array,
+                truncate,
+                ColumnarValue::Array(Arc::<dyn Array>::clone(array_int)),
+            ),
+            DataType::LargeUtf8 => spark_read_side_padding_internal::<i64>(
+                array,
+                truncate,
+                ColumnarValue::Array(Arc::<dyn Array>::clone(array_int)),
+            ),
+            other => Err(DataFusionError::Internal(format!(
+                "Unsupported data type {other:?} for function rpad/read_side_padding",
+            ))),
+        },
         other => Err(DataFusionError::Internal(format!(
             "Unsupported arguments {other:?} for function 
rpad/read_side_padding",
         ))),
@@ -73,42 +101,81 @@ fn spark_read_side_padding2(
 
 fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
     array: &ArrayRef,
-    length: i32,
     truncate: bool,
+    pad_type: ColumnarValue,
 ) -> Result<ColumnarValue, DataFusionError> {
     let string_array = as_generic_string_array::<T>(array)?;
-    let length = 0.max(length) as usize;
-    let space_string = " ".repeat(length);
+    match pad_type {
+        ColumnarValue::Array(array_int) => {
+            let int_pad_array = array_int.as_primitive::<Int32Type>();
 
-    let mut builder =
-        GenericStringBuilder::<T>::with_capacity(string_array.len(), string_array.len() * length);
+            let mut builder = GenericStringBuilder::<T>::with_capacity(
+                string_array.len(),
+                string_array.len() * int_pad_array.len(),
+            );
 
-    for string in string_array.iter() {
-        match string {
-            Some(string) => {
-                // It looks Spark's UTF8String is closer to chars rather than graphemes
-                // https://stackoverflow.com/a/46290728
-                let char_len = string.chars().count();
-                if length <= char_len {
-                    if truncate {
-                        let idx = string
-                            .char_indices()
-                            .nth(length)
-                            .map(|(i, _)| i)
-                            .unwrap_or(string.len());
-                        builder.append_value(&string[..idx]);
-                    } else {
-                        builder.append_value(string);
-                    }
-                } else {
-                    // write_str updates only the value buffer, not null nor offset buffer
-                    // This is convenient for concatenating str(s)
-                    builder.write_str(string)?;
-                    builder.append_value(&space_string[char_len..]);
+            for (string, length) in string_array.iter().zip(int_pad_array) {
+                match string {
+                    Some(string) => builder.append_value(add_padding_string(
+                        string.parse().unwrap(),
+                        length.unwrap() as usize,
+                        truncate,
+                    )?),
+                    _ => builder.append_null(),
+                }
+            }
+            Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+        }
+        ColumnarValue::Scalar(const_pad_length) => {
+            let length = 0.max(i32::try_from(const_pad_length)?) as usize;
+
+            let mut builder = GenericStringBuilder::<T>::with_capacity(
+                string_array.len(),
+                string_array.len() * length,
+            );
+
+            for string in string_array.iter() {
+                match string {
+                    Some(string) => builder.append_value(add_padding_string(
+                        string.parse().unwrap(),
+                        length,
+                        truncate,
+                    )?),
+                    _ => builder.append_null(),
                 }
             }
-            _ => builder.append_null(),
+            Ok(ColumnarValue::Array(Arc::new(builder.finish())))
+        }
+    }
+}
+
+fn add_padding_string(
+    string: String,
+    length: usize,
+    truncate: bool,
+) -> Result<String, DataFusionError> {
+    // It looks Spark's UTF8String is closer to chars rather than graphemes
+    // https://stackoverflow.com/a/46290728
+    let space_string = " ".repeat(length);
+    let char_len = string.chars().count();
+    if length <= char_len {
+        if truncate {
+            let idx = string
+                .char_indices()
+                .nth(length)
+                .map(|(i, _)| i)
+                .unwrap_or(string.len());
+            match string[..idx].parse() {
+                Ok(string) => Ok(string),
+                Err(err) => Err(DataFusionError::Internal(format!(
+                    "Failed adding padding string {} error {:}",
+                    string, err
+                ))),
+            }
+        } else {
+            Ok(string)
         }
+    } else {
+        Ok(string + &space_string[char_len..])
     }
-    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
 }
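
The new add_padding_string helper counts chars rather than bytes, matching Spark's UTF8String semantics for multi-byte input such as the Telugu string exercised in the test below. A self-contained sketch of the same pad/truncate logic (function name is mine, not the commit's):

    fn pad_or_truncate(s: &str, length: usize, truncate: bool) -> String {
        let char_len = s.chars().count();
        if length <= char_len {
            if truncate {
                // Byte offset of the `length`-th char; a raw byte slice like
                // &s[..length] could split a multi-byte char and panic.
                let idx = s
                    .char_indices()
                    .nth(length)
                    .map(|(i, _)| i)
                    .unwrap_or(s.len());
                s[..idx].to_string()
            } else {
                s.to_string() // rpad never truncates
            }
        } else {
            // Right-pad with spaces up to `length` chars.
            format!("{s}{}", " ".repeat(length - char_len))
        }
    }

    fn main() {
        // "తెలుగు" is 6 chars but 18 bytes in UTF-8.
        assert_eq!(pad_or_truncate("తెలుగు", 8, false), "తెలుగు  ");
        assert_eq!(pad_or_truncate("తెలుగు", 2, true), "తె");
        assert_eq!(pad_or_truncate("abc", 2, false), "abc");
    }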
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 0e1d4fc24..6d7a5de7d 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -407,6 +407,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
+  test("Verify rpad expr support for second arg instead of just literal") {
+    val data = Seq(("IfIWasARoadIWouldBeBent", 10), ("తెలుగు", 2))
+    withParquetTable(data, "t1") {
+      val res = sql("select rpad(_1,_2) , rpad(_1,2) from t1 order by _1")
+      checkSparkAnswerAndOperator(res)
+    }
+  }
 
   test("dictionary arithmetic") {
     // TODO: test ANSI mode

