neilconway commented on code in PR #20657:
URL: https://github.com/apache/datafusion/pull/20657#discussion_r2961253505
##########
datafusion/functions/src/unicode/rpad.rs:
##########
@@ -111,241 +113,399 @@ impl ScalarUDFImpl for RPadFunc {
utf8_to_str_type(&arg_types[0], "rpad")
}
- fn invoke_with_args(
- &self,
- args: datafusion_expr::ScalarFunctionArgs,
- ) -> Result<ColumnarValue> {
- let args = &args.args;
- match (
- args.len(),
- args[0].data_type(),
- args.get(2).map(|arg| arg.data_type()),
- ) {
- (2, Utf8 | Utf8View, _) => {
- make_scalar_function(rpad::<i32, i32>, vec![])(args)
- }
- (2, LargeUtf8, _) => make_scalar_function(rpad::<i64, i64>, vec![])(args),
- (3, Utf8 | Utf8View, Some(Utf8 | Utf8View)) => {
- make_scalar_function(rpad::<i32, i32>, vec![])(args)
- }
- (3, LargeUtf8, Some(LargeUtf8)) => {
- make_scalar_function(rpad::<i64, i64>, vec![])(args)
- }
- (3, Utf8 | Utf8View, Some(LargeUtf8)) => {
- make_scalar_function(rpad::<i32, i64>, vec![])(args)
- }
- (3, LargeUtf8, Some(Utf8 | Utf8View)) => {
- make_scalar_function(rpad::<i64, i32>, vec![])(args)
- }
- (_, _, _) => {
- exec_err!("Unsupported combination of data types for function rpad")
+ fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+ let ScalarFunctionArgs {
+ args, number_rows, ..
+ } = args;
+
+ const MAX_SCALAR_TARGET_LEN: usize = 16384;
+
+ // If target_len and fill (if specified) are constants, use the
+ // scalar fast path.
+ if let Some(target_len) = try_as_scalar_i64(&args[1]) {
+ let target_len: usize = match usize::try_from(target_len) {
+ Ok(n) if n <= i32::MAX as usize => n,
+ Ok(n) => {
+ return exec_err!("rpad requested length {n} too large");
+ }
+ Err(_) => 0, // negative → 0
+ };
+
+ let fill_str = if args.len() == 3 {
+ try_as_scalar_str(&args[2])
+ } else {
+ Some(" ")
+ };
+
+ // Skip the fast path for very large `target_len` values to avoid
+ // consuming too much memory. Such large padding values are uncommon
+ // in practice.
+ if target_len <= MAX_SCALAR_TARGET_LEN
+ && let Some(fill) = fill_str
+ {
+ let string_array = args[0].to_array_of_size(number_rows)?;
+ let result = match string_array.data_type() {
+ Utf8View => rpad_scalar_args::<_, i32>(
+ string_array.as_string_view(),
+ target_len,
+ fill,
+ ),
+ Utf8 => rpad_scalar_args::<_, i32>(
+ string_array.as_string::<i32>(),
+ target_len,
+ fill,
+ ),
+ LargeUtf8 => rpad_scalar_args::<_, i64>(
+ string_array.as_string::<i64>(),
+ target_len,
+ fill,
+ ),
+ other => {
+ exec_err!("Unsupported data type {other:?} for function rpad")
+ }
+ }?;
+ return Ok(ColumnarValue::Array(result));
}
}
+
+ match args[0].data_type() {
+ Utf8 | Utf8View => make_scalar_function(rpad::<i32>, vec![])(&args),
+ LargeUtf8 => make_scalar_function(rpad::<i64>, vec![])(&args),
+ other => exec_err!("Unsupported data type {other:?} for function rpad"),
+ }
}
fn documentation(&self) -> Option<&Documentation> {
self.doc()
}
}
-fn rpad<StringArrayLen: OffsetSizeTrait, FillArrayLen: OffsetSizeTrait>(
- args: &[ArrayRef],
+use super::common::{try_as_scalar_i64, try_as_scalar_str};
+
+/// Optimized rpad for constant target_len and fill arguments.
+fn rpad_scalar_args<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
+ string_array: V,
+ target_len: usize,
+ fill: &str,
+) -> Result<ArrayRef> {
+ if string_array.is_ascii() && fill.is_ascii() {
+ rpad_scalar_ascii::<V, T>(string_array, target_len, fill)
+ } else {
+ rpad_scalar_unicode::<V, T>(string_array, target_len, fill)
+ }
+}
+
+fn rpad_scalar_ascii<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
+ string_array: V,
+ target_len: usize,
+ fill: &str,
+) -> Result<ArrayRef> {
+ // With a scalar `target_len` and `fill`, we can precompute a padding
+ // buffer of `target_len` fill characters repeated cyclically.
+ let padding_buf = if !fill.is_empty() {
+ let mut buf = String::with_capacity(target_len);
+ while buf.len() < target_len {
+ let remaining = target_len - buf.len();
+ if remaining >= fill.len() {
+ buf.push_str(fill);
+ } else {
+ buf.push_str(&fill[..remaining]);
+ }
+ }
+ buf
+ } else {
+ String::new()
+ };
+
+ // Each output row is exactly `target_len` ASCII bytes (string + padding).
+ let data_capacity = string_array.len().saturating_mul(target_len);
+ let mut builder =
+ GenericStringBuilder::<T>::with_capacity(string_array.len(), data_capacity);
+
+ for maybe_string in string_array.iter() {
+ match maybe_string {
+ Some(string) => {
+ let str_len = string.len();
+ if target_len <= str_len {
+ builder.append_value(&string[..target_len]);
+ } else if fill.is_empty() {
+ builder.append_value(string);
+ } else {
+ let pad_needed = target_len - str_len;
+ builder.write_str(string)?;
+ builder.write_str(&padding_buf[..pad_needed])?;
+ builder.append_value("");
+ }
+ }
+ None => builder.append_null(),
+ }
+ }
+
+ Ok(Arc::new(builder.finish()) as ArrayRef)
+}
+
+fn rpad_scalar_unicode<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
+ string_array: V,
+ target_len: usize,
+ fill: &str,
) -> Result<ArrayRef> {
- if args.len() < 2 || args.len() > 3 {
+ let fill_chars: Vec<char> = fill.chars().collect();
+
+ // With a scalar `target_len` and `fill`, we can precompute a padding buffer
+ // of `target_len` fill characters repeated cyclically. Because Unicode
+ // characters are variable-width, we build a byte-offset table to map from
+ // character count to the corresponding byte position in the padding buffer.
+ let (padding_buf, char_byte_offsets) = if !fill_chars.is_empty() {
+ let mut buf = String::new();
+ let mut offsets = Vec::with_capacity(target_len + 1);
+ offsets.push(0usize);
+ for i in 0..target_len {
+ buf.push(fill_chars[i % fill_chars.len()]);
+ offsets.push(buf.len());
+ }
+ (buf, offsets)
+ } else {
+ (String::new(), vec![0])
+ };
+
+ // Each output row is `target_len` chars; multiply by 4 (max UTF-8 bytes
+ // per char) for an upper bound in bytes.
+ let data_capacity = string_array.len().saturating_mul(target_len * 4);
+ let mut builder =
+ GenericStringBuilder::<T>::with_capacity(string_array.len(), data_capacity);
+ let mut graphemes_buf = Vec::new();
+
+ for maybe_string in string_array.iter() {
+ match maybe_string {
+ Some(string) => {
+ graphemes_buf.clear();
+ graphemes_buf.extend(string.graphemes(true));
+
+ if target_len < graphemes_buf.len() {
+ builder.append_value(graphemes_buf[..target_len].concat());
+ } else if fill_chars.is_empty() {
+ builder.append_value(string);
+ } else {
+ let pad_chars = target_len - graphemes_buf.len();
+ let pad_bytes = char_byte_offsets[pad_chars];
+ builder.write_str(string)?;
+ builder.write_str(&padding_buf[..pad_bytes])?;
+ builder.append_value("");
+ }
+ }
+ None => builder.append_null(),
+ }
+ }
+
+ Ok(Arc::new(builder.finish()) as ArrayRef)
+}
+
+fn rpad<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
+ if args.len() <= 1 || args.len() > 3 {
return exec_err!(
- "rpad was called with {} arguments. It requires 2 or 3 arguments.",
+ "rpad was called with {} arguments. It requires at least 2 and at most 3.",
args.len()
);
}
let length_array = as_int64_array(&args[1])?;
- match (
- args.len(),
- args[0].data_type(),
- args.get(2).map(|arg| arg.data_type()),
- ) {
- (2, Utf8View, _) => {
- rpad_impl::<&StringViewArray, &StringViewArray, StringArrayLen>(
- &args[0].as_string_view(),
- length_array,
- None,
- )
- }
- (3, Utf8View, Some(Utf8View)) => {
- rpad_impl::<&StringViewArray, &StringViewArray, StringArrayLen>(
- &args[0].as_string_view(),
- length_array,
- Some(args[2].as_string_view()),
- )
- }
- (3, Utf8View, Some(Utf8 | LargeUtf8)) => {
- rpad_impl::<&StringViewArray, &GenericStringArray<FillArrayLen>, StringArrayLen>(
- &args[0].as_string_view(),
- length_array,
- Some(args[2].as_string::<FillArrayLen>()),
- )
- }
- (3, Utf8 | LargeUtf8, Some(Utf8View)) => rpad_impl::<
- &GenericStringArray<StringArrayLen>,
- &StringViewArray,
- StringArrayLen,
- >(
- &args[0].as_string::<StringArrayLen>(),
+
+ match (args.len(), args[0].data_type()) {
+ (2, Utf8View) => rpad_impl::<&StringViewArray, &GenericStringArray<i32>, T>(
+ &args[0].as_string_view(),
+ length_array,
+ None,
+ ),
+ (2, Utf8 | LargeUtf8) => rpad_impl::<
+ &GenericStringArray<T>,
+ &GenericStringArray<T>,
+ T,
+ >(&args[0].as_string::<T>(), length_array, None),
+ (3, Utf8View) => rpad_with_replace::<&StringViewArray, T>(
+ &args[0].as_string_view(),
length_array,
- Some(args[2].as_string_view()),
+ &args[2],
),
- (_, _, _) => rpad_impl::<
- &GenericStringArray<StringArrayLen>,
- &GenericStringArray<FillArrayLen>,
- StringArrayLen,
- >(
- &args[0].as_string::<StringArrayLen>(),
+ (3, Utf8 | LargeUtf8) => rpad_with_replace::<&GenericStringArray<T>, T>(
+ &args[0].as_string::<T>(),
length_array,
- args.get(2).map(|arg| arg.as_string::<FillArrayLen>()),
+ &args[2],
),
+ (_, _) => unreachable!("rpad"),
Review Comment:
Yep, done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]