This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e049749eb feat: Implement Spark function `space` (#19610)
7e049749eb is described below

commit 7e049749eb52fd838dda698762cea4c77af6efe8
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Sun Jan 4 13:34:57 2026 -0800

    feat: Implement Spark function `space` (#19610)
    
    ## Which issue does this PR close?
    
    - N/A
    
    ## Rationale for this change
    
    Add new function:
    https://spark.apache.org/docs/latest/api/sql/index.html#space
    
    ## What changes are included in this PR?
    
    - Implementation
    - Unit Tests
    - SLT tests
    
    ## Are these changes tested?
    
    Yes, tests added as part of this PR.
    
    ## Are there any user-facing changes?
    
    No, this is a new function.
    
    ---------
    
    Co-authored-by: Kazantsev Maksim <[email protected]>
---
 datafusion/spark/Cargo.toml                        |   4 +
 datafusion/spark/benches/space.rs                  |  73 +++++++
 datafusion/spark/src/function/string/mod.rs        |   4 +
 datafusion/spark/src/function/string/space.rs      | 232 +++++++++++++++++++++
 .../sqllogictest/test_files/spark/string/space.slt |  41 ++++
 5 files changed, 354 insertions(+)

diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml
index 09959db41f..673b62c5c3 100644
--- a/datafusion/spark/Cargo.toml
+++ b/datafusion/spark/Cargo.toml
@@ -61,3 +61,7 @@ criterion = { workspace = true }
 [[bench]]
 harness = false
 name = "char"
+
+[[bench]]
+harness = false
+name = "space"
diff --git a/datafusion/spark/benches/space.rs b/datafusion/spark/benches/space.rs
new file mode 100644
index 0000000000..8ace7219a1
--- /dev/null
+++ b/datafusion/spark/benches/space.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use arrow::array::PrimitiveArray;
+use arrow::datatypes::{DataType, Field, Int32Type};
+use criterion::{Criterion, criterion_group, criterion_main};
+use datafusion_common::config::ConfigOptions;
+use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
+use datafusion_spark::function::string::space;
+use rand::prelude::StdRng;
+use rand::{Rng, SeedableRng};
+use std::hint::black_box;
+use std::sync::Arc;
+
+/// Benchmarks the Spark `space` UDF on a 1024-row nullable `Int32` array.
+///
+/// Roughly 20% of the rows are null and the rest request between 1 and 9
+/// spaces; the RNG is seeded so every run sees the same input.
+fn criterion_benchmark(c: &mut Criterion) {
+    let space_func = space();
+    let size = 1024;
+    let input: PrimitiveArray<Int32Type> = {
+        let null_density = 0.2;
+        let mut rng = StdRng::seed_from_u64(42);
+        (0..size)
+            .map(|_| {
+                if rng.random::<f32>() < null_density {
+                    None
+                } else {
+                    Some(rng.random_range::<i32, _>(1i32..10))
+                }
+            })
+            .collect()
+    };
+    let input = Arc::new(input);
+    let args = vec![ColumnarValue::Array(input)];
+    let arg_fields = args
+        .iter()
+        .enumerate()
+        .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into())
+        .collect::<Vec<_>>();
+    // Built once so the timed closure does not also measure a `Field` allocation.
+    let return_field = Arc::new(Field::new("f", DataType::Utf8, true));
+    let config_options = Arc::new(ConfigOptions::default());
+    c.bench_function("space", |b| {
+        b.iter(|| {
+            black_box(
+                space_func
+                    .invoke_with_args(ScalarFunctionArgs {
+                        args: args.clone(),
+                        arg_fields: arg_fields.clone(),
+                        number_rows: size,
+                        return_field: Arc::clone(&return_field),
+                        config_options: Arc::clone(&config_options),
+                    })
+                    .unwrap(),
+            )
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/spark/src/function/string/mod.rs b/datafusion/spark/src/function/string/mod.rs
index 480984f021..369d381a9c 100644
--- a/datafusion/spark/src/function/string/mod.rs
+++ b/datafusion/spark/src/function/string/mod.rs
@@ -24,6 +24,7 @@ pub mod ilike;
 pub mod length;
 pub mod like;
 pub mod luhn_check;
+pub mod space;
 
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;
@@ -38,6 +39,7 @@ make_udf_function!(elt::SparkElt, elt);
 make_udf_function!(like::SparkLike, like);
 make_udf_function!(luhn_check::SparkLuhnCheck, luhn_check);
 make_udf_function!(format_string::FormatStringFunc, format_string);
+make_udf_function!(space::SparkSpace, space);
 
 pub mod expr_fn {
     use datafusion_functions::export_functions;
@@ -87,6 +89,7 @@ pub mod expr_fn {
         "Returns a formatted string from printf-style format strings.",
         strfmt args
     ));
+    export_functions!((space, "Returns a string consisting of n spaces.", 
arg1));
 }
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
@@ -100,5 +103,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
         like(),
         luhn_check(),
         format_string(),
+        space(),
     ]
 }
diff --git a/datafusion/spark/src/function/string/space.rs b/datafusion/spark/src/function/string/space.rs
new file mode 100644
index 0000000000..77daff28ff
--- /dev/null
+++ b/datafusion/spark/src/function/string/space.rs
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    Array, ArrayRef, DictionaryArray, Int32Array, StringArray, StringBuilder,
+    as_dictionary_array,
+};
+use arrow::datatypes::{DataType, Int32Type};
+use datafusion_common::cast::as_int32_array;
+use datafusion_common::{Result, ScalarValue, exec_err};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Spark-compatible `space` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#space>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkSpace {
+    /// Argument types accepted by the function.
+    signature: Signature,
+}
+
+impl Default for SparkSpace {
+    /// Same as [`SparkSpace::new`].
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkSpace {
+    /// Creates the UDF with an immutable, one-argument signature whose valid
+    /// types are `Int32` and a dictionary with `Int32` keys and `Int32` values.
+    pub fn new() -> Self {
+        let int32_dictionary =
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
+        let accepted_types = vec![DataType::Int32, int32_dictionary];
+        let signature = Signature::uniform(1, accepted_types, Volatility::Immutable);
+        Self { signature }
+    }
+}
+
+impl ScalarUDFImpl for SparkSpace {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// The function name, `space`.
+    fn name(&self) -> &str {
+        "space"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Returns `Utf8`, or a dictionary of `Utf8` values that keeps the input key
+    /// type when the first argument is dictionary-encoded.
+    fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+        if let DataType::Dictionary(key_type, _) = &args[0] {
+            let value_type = Box::new(DataType::Utf8);
+            return Ok(DataType::Dictionary(key_type.clone(), value_type));
+        }
+        Ok(DataType::Utf8)
+    }
+
+    /// Delegates to [`spark_space`] with the call's argument values.
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        spark_space(&args.args)
+    }
+}
+
+/// Evaluates `space` for a single argument, returning an array for array input
+/// and a scalar for scalar input.
+///
+/// # Errors
+/// Fails when `args` does not hold exactly one value, or when the value has a
+/// type the array/scalar implementation rejects.
+pub fn spark_space(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let [arg] = args else {
+        return exec_err!("space function takes exactly one argument");
+    };
+    match arg {
+        ColumnarValue::Array(array) => {
+            spark_space_array(array).map(ColumnarValue::Array)
+        }
+        ColumnarValue::Scalar(scalar) => {
+            spark_space_scalar(scalar).map(ColumnarValue::Scalar)
+        }
+    }
+}
+
+/// Applies `space` to every element of an `Int32` array, or to the values of an
+/// `Int32`-keyed dictionary array (the keys are reused unchanged).
+///
+/// # Errors
+/// Returns an execution error for any other data type, including dictionaries
+/// whose key type is not `Int32`.
+fn spark_space_array(array: &ArrayRef) -> Result<ArrayRef> {
+    match array.data_type() {
+        DataType::Int32 => {
+            let array = as_int32_array(array)?;
+            Ok(Arc::new(spark_space_array_inner(array)))
+        }
+        // Only `Int32` keys can be downcast below: `as_dictionary_array` panics on
+        // a key type mismatch, so other key types must reach the error arm instead.
+        DataType::Dictionary(key_type, _) if **key_type == DataType::Int32 => {
+            let dict = as_dictionary_array::<Int32Type>(array);
+            let values = spark_space_array(dict.values())?;
+            let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
+            Ok(Arc::new(result))
+        }
+        other => {
+            exec_err!("Unsupported data type {other:?} for function `space`")
+        }
+    }
+}
+
+/// Applies `space` to a scalar argument.
+///
+/// An `Int32` yields a `Utf8` scalar (null stays null, non-positive counts give
+/// an empty string). A dictionary scalar is evaluated on its inner value and
+/// re-wrapped with the same key type, mirroring what `return_type` reports.
+///
+/// # Errors
+/// Returns an execution error for any other scalar type.
+fn spark_space_scalar(scalar: &ScalarValue) -> Result<ScalarValue> {
+    match scalar {
+        ScalarValue::Int32(value) => {
+            // Negative counts fail the conversion and are treated as zero spaces.
+            let result = value.map(|v| " ".repeat(usize::try_from(v).unwrap_or(0)));
+            Ok(ScalarValue::Utf8(result))
+        }
+        // The signature accepts dictionary-encoded input, so a dictionary scalar
+        // must be handled here rather than rejected as unsupported.
+        ScalarValue::Dictionary(key_type, value) => {
+            let result = spark_space_scalar(value)?;
+            Ok(ScalarValue::Dictionary(key_type.clone(), Box::new(result)))
+        }
+        other => {
+            exec_err!("Unsupported data type {other:?} for function `space`")
+        }
+    }
+}
+
+/// Builds the string array for an `Int32` input: null stays null, positive `n`
+/// becomes `n` spaces, and zero or negative values become the empty string.
+fn spark_space_array_inner(array: &Int32Array) -> StringArray {
+    let row_count = array.len();
+    let mut builder = StringBuilder::with_capacity(row_count, row_count * 16);
+    // Longest run of spaces produced so far; shorter requests borrow a prefix of
+    // it instead of allocating a new string per row.
+    let mut spaces = String::new();
+    for item in array.iter() {
+        let Some(count) = item else {
+            builder.append_null();
+            continue;
+        };
+        if count <= 0 {
+            builder.append_value("");
+            continue;
+        }
+        let width = count as usize;
+        if width > spaces.len() {
+            spaces = " ".repeat(width);
+        }
+        builder.append_value(&spaces[..width]);
+    }
+    builder.finish()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::function::string::space::spark_space;
+    use arrow::array::{Array, Int32Array, Int32DictionaryArray, StringArray};
+    use arrow::datatypes::Int32Type;
+    use datafusion_common::cast::{as_dictionary_array, as_string_array};
+    use datafusion_common::{Result, ScalarValue};
+    use datafusion_expr::ColumnarValue;
+    use std::sync::Arc;
+
+    /// Counts shared by the array tests: positive, negative, zero and null.
+    fn sample_counts() -> Int32Array {
+        Int32Array::from(vec![Some(1), Some(-3), Some(0), Some(5), None])
+    }
+
+    /// Checks the strings expected for [`sample_counts`], position by position.
+    fn assert_sample_output(strings: &StringArray) {
+        assert_eq!(strings.value(0), " ");
+        assert_eq!(strings.value(1), "");
+        assert_eq!(strings.value(2), "");
+        assert_eq!(strings.value(3), "     ");
+        assert!(strings.is_null(4));
+    }
+
+    #[test]
+    fn test_spark_space_int32_array() -> Result<()> {
+        let input = ColumnarValue::Array(Arc::new(sample_counts()));
+        let ColumnarValue::Array(output) = spark_space(&[input])? else {
+            unreachable!()
+        };
+
+        assert_sample_output(as_string_array(&output)?);
+        Ok(())
+    }
+
+    #[test]
+    fn test_spark_space_dictionary() -> Result<()> {
+        // Identity keys, so dictionary value `i` is also logical row `i`.
+        let keys = Int32Array::from(vec![0, 1, 2, 3, 4]);
+        let encoded = Int32DictionaryArray::new(keys, Arc::new(sample_counts()));
+        let input = ColumnarValue::Array(Arc::new(encoded));
+        let ColumnarValue::Array(output) = spark_space(&[input])? else {
+            unreachable!()
+        };
+
+        let dictionary = as_dictionary_array::<Int32Type>(&output)?;
+        assert_sample_output(as_string_array(dictionary.values())?);
+        Ok(())
+    }
+
+    #[test]
+    fn test_spark_space_scalar() -> Result<()> {
+        let input = ColumnarValue::Scalar(ScalarValue::Int32(Some(-5)));
+        let ColumnarValue::Scalar(output) = spark_space(&[input])? else {
+            unreachable!()
+        };
+        let ScalarValue::Utf8(Some(text)) = output else {
+            unreachable!()
+        };
+        assert_eq!(text, "");
+        Ok(())
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/spark/string/space.slt b/datafusion/sqllogictest/test_files/spark/string/space.slt
new file mode 100644
index 0000000000..388f679c4d
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/string/space.slt
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+query T
+SELECT concat(space(1::INT), 'Spark');
+----
+ Spark
+
+query T
+SELECT concat(space(5::INT), 'Spark');
+----
+     Spark
+
+query T
+SELECT space(0::INT);
+----
+(empty)
+
+query T
+SELECT space(-1::INT);
+----
+(empty)
+
+query T
+SELECT space(NULL);
+----
+NULL


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to