This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7e049749eb feat: Implement Spark function `space` (#19610)
7e049749eb is described below
commit 7e049749eb52fd838dda698762cea4c77af6efe8
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Sun Jan 4 13:34:57 2026 -0800
feat: Implement Spark function `space` (#19610)
## Which issue does this PR close?
- N/A
## Rationale for this change
Add new function:
https://spark.apache.org/docs/latest/api/sql/index.html#space
## What changes are included in this PR?
- Implementation
- Unit Tests
- SLT tests
## Are these changes tested?
Yes, tests added as part of this PR.
## Are there any user-facing changes?
No, this is a new function.
---------
Co-authored-by: Kazantsev Maksim <[email protected]>
---
datafusion/spark/Cargo.toml | 4 +
datafusion/spark/benches/space.rs | 73 +++++++
datafusion/spark/src/function/string/mod.rs | 4 +
datafusion/spark/src/function/string/space.rs | 232 +++++++++++++++++++++
.../sqllogictest/test_files/spark/string/space.slt | 41 ++++
5 files changed, 354 insertions(+)
diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml
index 09959db41f..673b62c5c3 100644
--- a/datafusion/spark/Cargo.toml
+++ b/datafusion/spark/Cargo.toml
@@ -61,3 +61,7 @@ criterion = { workspace = true }
[[bench]]
harness = false
name = "char"
+
+[[bench]]
+harness = false
+name = "space"
diff --git a/datafusion/spark/benches/space.rs b/datafusion/spark/benches/space.rs
new file mode 100644
index 0000000000..8ace7219a1
--- /dev/null
+++ b/datafusion/spark/benches/space.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::{DataType, Field, Int32Type};
+use criterion::{Criterion, criterion_group, criterion_main};
+use datafusion_common::config::ConfigOptions;
+use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
+use datafusion_spark::function::string::space;
+use rand::prelude::StdRng;
+use rand::{Rng, SeedableRng};
+use std::hint::black_box;
+use std::sync::Arc;
+
+/// Benchmarks the Spark `space` UDF on a 1024-row `Int32` column.
+///
+/// Roughly 20% of the rows are null; the remaining rows hold lengths drawn
+/// from `1..10`. The random generator is seeded so every run sees the same
+/// input.
+fn criterion_benchmark(c: &mut Criterion) {
+    let space_func = space();
+    let size = 1024;
+    let input: PrimitiveArray<Int32Type> = {
+        let null_density = 0.2;
+        // Fixed seed keeps the generated column identical across runs.
+        let mut rng = StdRng::seed_from_u64(42);
+        (0..size)
+            .map(|_| {
+                if rng.random::<f32>() < null_density {
+                    None
+                } else {
+                    Some(rng.random_range::<i32, _>(1i32..10))
+                }
+            })
+            .collect()
+    };
+    let input = Arc::new(input);
+    let args = vec![ColumnarValue::Array(input)];
+    let arg_fields = args
+        .iter()
+        .enumerate()
+        .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into())
+        .collect::<Vec<_>>();
+    // Built once outside the timed closure so the measurement does not
+    // include allocating a fresh `Field` on every iteration.
+    let return_field = Arc::new(Field::new("f", DataType::Utf8, true));
+    let config_options = Arc::new(ConfigOptions::default());
+    c.bench_function("space", |b| {
+        b.iter(|| {
+            black_box(
+                space_func
+                    .invoke_with_args(ScalarFunctionArgs {
+                        args: args.clone(),
+                        arg_fields: arg_fields.clone(),
+                        number_rows: size,
+                        return_field: Arc::clone(&return_field),
+                        config_options: Arc::clone(&config_options),
+                    })
+                    .unwrap(),
+            )
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/spark/src/function/string/mod.rs b/datafusion/spark/src/function/string/mod.rs
index 480984f021..369d381a9c 100644
--- a/datafusion/spark/src/function/string/mod.rs
+++ b/datafusion/spark/src/function/string/mod.rs
@@ -24,6 +24,7 @@ pub mod ilike;
pub mod length;
pub mod like;
pub mod luhn_check;
+pub mod space;
use datafusion_expr::ScalarUDF;
use datafusion_functions::make_udf_function;
@@ -38,6 +39,7 @@ make_udf_function!(elt::SparkElt, elt);
make_udf_function!(like::SparkLike, like);
make_udf_function!(luhn_check::SparkLuhnCheck, luhn_check);
make_udf_function!(format_string::FormatStringFunc, format_string);
+make_udf_function!(space::SparkSpace, space);
pub mod expr_fn {
use datafusion_functions::export_functions;
@@ -87,6 +89,7 @@ pub mod expr_fn {
"Returns a formatted string from printf-style format strings.",
strfmt args
));
+ export_functions!((space, "Returns a string consisting of n spaces.",
arg1));
}
pub fn functions() -> Vec<Arc<ScalarUDF>> {
@@ -100,5 +103,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
like(),
luhn_check(),
format_string(),
+ space(),
]
}
diff --git a/datafusion/spark/src/function/string/space.rs b/datafusion/spark/src/function/string/space.rs
new file mode 100644
index 0000000000..77daff28ff
--- /dev/null
+++ b/datafusion/spark/src/function/string/space.rs
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+ Array, ArrayRef, DictionaryArray, Int32Array, StringArray, StringBuilder,
+ as_dictionary_array,
+};
+use arrow::datatypes::{DataType, Int32Type};
+use datafusion_common::cast::as_int32_array;
+use datafusion_common::{Result, ScalarValue, exec_err};
+use datafusion_expr::{
+ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Spark-compatible `space` expression: produces a string made of `n` spaces.
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#space>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkSpace {
+    // Accepted argument types; assembled once in `new`.
+    signature: Signature,
+}
+
+impl Default for SparkSpace {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkSpace {
+    /// Creates the UDF with a one-argument, immutable signature that accepts
+    /// either `Int32` or an `Int32`-keyed dictionary of `Int32` values.
+    pub fn new() -> Self {
+        let int32_dictionary =
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
+        let signature = Signature::uniform(
+            1,
+            vec![DataType::Int32, int32_dictionary],
+            Volatility::Immutable,
+        );
+        Self { signature }
+    }
+}
+
+impl ScalarUDFImpl for SparkSpace {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "space"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Dictionary input keeps its key type and gets `Utf8` values; any other
+    /// input type yields plain `Utf8`.
+    fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+        if let DataType::Dictionary(key_type, _) = &args[0] {
+            return Ok(DataType::Dictionary(
+                key_type.clone(),
+                Box::new(DataType::Utf8),
+            ));
+        }
+        Ok(DataType::Utf8)
+    }
+
+    /// Forwards the evaluated arguments to [`spark_space`].
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        spark_space(&args.args)
+    }
+}
+
+/// Evaluates `space` for a single argument, dispatching on whether it is an
+/// array or a scalar and returning a value of the same kind.
+///
+/// # Errors
+/// Returns an execution error when `args` does not contain exactly one
+/// element, or when the argument's type is rejected by the array/scalar
+/// helper.
+pub fn spark_space(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let [arg] = args else {
+        return exec_err!("space function takes exactly one argument");
+    };
+    match arg {
+        ColumnarValue::Array(array) => spark_space_array(array).map(ColumnarValue::Array),
+        ColumnarValue::Scalar(scalar) => {
+            spark_space_scalar(scalar).map(ColumnarValue::Scalar)
+        }
+    }
+}
+
+/// Applies `space` to every element of `array`.
+///
+/// * `Int32` arrays are converted element by element into a string array.
+/// * `Int32`-keyed dictionaries have only their values converted; the keys
+///   are reused unchanged in the resulting dictionary.
+///
+/// # Errors
+/// Returns an execution error for any other data type, including
+/// dictionaries whose key type is not `Int32`.
+fn spark_space_array(array: &ArrayRef) -> Result<ArrayRef> {
+    match array.data_type() {
+        DataType::Int32 => {
+            let array = as_int32_array(array)?;
+            Ok(Arc::new(spark_space_array_inner(array)))
+        }
+        // `as_dictionary_array` panics when the key type does not match, so
+        // this arm is only taken for `Int32` keys; other key types fall
+        // through to the error below.
+        DataType::Dictionary(key_type, _) if matches!(key_type.as_ref(), DataType::Int32) => {
+            let dict = as_dictionary_array::<Int32Type>(array);
+            let values = spark_space_array(dict.values())?;
+            let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
+            Ok(Arc::new(result))
+        }
+        other => {
+            exec_err!("Unsupported data type {other:?} for function `space`")
+        }
+    }
+}
+
+/// Applies `space` to a single scalar value.
+///
+/// * `Int32(Some(n))` becomes a `Utf8` string of `n` spaces (empty when
+///   `n <= 0`); `Int32(None)` becomes a null `Utf8`.
+/// * A dictionary scalar has its inner value converted and is re-wrapped with
+///   the same key type, matching the dictionary handling of the array path.
+///
+/// # Errors
+/// Returns an execution error for any other scalar type.
+fn spark_space_scalar(scalar: &ScalarValue) -> Result<ScalarValue> {
+    match scalar {
+        ScalarValue::Int32(value) => {
+            // Clamping at zero turns non-positive lengths into an empty string.
+            let result = value.map(|v| " ".repeat(v.max(0) as usize));
+            Ok(ScalarValue::Utf8(result))
+        }
+        ScalarValue::Dictionary(key_type, value) => {
+            let spaces = spark_space_scalar(value)?;
+            Ok(ScalarValue::Dictionary(key_type.clone(), Box::new(spaces)))
+        }
+        other => {
+            exec_err!("Unsupported data type {other:?} for function `space`")
+        }
+    }
+}
+
+/// Builds the string array for an `Int32` input: nulls stay null, positive
+/// lengths become that many spaces, and zero or negative lengths become the
+/// empty string.
+fn spark_space_array_inner(array: &Int32Array) -> StringArray {
+    let row_count = array.len();
+    let mut out = StringBuilder::with_capacity(row_count, row_count * 16);
+    // One shared run of spaces, regrown only when a row needs more than is
+    // currently held; every row is appended as a prefix slice of it.
+    let mut spaces = String::new();
+    for item in array.iter() {
+        let Some(count) = item else {
+            out.append_null();
+            continue;
+        };
+        if count <= 0 {
+            out.append_value("");
+            continue;
+        }
+        let width = count as usize;
+        if width > spaces.len() {
+            spaces = " ".repeat(width);
+        }
+        out.append_value(&spaces[..width]);
+    }
+    out.finish()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::function::string::space::spark_space;
+    use arrow::array::{Array, Int32Array, Int32DictionaryArray};
+    use arrow::datatypes::Int32Type;
+    use datafusion_common::cast::{as_dictionary_array, as_string_array};
+    use datafusion_common::{Result, ScalarValue};
+    use datafusion_expr::ColumnarValue;
+    use std::sync::Arc;
+
+    /// Plain `Int32` input: positive, negative, zero and null lengths.
+    #[test]
+    fn test_spark_space_int32_array() -> Result<()> {
+        let int32_array = ColumnarValue::Array(Arc::new(Int32Array::from(vec![
+            Some(1),
+            Some(-3),
+            Some(0),
+            Some(5),
+            None,
+        ])));
+        let ColumnarValue::Array(result) = spark_space(&[int32_array])? else {
+            unreachable!()
+        };
+        let result = as_string_array(&result)?;
+
+        assert_eq!(result.value(0), " ");
+        assert_eq!(result.value(1), "");
+        assert_eq!(result.value(2), "");
+        // Five spaces for an input of 5.
+        assert_eq!(result.value(3), "     ");
+        assert!(result.is_null(4));
+        Ok(())
+    }
+
+    /// Dictionary input: the dictionary values are converted in place.
+    #[test]
+    fn test_spark_space_dictionary() -> Result<()> {
+        let dictionary = ColumnarValue::Array(Arc::new(Int32DictionaryArray::new(
+            Int32Array::from(vec![0, 1, 2, 3, 4]),
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(-3),
+                Some(0),
+                Some(5),
+                None,
+            ])),
+        )));
+        let ColumnarValue::Array(result) = spark_space(&[dictionary])? else {
+            unreachable!()
+        };
+        let result =
+            as_string_array(as_dictionary_array::<Int32Type>(&result)?.values())?;
+        assert_eq!(result.value(0), " ");
+        assert_eq!(result.value(1), "");
+        assert_eq!(result.value(2), "");
+        // Five spaces for an input of 5.
+        assert_eq!(result.value(3), "     ");
+        assert!(result.is_null(4));
+        Ok(())
+    }
+
+    /// Scalar input: negative, zero, positive and null lengths.
+    #[test]
+    fn test_spark_space_scalar() -> Result<()> {
+        let cases = [
+            (Some(-5), Some("")),
+            (Some(0), Some("")),
+            (Some(3), Some("   ")),
+            (None, None),
+        ];
+        for (input, expected) in cases {
+            let scalar = ColumnarValue::Scalar(ScalarValue::Int32(input));
+            let ColumnarValue::Scalar(result) = spark_space(&[scalar])? else {
+                unreachable!()
+            };
+            assert_eq!(result, ScalarValue::Utf8(expected.map(String::from)));
+        }
+        Ok(())
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/spark/string/space.slt b/datafusion/sqllogictest/test_files/spark/string/space.slt
new file mode 100644
index 0000000000..388f679c4d
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/string/space.slt
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+query T
+SELECT concat(space(1::INT), 'Spark');
+----
+ Spark
+
+query T
+SELECT concat(space(5::INT), 'Spark');
+----
+     Spark
+
+query T
+SELECT space(0::INT);
+----
+(empty)
+
+query T
+SELECT space(-1::INT);
+----
+(empty)
+
+query T
+SELECT space(NULL);
+----
+NULL
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]