Jefffrey commented on code in PR #17561:
URL: https://github.com/apache/datafusion/pull/17561#discussion_r2375754843


##########
datafusion/spark/src/function/string/format_string.rs:
##########
@@ -0,0 +1,2374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use core::num::FpCategory;
+
+use arrow::{
+    array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray},
+    datatypes::DataType,
+};
+use bigdecimal::{
+    num_bigint::{BigInt, Sign},
+    BigDecimal, ToPrimitive,
+};
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, 
ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+
+/// Spark-compatible `format_string` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#format_string>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FormatStringFunc {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for FormatStringFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FormatStringFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::VariadicAny, 
Volatility::Immutable),
+            aliases: vec![String::from("printf")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for FormatStringFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "format_string"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return plan_err!("The format_string function expects at least one 
argument");
+        }
+        if arg_types[0] == DataType::Null {
+            return Ok(DataType::Utf8);
+        }
+        if !matches!(
+            arg_types[0],
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        ) {
+            return plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View");
+        }
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let len = args
+            .args
+            .iter()
+            .fold(Option::<usize>::None, |acc, arg| match arg {
+                ColumnarValue::Scalar(_) => acc,
+                ColumnarValue::Array(a) => Some(a.len()),
+            });
+        let is_scalar = len.is_none();
+        let data_types = args.args[1..]
+            .iter()
+            .map(|arg| arg.data_type())
+            .collect::<Vec<_>>();
+        let fmt_type = args.args[0].data_type();
+
+        match &args.args[0] {
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+            }
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(fmt))) => {
+                let formatter = Formatter::parse(fmt, &data_types)?;
+                let mut result = Vec::with_capacity(len.unwrap_or(1));
+                for i in 0..len.unwrap_or(1) {
+                    let scalars = args.args[1..]
+                        .iter()
+                        .map(|arg| try_to_scalar(arg.clone(), i))
+                        .collect::<Result<Vec<_>>>()?;
+                    let formatted = formatter.format(&scalars)?;
+                    result.push(formatted);
+                }
+                if is_scalar {
+                    match fmt_type {
+                        DataType::Utf8 => 
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                            Some(result.first().unwrap().clone()),
+                        ))),
+                        DataType::LargeUtf8 => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::LargeUtf8(Some(result.first().unwrap().clone())),
+                        )),
+                        DataType::Utf8View => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::Utf8View(Some(result.first().unwrap().clone())),
+                        )),

Review Comment:
   ```suggestion
                       let scalar_result = result.pop().unwrap();
                       match fmt_type {
                           DataType::Utf8 => 
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
                               Some(scalar_result),
                           ))),
                           DataType::LargeUtf8 => Ok(ColumnarValue::Scalar(
                               ScalarValue::LargeUtf8(Some(scalar_result)),
                           )),
                           DataType::Utf8View => Ok(ColumnarValue::Scalar(
                               ScalarValue::Utf8View(Some(scalar_result)),
                           )),
   ```
   
   This avoids the clone: `result.pop()` moves the string out of the vector instead of copying it.



##########
datafusion/spark/src/function/string/format_string.rs:
##########
@@ -0,0 +1,2374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use core::num::FpCategory;
+
+use arrow::{
+    array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray},
+    datatypes::DataType,
+};
+use bigdecimal::{
+    num_bigint::{BigInt, Sign},
+    BigDecimal, ToPrimitive,
+};
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, 
ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+
+/// Spark-compatible `format_string` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#format_string>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FormatStringFunc {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for FormatStringFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FormatStringFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::VariadicAny, 
Volatility::Immutable),
+            aliases: vec![String::from("printf")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for FormatStringFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "format_string"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return plan_err!("The format_string function expects at least one 
argument");
+        }
+        if arg_types[0] == DataType::Null {
+            return Ok(DataType::Utf8);
+        }
+        if !matches!(
+            arg_types[0],
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        ) {
+            return plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View");
+        }
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let len = args
+            .args
+            .iter()
+            .fold(Option::<usize>::None, |acc, arg| match arg {
+                ColumnarValue::Scalar(_) => acc,
+                ColumnarValue::Array(a) => Some(a.len()),
+            });

Review Comment:
   ```suggestion
           let len = args.args.iter().find_map(|arg| match arg {
               ColumnarValue::Scalar(_) => None,
               ColumnarValue::Array(a) => Some(a.len()),
           });
   ```
   
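   Can use `find_map` instead of `fold` here (this returns the first array length rather than the last, which is equivalent as long as all array arguments have the same length)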



##########
datafusion/spark/src/function/string/format_string.rs:
##########
@@ -0,0 +1,2374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use core::num::FpCategory;
+
+use arrow::{
+    array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray},
+    datatypes::DataType,
+};
+use bigdecimal::{
+    num_bigint::{BigInt, Sign},
+    BigDecimal, ToPrimitive,
+};
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, 
ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+
+/// Spark-compatible `format_string` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#format_string>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FormatStringFunc {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for FormatStringFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FormatStringFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::VariadicAny, 
Volatility::Immutable),
+            aliases: vec![String::from("printf")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for FormatStringFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "format_string"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return plan_err!("The format_string function expects at least one 
argument");
+        }
+        if arg_types[0] == DataType::Null {
+            return Ok(DataType::Utf8);
+        }
+        if !matches!(
+            arg_types[0],
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        ) {
+            return plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View");
+        }
+        Ok(arg_types[0].clone())

Review Comment:
   ```suggestion
           match arg_types[0] {
               DataType::Null => Ok(DataType::Utf8),
               DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => 
Ok(arg_types[0].clone()),
               _ => plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View")
           }
   ```
   
   Simplification since `VariadicAny` enforces at least 1 argument



##########
datafusion/spark/src/function/string/format_string.rs:
##########
@@ -0,0 +1,2374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use core::num::FpCategory;
+
+use arrow::{
+    array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray},
+    datatypes::DataType,
+};
+use bigdecimal::{
+    num_bigint::{BigInt, Sign},
+    BigDecimal, ToPrimitive,
+};
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, 
ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+
+/// Spark-compatible `format_string` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#format_string>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FormatStringFunc {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for FormatStringFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FormatStringFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::VariadicAny, 
Volatility::Immutable),
+            aliases: vec![String::from("printf")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for FormatStringFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "format_string"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return plan_err!("The format_string function expects at least one 
argument");
+        }
+        if arg_types[0] == DataType::Null {
+            return Ok(DataType::Utf8);
+        }
+        if !matches!(
+            arg_types[0],
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        ) {
+            return plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View");
+        }
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let len = args
+            .args
+            .iter()
+            .fold(Option::<usize>::None, |acc, arg| match arg {
+                ColumnarValue::Scalar(_) => acc,
+                ColumnarValue::Array(a) => Some(a.len()),
+            });
+        let is_scalar = len.is_none();
+        let data_types = args.args[1..]
+            .iter()
+            .map(|arg| arg.data_type())
+            .collect::<Vec<_>>();
+        let fmt_type = args.args[0].data_type();
+
+        match &args.args[0] {
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+            }
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(fmt))) => {
+                let formatter = Formatter::parse(fmt, &data_types)?;
+                let mut result = Vec::with_capacity(len.unwrap_or(1));
+                for i in 0..len.unwrap_or(1) {
+                    let scalars = args.args[1..]
+                        .iter()
+                        .map(|arg| try_to_scalar(arg.clone(), i))
+                        .collect::<Result<Vec<_>>>()?;
+                    let formatted = formatter.format(&scalars)?;
+                    result.push(formatted);
+                }
+                if is_scalar {
+                    match fmt_type {
+                        DataType::Utf8 => 
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                            Some(result.first().unwrap().clone()),
+                        ))),
+                        DataType::LargeUtf8 => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::LargeUtf8(Some(result.first().unwrap().clone())),
+                        )),
+                        DataType::Utf8View => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::Utf8View(Some(result.first().unwrap().clone())),
+                        )),
+                        _ => unreachable!(),
+                    }
+                } else {
+                    let array: ArrayRef = match fmt_type {
+                        DataType::Utf8 => Arc::new(StringArray::from(result)),
+                        DataType::LargeUtf8 => 
Arc::new(LargeStringArray::from(result)),
+                        DataType::Utf8View => 
Arc::new(StringViewArray::from(result)),
+                        _ => unreachable!(),
+                    };
+                    Ok(ColumnarValue::Array(array))
+                }
+            }
+            ColumnarValue::Array(fmts) => {
+                let mut result = Vec::with_capacity(len.unwrap());
+                for i in 0..len.unwrap() {
+                    let fmt = ScalarValue::try_from_array(fmts, i)?;
+                    match fmt.try_as_str() {
+                        Some(Some(fmt)) => {
+                            let formatter = Formatter::parse(fmt, 
&data_types)?;
+                            let scalars = args.args[1..]
+                                .iter()
+                                .map(|arg| try_to_scalar(arg.clone(), i))
+                                .collect::<Result<Vec<_>>>()?;
+                            let formatted = formatter.format(&scalars)?;
+                            result.push(Some(formatted));
+                        }
+                        Some(None) => {
+                            result.push(None);
+                        }
+                        _ => {
+                            return exec_err!(
+                                "Expected string type, got {:?}",
+                                fmt.data_type()
+                            )
+                        }

Review Comment:
   Is reaching this arm possible? If I understand correctly, it's getting the 
strings out of the first array, which should already be guaranteed to be a 
string array by now, so it is a bit confusing to see this handling here.



##########
datafusion/spark/src/function/string/format_string.rs:
##########
@@ -0,0 +1,2374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use core::num::FpCategory;
+
+use arrow::{
+    array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray},
+    datatypes::DataType,
+};
+use bigdecimal::{
+    num_bigint::{BigInt, Sign},
+    BigDecimal, ToPrimitive,
+};
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, 
ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+
+/// Spark-compatible `format_string` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#format_string>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FormatStringFunc {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for FormatStringFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FormatStringFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::VariadicAny, 
Volatility::Immutable),
+            aliases: vec![String::from("printf")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for FormatStringFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "format_string"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return plan_err!("The format_string function expects at least one 
argument");
+        }
+        if arg_types[0] == DataType::Null {
+            return Ok(DataType::Utf8);
+        }
+        if !matches!(
+            arg_types[0],
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        ) {
+            return plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View");
+        }
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let len = args
+            .args
+            .iter()
+            .fold(Option::<usize>::None, |acc, arg| match arg {
+                ColumnarValue::Scalar(_) => acc,
+                ColumnarValue::Array(a) => Some(a.len()),
+            });
+        let is_scalar = len.is_none();
+        let data_types = args.args[1..]
+            .iter()
+            .map(|arg| arg.data_type())
+            .collect::<Vec<_>>();
+        let fmt_type = args.args[0].data_type();
+
+        match &args.args[0] {
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+            }
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(fmt))) => {

Review Comment:
   Need to handle the case where it's 
`ColumnarValue::Scalar(ScalarValue::Utf8(None))` (and for the other string 
types too), I believe?



##########
datafusion/spark/src/function/string/format_string.rs:
##########
@@ -0,0 +1,2374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use core::num::FpCategory;
+
+use arrow::{
+    array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray},
+    datatypes::DataType,
+};
+use bigdecimal::{
+    num_bigint::{BigInt, Sign},
+    BigDecimal, ToPrimitive,
+};
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, 
ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+
+/// Spark-compatible `format_string` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#format_string>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FormatStringFunc {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for FormatStringFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FormatStringFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::VariadicAny, 
Volatility::Immutable),
+            aliases: vec![String::from("printf")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for FormatStringFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "format_string"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return plan_err!("The format_string function expects at least one 
argument");
+        }
+        if arg_types[0] == DataType::Null {
+            return Ok(DataType::Utf8);
+        }
+        if !matches!(
+            arg_types[0],
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        ) {
+            return plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View");
+        }
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let len = args
+            .args
+            .iter()
+            .fold(Option::<usize>::None, |acc, arg| match arg {
+                ColumnarValue::Scalar(_) => acc,
+                ColumnarValue::Array(a) => Some(a.len()),
+            });
+        let is_scalar = len.is_none();
+        let data_types = args.args[1..]
+            .iter()
+            .map(|arg| arg.data_type())
+            .collect::<Vec<_>>();
+        let fmt_type = args.args[0].data_type();
+
+        match &args.args[0] {
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+            }
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(fmt))) => {
+                let formatter = Formatter::parse(fmt, &data_types)?;
+                let mut result = Vec::with_capacity(len.unwrap_or(1));
+                for i in 0..len.unwrap_or(1) {
+                    let scalars = args.args[1..]
+                        .iter()
+                        .map(|arg| try_to_scalar(arg.clone(), i))
+                        .collect::<Result<Vec<_>>>()?;
+                    let formatted = formatter.format(&scalars)?;
+                    result.push(formatted);
+                }
+                if is_scalar {
+                    match fmt_type {
+                        DataType::Utf8 => 
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                            Some(result.first().unwrap().clone()),
+                        ))),
+                        DataType::LargeUtf8 => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::LargeUtf8(Some(result.first().unwrap().clone())),
+                        )),
+                        DataType::Utf8View => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::Utf8View(Some(result.first().unwrap().clone())),
+                        )),
+                        _ => unreachable!(),
+                    }
+                } else {
+                    let array: ArrayRef = match fmt_type {
+                        DataType::Utf8 => Arc::new(StringArray::from(result)),
+                        DataType::LargeUtf8 => 
Arc::new(LargeStringArray::from(result)),
+                        DataType::Utf8View => 
Arc::new(StringViewArray::from(result)),
+                        _ => unreachable!(),
+                    };
+                    Ok(ColumnarValue::Array(array))
+                }
+            }
+            ColumnarValue::Array(fmts) => {
+                let mut result = Vec::with_capacity(len.unwrap());
+                for i in 0..len.unwrap() {
+                    let fmt = ScalarValue::try_from_array(fmts, i)?;
+                    match fmt.try_as_str() {
+                        Some(Some(fmt)) => {
+                            let formatter = Formatter::parse(fmt, 
&data_types)?;
+                            let scalars = args.args[1..]
+                                .iter()
+                                .map(|arg| try_to_scalar(arg.clone(), i))
+                                .collect::<Result<Vec<_>>>()?;
+                            let formatted = formatter.format(&scalars)?;
+                            result.push(Some(formatted));
+                        }
+                        Some(None) => {
+                            result.push(None);
+                        }
+                        _ => {
+                            return exec_err!(
+                                "Expected string type, got {:?}",
+                                fmt.data_type()
+                            )
+                        }
+                    }
+                }
+                let array: ArrayRef = match fmt_type {
+                    DataType::Utf8 => Arc::new(StringArray::from(result)),
+                    DataType::LargeUtf8 => 
Arc::new(LargeStringArray::from(result)),
+                    DataType::Utf8View => 
Arc::new(StringViewArray::from(result)),
+                    _ => unreachable!(),
+                };
+                Ok(ColumnarValue::Array(array))
+            }
+            _ => exec_err!(
+                "The format_string function expects the first argument to be a 
string"
+            ),
+        }
+    }
+}
+
+fn try_to_scalar(arg: ColumnarValue, index: usize) -> Result<ScalarValue> {
+    match arg {
+        ColumnarValue::Scalar(scalar) => Ok(scalar),
+        ColumnarValue::Array(array) => Ok(ScalarValue::try_from_array(&array, 
index)?),
+    }
+}
+
+/// Compatible with `java.util.Formatter`
+#[derive(Debug)]
+pub struct Formatter<'a> {
+    pub elements: Vec<FormatElement<'a>>,
+    pub arg_num: usize,
+}
+
+impl<'a> Formatter<'a> {
+    pub fn new(elements: Vec<FormatElement<'a>>) -> Self {
+        let arg_num = elements
+            .iter()
+            .map(|element| match element {
+                FormatElement::Format(spec) => spec.argument_index,
+                _ => 0,
+            })
+            .max()
+            .unwrap_or(0);
+        Self { elements, arg_num }
+    }
+
+    pub fn parse(fmt: &'a str, arg_types: &[DataType]) -> Result<Self> {
+        // find the first %
+        let mut res = Vec::new();
+
+        let mut rem = fmt;
+        let mut argument_index = 0;
+
+        let mut prev: Option<usize> = None;
+
+        while !rem.is_empty() {
+            if let Some((verbatim_prefix, rest)) = rem.split_once('%') {
+                if !verbatim_prefix.is_empty() {
+                    res.push(FormatElement::Verbatim(verbatim_prefix));
+                }
+                if let Some(rest) = rest.strip_prefix('%') {
+                    res.push(FormatElement::Verbatim("%"));
+                    rem = rest;
+                    continue;
+                }
+                if let Some(rest) = rest.strip_prefix('n') {
+                    res.push(FormatElement::Verbatim("\n"));
+                    rem = rest;
+                    continue;
+                }
+                if let Some(rest) = rest.strip_prefix('<') {
+                    // %< means reuse the previous argument
+                    let Some(p) = prev else {
+                        return exec_err!("No previous argument to reference");
+                    };
+                    let (spec, rest) =
+                        take_conversion_specifier(rest, p, arg_types[p - 
1].clone())?;
+                    res.push(FormatElement::Format(spec));
+                    rem = rest;
+                    continue;
+                }
+
+                let (current_argument_index, rest2) = take_numeric_param(rest, 
false);
+                let (current_argument_index, rest) =
+                    match (current_argument_index, rest2.starts_with('$')) {
+                        (NumericParam::Literal(index), true) => {
+                            (index as usize, &rest2[1..])
+                        }
+                        (NumericParam::FromArgument, true) => {
+                            return exec_err!("Invalid numeric parameter")
+                        }
+                        (_, false) => {
+                            argument_index += 1;
+                            (argument_index, rest)
+                        }
+                    };
+                if current_argument_index == 0 || current_argument_index > 
arg_types.len()
+                {
+                    return exec_err!(
+                        "Argument index {} is out of bounds",
+                        current_argument_index
+                    );
+                }
+
+                let (spec, rest) = take_conversion_specifier(
+                    rest,
+                    current_argument_index,
+                    arg_types[current_argument_index - 1].clone(),
+                )
+                .map_err(|e| exec_datafusion_err!("{:?}, format string: {:?}", 
e, fmt))?;
+                res.push(FormatElement::Format(spec));
+                prev = Some(spec.argument_index);
+                rem = rest;
+            } else {
+                res.push(FormatElement::Verbatim(rem));
+                break;
+            }
+        }
+
+        Ok(Self::new(res))
+    }
+
+    pub fn format(&self, args: &[ScalarValue]) -> Result<String> {
+        if args.len() < self.arg_num {
+            return exec_err!(
+                "Expected at least {} arguments, got {}",
+                self.arg_num,
+                args.len()
+            );
+        }
+        let mut string = String::new();
+        for element in &self.elements {
+            match element {
+                FormatElement::Verbatim(text) => {
+                    string.push_str(text);
+                }
+                FormatElement::Format(spec) => {
+                    spec.format(&mut string, &args[spec.argument_index - 1])?;

Review Comment:
   Why is the `- 1` offset applied here?



##########
datafusion/spark/src/function/string/format_string.rs:
##########
@@ -0,0 +1,2374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use core::num::FpCategory;
+
+use arrow::{
+    array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray},
+    datatypes::DataType,
+};
+use bigdecimal::{
+    num_bigint::{BigInt, Sign},
+    BigDecimal, ToPrimitive,
+};
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, 
ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+
+/// Spark-compatible `format_string` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#format_string>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FormatStringFunc {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for FormatStringFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FormatStringFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::VariadicAny, 
Volatility::Immutable),
+            aliases: vec![String::from("printf")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for FormatStringFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "format_string"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return plan_err!("The format_string function expects at least one 
argument");
+        }
+        if arg_types[0] == DataType::Null {
+            return Ok(DataType::Utf8);
+        }
+        if !matches!(
+            arg_types[0],
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        ) {
+            return plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View");
+        }
+        Ok(arg_types[0].clone())
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let len = args
+            .args
+            .iter()
+            .fold(Option::<usize>::None, |acc, arg| match arg {
+                ColumnarValue::Scalar(_) => acc,
+                ColumnarValue::Array(a) => Some(a.len()),
+            });
+        let is_scalar = len.is_none();
+        let data_types = args.args[1..]
+            .iter()
+            .map(|arg| arg.data_type())
+            .collect::<Vec<_>>();
+        let fmt_type = args.args[0].data_type();
+
+        match &args.args[0] {
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+            }
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(fmt))) => {
+                let formatter = Formatter::parse(fmt, &data_types)?;
+                let mut result = Vec::with_capacity(len.unwrap_or(1));
+                for i in 0..len.unwrap_or(1) {
+                    let scalars = args.args[1..]
+                        .iter()
+                        .map(|arg| try_to_scalar(arg.clone(), i))
+                        .collect::<Result<Vec<_>>>()?;
+                    let formatted = formatter.format(&scalars)?;
+                    result.push(formatted);
+                }
+                if is_scalar {
+                    match fmt_type {
+                        DataType::Utf8 => 
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                            Some(result.first().unwrap().clone()),
+                        ))),
+                        DataType::LargeUtf8 => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::LargeUtf8(Some(result.first().unwrap().clone())),
+                        )),
+                        DataType::Utf8View => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::Utf8View(Some(result.first().unwrap().clone())),
+                        )),
+                        _ => unreachable!(),
+                    }
+                } else {
+                    let array: ArrayRef = match fmt_type {
+                        DataType::Utf8 => Arc::new(StringArray::from(result)),
+                        DataType::LargeUtf8 => 
Arc::new(LargeStringArray::from(result)),
+                        DataType::Utf8View => 
Arc::new(StringViewArray::from(result)),
+                        _ => unreachable!(),
+                    };
+                    Ok(ColumnarValue::Array(array))
+                }
+            }
+            ColumnarValue::Array(fmts) => {
+                let mut result = Vec::with_capacity(len.unwrap());
+                for i in 0..len.unwrap() {
+                    let fmt = ScalarValue::try_from_array(fmts, i)?;
+                    match fmt.try_as_str() {
+                        Some(Some(fmt)) => {
+                            let formatter = Formatter::parse(fmt, 
&data_types)?;
+                            let scalars = args.args[1..]
+                                .iter()
+                                .map(|arg| try_to_scalar(arg.clone(), i))
+                                .collect::<Result<Vec<_>>>()?;
+                            let formatted = formatter.format(&scalars)?;
+                            result.push(Some(formatted));
+                        }
+                        Some(None) => {
+                            result.push(None);
+                        }
+                        _ => {
+                            return exec_err!(
+                                "Expected string type, got {:?}",
+                                fmt.data_type()
+                            )
+                        }
+                    }
+                }
+                let array: ArrayRef = match fmt_type {
+                    DataType::Utf8 => Arc::new(StringArray::from(result)),
+                    DataType::LargeUtf8 => 
Arc::new(LargeStringArray::from(result)),
+                    DataType::Utf8View => 
Arc::new(StringViewArray::from(result)),
+                    _ => unreachable!(),
+                };
+                Ok(ColumnarValue::Array(array))
+            }
+            _ => exec_err!(
+                "The format_string function expects the first argument to be a 
string"
+            ),
+        }
+    }
+}
+
+fn try_to_scalar(arg: ColumnarValue, index: usize) -> Result<ScalarValue> {
+    match arg {
+        ColumnarValue::Scalar(scalar) => Ok(scalar),
+        ColumnarValue::Array(array) => Ok(ScalarValue::try_from_array(&array, 
index)?),
+    }
+}
+
+/// Compatible with `java.util.Formatter`
+#[derive(Debug)]
+pub struct Formatter<'a> {
+    pub elements: Vec<FormatElement<'a>>,
+    pub arg_num: usize,
+}
+
+impl<'a> Formatter<'a> {
+    pub fn new(elements: Vec<FormatElement<'a>>) -> Self {
+        let arg_num = elements
+            .iter()
+            .map(|element| match element {
+                FormatElement::Format(spec) => spec.argument_index,
+                _ => 0,
+            })
+            .max()
+            .unwrap_or(0);
+        Self { elements, arg_num }
+    }
+
+    pub fn parse(fmt: &'a str, arg_types: &[DataType]) -> Result<Self> {
+        // find the first %
+        let mut res = Vec::new();
+
+        let mut rem = fmt;
+        let mut argument_index = 0;
+
+        let mut prev: Option<usize> = None;
+
+        while !rem.is_empty() {

Review Comment:
   Can we have a high-level description of what is happening in this function?



##########
datafusion/spark/src/function/string/format_string.rs:
##########
@@ -0,0 +1,2374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use core::num::FpCategory;
+
+use arrow::{
+    array::{Array, ArrayRef, LargeStringArray, StringArray, StringViewArray},
+    datatypes::DataType,
+};
+use bigdecimal::{
+    num_bigint::{BigInt, Sign},
+    BigDecimal, ToPrimitive,
+};
+use chrono::{DateTime, Datelike, Timelike, Utc};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, 
ScalarValue,
+};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+
+/// Spark-compatible `format_string` expression
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#format_string>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct FormatStringFunc {
+    /// Argument signature reported through `ScalarUDFImpl::signature`.
+    signature: Signature,
+    /// Alternative names reported through `ScalarUDFImpl::aliases`.
+    aliases: Vec<String>,
+}
+
+impl Default for FormatStringFunc {
+    /// Same as [`FormatStringFunc::new`].
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FormatStringFunc {
+    /// Creates the function with a variadic-any, immutable signature and the
+    /// `printf` alias.
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::new(TypeSignature::VariadicAny, 
Volatility::Immutable),
+            aliases: vec![String::from("printf")],
+        }
+    }
+}
+
+impl ScalarUDFImpl for FormatStringFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "format_string"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// The result has the same string type as the format argument (the first
+    /// argument); a `Null`-typed format argument yields `Utf8`.
+    ///
+    /// # Errors
+    /// Returns a plan error when there are no arguments or when the first
+    /// argument is neither `Null` nor one of `Utf8`, `LargeUtf8`, `Utf8View`.
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        if arg_types.is_empty() {
+            return plan_err!("The format_string function expects at least one 
argument");
+        }
+        if arg_types[0] == DataType::Null {
+            return Ok(DataType::Utf8);
+        }
+        if !matches!(
+            arg_types[0],
+            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
+        ) {
+            return plan_err!("The format_string function expects the first 
argument to be Utf8, LargeUtf8 or Utf8View");
+        }
+        Ok(arg_types[0].clone())
+    }
+
+    /// Formats every row of the input.
+    ///
+    /// The first argument is the format string, the remaining arguments are
+    /// the values it refers to. Three shapes of the format argument are
+    /// handled: an untyped NULL scalar, a non-null string scalar (parsed once
+    /// and reused for all rows), and a string array (parsed row by row).
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        // Number of rows to produce: the length of the last array argument,
+        // or `None` when every argument is a scalar.
+        let len = args
+            .args
+            .iter()
+            .fold(Option::<usize>::None, |acc, arg| match arg {
+                ColumnarValue::Scalar(_) => acc,
+                ColumnarValue::Array(a) => Some(a.len()),
+            });
+        let is_scalar = len.is_none();
+        // Types of the value arguments, i.e. everything after the format string.
+        let data_types = args.args[1..]
+            .iter()
+            .map(|arg| arg.data_type())
+            .collect::<Vec<_>>();
+        let fmt_type = args.args[0].data_type();
+
+        match &args.args[0] {
+            // Untyped NULL format string: the result is a NULL `Utf8` scalar.
+            ColumnarValue::Scalar(ScalarValue::Null) => {
+                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+            }
+            // Constant format string: parse once, then format each row.
+            ColumnarValue::Scalar(ScalarValue::Utf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(fmt)))
+            | ColumnarValue::Scalar(ScalarValue::Utf8View(Some(fmt))) => {
+                let formatter = Formatter::parse(fmt, &data_types)?;
+                let mut result = Vec::with_capacity(len.unwrap_or(1));
+                for i in 0..len.unwrap_or(1) {
+                    let scalars = args.args[1..]
+                        .iter()
+                        .map(|arg| try_to_scalar(arg.clone(), i))
+                        .collect::<Result<Vec<_>>>()?;
+                    let formatted = formatter.format(&scalars)?;
+                    result.push(formatted);
+                }
+                if is_scalar {
+                    // All inputs were scalars: exactly one row was formatted,
+                    // return it as a scalar of the format argument's type.
+                    match fmt_type {
+                        DataType::Utf8 => 
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                            Some(result.first().unwrap().clone()),
+                        ))),
+                        DataType::LargeUtf8 => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::LargeUtf8(Some(result.first().unwrap().clone())),
+                        )),
+                        DataType::Utf8View => Ok(ColumnarValue::Scalar(
+                            
ScalarValue::Utf8View(Some(result.first().unwrap().clone())),
+                        )),
+                        _ => unreachable!(),
+                    }
+                } else {
+                    let array: ArrayRef = match fmt_type {
+                        DataType::Utf8 => Arc::new(StringArray::from(result)),
+                        DataType::LargeUtf8 => 
Arc::new(LargeStringArray::from(result)),
+                        DataType::Utf8View => 
Arc::new(StringViewArray::from(result)),
+                        _ => unreachable!(),
+                    };
+                    Ok(ColumnarValue::Array(array))
+                }
+            }
+            // Per-row format strings: each row is parsed separately. `len` is
+            // `Some` here because the format argument itself is an array.
+            ColumnarValue::Array(fmts) => {
+                let mut result = Vec::with_capacity(len.unwrap());
+                for i in 0..len.unwrap() {
+                    let fmt = ScalarValue::try_from_array(fmts, i)?;
+                    match fmt.try_as_str() {
+                        Some(Some(fmt)) => {
+                            let formatter = Formatter::parse(fmt, 
&data_types)?;
+                            let scalars = args.args[1..]
+                                .iter()
+                                .map(|arg| try_to_scalar(arg.clone(), i))
+                                .collect::<Result<Vec<_>>>()?;
+                            let formatted = formatter.format(&scalars)?;
+                            result.push(Some(formatted));
+                        }
+                        // A NULL format string in this row gives a NULL output row.
+                        Some(None) => {
+                            result.push(None);
+                        }
+                        _ => {
+                            return exec_err!(
+                                "Expected string type, got {:?}",
+                                fmt.data_type()
+                            )
+                        }
+                    }
+                }
+                let array: ArrayRef = match fmt_type {
+                    DataType::Utf8 => Arc::new(StringArray::from(result)),
+                    DataType::LargeUtf8 => 
Arc::new(LargeStringArray::from(result)),
+                    DataType::Utf8View => 
Arc::new(StringViewArray::from(result)),
+                    _ => unreachable!(),
+                };
+                Ok(ColumnarValue::Array(array))
+            }
+            // NOTE(review): typed NULL string scalars such as `Utf8(None)` match
+            // none of the arms above and also end up here -- confirm an error
+            // (rather than a NULL result) is intended for them.
+            _ => exec_err!(
+                "The format_string function expects the first argument to be a 
string"
+            ),
+        }
+    }
+}
+
+/// Produces the scalar value of `arg` for row `index`.
+///
+/// A scalar argument is returned unchanged (`index` is ignored); for an array
+/// argument the element at `index` is converted with
+/// `ScalarValue::try_from_array`, whose error is propagated.
+fn try_to_scalar(arg: ColumnarValue, index: usize) -> Result<ScalarValue> {
+    match arg {
+        ColumnarValue::Scalar(scalar) => Ok(scalar),
+        ColumnarValue::Array(array) => Ok(ScalarValue::try_from_array(&array, 
index)?),
+    }
+}
+
+/// Compatible with `java.util.Formatter`
+///
+/// Holds a format string that has already been split into literal text and
+/// conversion specifiers.
+#[derive(Debug)]
+pub struct Formatter<'a> {
+    /// Parsed pieces of the format string, in output order.
+    pub elements: Vec<FormatElement<'a>>,
+    /// Largest argument index used by any specifier in `elements`
+    /// (0 when there are no specifiers); computed in `Formatter::new`.
+    pub arg_num: usize,
+}
+
+impl<'a> Formatter<'a> {
+    /// Wraps already-parsed elements and records in `arg_num` the largest
+    /// `argument_index` found among the `Format` elements (0 if there are none).
+    pub fn new(elements: Vec<FormatElement<'a>>) -> Self {
+        let arg_num = elements
+            .iter()
+            .map(|element| match element {
+                FormatElement::Format(spec) => spec.argument_index,
+                _ => 0,
+            })
+            .max()
+            .unwrap_or(0);
+        Self { elements, arg_num }
+    }
+
+    /// Parses `fmt` into a sequence of verbatim text and conversion specifiers.
+    ///
+    /// The string is consumed left to right, splitting at each `%`:
+    /// - text before the `%` is kept as a `Verbatim` element;
+    /// - `%%` produces a literal `%` and `%n` produces `"\n"`;
+    /// - `%<...` reuses the argument index of the previous specifier;
+    /// - `%N$...` selects argument `N` explicitly (1-based);
+    /// - any other specifier takes the next sequential argument.
+    ///
+    /// The text after the index is handed to `take_conversion_specifier`
+    /// together with the `DataType` of the selected argument. `arg_types`
+    /// holds the types of the value arguments only (not the format string),
+    /// so argument index `i` corresponds to `arg_types[i - 1]`.
+    ///
+    /// # Errors
+    /// Fails when `%<` has no previous specifier, when `$` follows a
+    /// non-literal numeric parameter, when the argument index is 0 or larger
+    /// than `arg_types.len()`, or when `take_conversion_specifier` fails.
+    pub fn parse(fmt: &'a str, arg_types: &[DataType]) -> Result<Self> {
+        // find the first %
+        let mut res = Vec::new();
+
+        // Unparsed tail of the format string.
+        let mut rem = fmt;
+        // Counter for specifiers without an explicit `N$` index.
+        let mut argument_index = 0;
+
+        // Argument index of the last specifier parsed on the main path below;
+        // this is what `%<` refers to.
+        let mut prev: Option<usize> = None;
+
+        while !rem.is_empty() {
+            if let Some((verbatim_prefix, rest)) = rem.split_once('%') {
+                if !verbatim_prefix.is_empty() {
+                    res.push(FormatElement::Verbatim(verbatim_prefix));
+                }
+                if let Some(rest) = rest.strip_prefix('%') {
+                    res.push(FormatElement::Verbatim("%"));
+                    rem = rest;
+                    continue;
+                }
+                if let Some(rest) = rest.strip_prefix('n') {
+                    res.push(FormatElement::Verbatim("\n"));
+                    rem = rest;
+                    continue;
+                }
+                if let Some(rest) = rest.strip_prefix('<') {
+                    // %< means reuse the previous argument
+                    let Some(p) = prev else {
+                        return exec_err!("No previous argument to reference");
+                    };
+                    // `p` passed the bounds check below when it was stored,
+                    // so `p - 1` is a valid index into `arg_types`.
+                    let (spec, rest) =
+                        take_conversion_specifier(rest, p, arg_types[p - 
1].clone())?;
+                    res.push(FormatElement::Format(spec));
+                    rem = rest;
+                    continue;
+                }
+
+                // Try to read a leading number; it is an argument index only
+                // if it is immediately followed by `$`.
+                let (current_argument_index, rest2) = take_numeric_param(rest, 
false);
+                let (current_argument_index, rest) =
+                    match (current_argument_index, rest2.starts_with('$')) {
+                        (NumericParam::Literal(index), true) => {
+                            (index as usize, &rest2[1..])
+                        }
+                        (NumericParam::FromArgument, true) => {
+                            return exec_err!("Invalid numeric parameter")
+                        }
+                        // No `$`: use the next sequential argument and resume
+                        // from `rest`, i.e. from before the number that was read
+                        // (presumably so it can be parsed as part of the
+                        // specifier, e.g. as a width -- confirm against
+                        // `take_conversion_specifier`).
+                        (_, false) => {
+                            argument_index += 1;
+                            (argument_index, rest)
+                        }
+                    };
+                // Argument indices are 1-based: 0 is never valid.
+                if current_argument_index == 0 || current_argument_index > 
arg_types.len()
+                {
+                    return exec_err!(
+                        "Argument index {} is out of bounds",
+                        current_argument_index
+                    );
+                }
+
+                let (spec, rest) = take_conversion_specifier(
+                    rest,
+                    current_argument_index,
+                    arg_types[current_argument_index - 1].clone(),
+                )
+                .map_err(|e| exec_datafusion_err!("{:?}, format string: {:?}", 
e, fmt))?;
+                res.push(FormatElement::Format(spec));
+                prev = Some(spec.argument_index);
+                rem = rest;
+            } else {
+                // No more `%`: the remainder is plain text.
+                res.push(FormatElement::Verbatim(rem));
+                break;
+            }
+        }
+
+        Ok(Self::new(res))
+    }
+
+    /// Renders the parsed format with `args`, the value arguments in order
+    /// (without the format string).
+    ///
+    /// # Errors
+    /// Fails when fewer than `arg_num` arguments are supplied, or when a
+    /// specifier fails to format its argument.
+    pub fn format(&self, args: &[ScalarValue]) -> Result<String> {
+        if args.len() < self.arg_num {
+            return exec_err!(
+                "Expected at least {} arguments, got {}",
+                self.arg_num,
+                args.len()
+            );
+        }
+        let mut string = String::new();
+        for element in &self.elements {
+            match element {
+                FormatElement::Verbatim(text) => {
+                    string.push_str(text);
+                }
+                FormatElement::Format(spec) => {
+                    // `argument_index` is 1-based (`parse` rejects 0), while
+                    // `args` is a 0-based slice, hence the `- 1`.
+                    spec.format(&mut string, &args[spec.argument_index - 1])?;
+                }
+            }
+        }
+        Ok(string)
+    }
+}
+
+/// One parsed piece of a format string: either literal text or a conversion
+/// specifier to be filled in from an argument.
+#[derive(Debug)]
+pub enum FormatElement<'a> {
+    /// Some characters that are copied to the output as-is
+    Verbatim(&'a str),
+    /// A format specifier
+    Format(ConversionSpecifier),
+}
+
+/// Parsed printf conversion specifier
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ConversionSpecifier {
+    /// 1-based index of the argument this specifier formats
+    /// (`Formatter::format` reads `args[argument_index - 1]`)
+    pub argument_index: usize,
+    /// flag `#`: use `0x`, etc?
+    pub alt_form: bool,
+    /// flag `0`: left-pad with zeros?
+    pub zero_pad: bool,
+    /// flag `-`: left-adjust (pad with spaces on the right)
+    pub left_adj: bool,
+    /// flag `' '` (space): indicate sign with a space?
+    pub space_sign: bool,
+    /// flag `+`: Always show sign? (for signed numbers)
+    pub force_sign: bool,
+    /// flag `,`: include locale-specific grouping separators
+    pub grouping_separator: bool,
+    /// flag `(`: enclose negative numbers in parentheses
+    pub negative_in_parentheses: bool,
+    /// field width
+    pub width: NumericParam,
+    /// floating point field precision
+    pub precision: NumericParam,
+    /// data type
+    pub conversion_type: ConversionType,
+}
+
+/// Width / precision parameter of a conversion specifier
+/// (the type of `ConversionSpecifier::width` and `::precision`)
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum NumericParam {
+    /// The value written literally in the format string
+    Literal(i32),
+    /// Get the width from the previous argument
+    FromArgument,
+}
+
+/// Printf data type
+///
+/// Each variant is documented with the conversion character it stands for;
+/// the `Upper`/`Lower` suffix mirrors the case of that character.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ConversionType {
+    /// `B`
+    BooleanUpper,
+    /// `b`
+    BooleanLower,
+    /// Not implemented yet. Can be implemented after 
<https://github.com/apache/datafusion/pull/17093> is merged
+    /// `h`
+    HexHashLower,
+    /// `H`
+    HexHashUpper,
+    /// `d`
+    DecInt,
+    /// `o`
+    OctInt,
+    /// `x`
+    HexIntLower,
+    /// `X`
+    HexIntUpper,
+    /// `e`
+    SciFloatLower,
+    /// `E`
+    SciFloatUpper,
+    /// `f`
+    DecFloatLower,
+    /// `g`
+    CompactFloatLower,
+    /// `G`
+    CompactFloatUpper,
+    /// `a`
+    HexFloatLower,
+    /// `A`
+    HexFloatUpper,
+    /// `t`
+    TimeLower(TimeFormat),
+    /// `T`
+    TimeUpper(TimeFormat),
+    /// `c`
+    CharLower,
+    /// `C`
+    CharUpper,
+    /// `s`
+    StringLower,
+    /// `S`
+    StringUpper,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TimeFormat {
+    // Hour of the day for the 24-hour clock,
+    // formatted as two digits with a leading zero as necessary i.e. 00 - 23. 
00 corresponds to midnight.
+    HUpper,
+    // Hour for the 12-hour clock,
+    // formatted as two digits with a leading zero as necessary, i.e. 01 - 12. 
01 corresponds to one o'clock (either morning or afternoon).
+    IUpper,
+    // Hour of the day for the 24-hour clock,
+    // i.e. 0 - 23. 0 corresponds to midnight.
+    KLower,
+    // Hour for the 12-hour clock,
+    // i.e. 1 - 12. 1 corresponds to one o'clock (either morning or afternoon).
+    LLower,
+    // Minute within the hour formatted as two digits with a leading zero as 
necessary, i.e. 00 - 59.
+    MUpper,
+    // Seconds within the minute, formatted as two digits with a leading zero 
as necessary,
+    // i.e. 00 - 60 ("60" is a special value required to support leap seconds).
+    SUpper,
+    // Millisecond within the second formatted as three digits with leading 
zeros as necessary, i.e. 000 - 999.
+    LUpper,
+    // Nanosecond within the second, formatted as nine digits with leading 
zeros as necessary,
+    // i.e. 000000000 - 999999999. The precision of this value is limited by 
the resolution of the underlying operating system or hardware.
+    NUpper,
+    // Locale-specific morning or afternoon marker in lower case, e.g."am" or 
"pm".
+    // Use of the conversion prefix 'T' forces this output to upper case. 
(Note that 'p' produces lower-case output.
+    // This is different from GNU date and POSIX strftime(3c) which produce 
upper-case output.)
+    PLower,
+    // RFC 822 style numeric time zone offset from GMT,
+    // e.g. -0800. This value will be adjusted as necessary for Daylight 
Saving Time.
+    // For long, Long, and Date the time zone used is the default time zone 
for this instance of the Java virtual machine.
+    ZLower,
+    // A string representing the abbreviation for the time zone. This value 
will be adjusted as necessary for Daylight Saving Time.
+    // For long, Long, and Date the time zone used is the default time zone 
for this instance of the Java virtual machine.
+    // The Formatter's locale will supersede the locale of the argument (if 
any).
+    ZUpper,
+    // Seconds since the beginning of the epoch starting at 1 January 1970 
00:00:00 UTC,
+    // i.e. Long.MIN_VALUE/1000 to Long.MAX_VALUE/1000.
+    SLower,
+    // Milliseconds since the beginning of the epoch starting at 1 January 
1970 00:00:00 UTC,
+    // i.e. Long.MIN_VALUE to Long.MAX_VALUE. The precision of this value is 
limited by the resolution of the underlying operating system or hardware.
+    QUpper,
+    // Locale-specific full month name, e.g. "January", "February".
+    BUpper,
+    // Locale-specific abbreviated month name, e.g. "Jan", "Feb".
+    BLower,
+    // Locale-specific full weekday name, e.g. "Monday", "Tuesday".
+    AUpper,
+    // Locale-specific abbreviated weekday name, e.g. "Mon", "Tue".
+    ALower,
+    // Four-digit year divided by 100, formatted as two digits with leading 
zero as necessary, i.e. 00 - 99
+    CUpper,
+    // Year, formatted to at least four digits with leading zeros as 
necessary, e.g. 0092 equals 92 CE for the Gregorian calendar.
+    YUpper,
+    // Last two digits of the year, formatted with leading zeros as necessary, 
i.e. 00 - 99.
+    YLower,
+    // Day of year, formatted as three digits with leading zeros as necessary, 
e.g. 001 - 366 for the Gregorian calendar. 001 corresponds to the first day of 
the year.
+    JLower,
+    // Month, formatted as two digits with leading zeros as necessary, i.e. 01 
- 13, where "01" is the first month of the year and ("13" is a special value 
required to support lunar calendars).
+    MLower,
+    // Day of month, formatted as two digits with leading zeros as necessary, 
i.e. 01 - 31, where "01" is the first day of the month.
+    DLower,
+    // Day of month, formatted as two digits, i.e. 1 - 31 where "1" is the 
first day of the month.
+    ELower,
+    // Time formatted for the 24-hour clock as "%tH:%tM"
+    RUpper,
+    // Time formatted for the 24-hour clock as "%tH:%tM:%tS"
+    TUpper,
+    // Time formatted for the 12-hour clock as "%tI:%tM:%tS %Tp". The location 
of the morning or afternoon marker ('%Tp') may be locale-dependent.
+    RLower,
+    // Date formatted as "%tm/%td/%ty"
+    DUpper,
+    // ISO 8601 complete date formatted as "%tY-%tm-%td"
+    FUpper,
+    // Date and time formatted as "%ta %tb %td %tT %tZ %tY", e.g. "Sun Jul 20
+    // 16:17:00 EDT 1969"
+    CLower,
+}
+
+/// Resolves the suffix character of a date/time conversion to the
+/// [`TimeFormat`] variant it selects.
+impl TryFrom<char> for TimeFormat {
+    type Error = DataFusionError;
+
+    /// Looks up `value` in the table of recognised suffix characters.
+    ///
+    /// `'b'` and `'h'` are aliases and both yield `TimeFormat::BLower`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an execution error when `value` is not one of the listed
+    /// characters.
+    fn try_from(value: char) -> Result<Self, Self::Error> {
+        let format = match value {
+            'H' => TimeFormat::HUpper,
+            'I' => TimeFormat::IUpper,
+            'k' => TimeFormat::KLower,
+            'l' => TimeFormat::LLower,
+            'M' => TimeFormat::MUpper,
+            'S' => TimeFormat::SUpper,
+            'L' => TimeFormat::LUpper,
+            'N' => TimeFormat::NUpper,
+            'p' => TimeFormat::PLower,
+            'z' => TimeFormat::ZLower,
+            'Z' => TimeFormat::ZUpper,
+            's' => TimeFormat::SLower,
+            'Q' => TimeFormat::QUpper,
+            // Month and weekday names.
+            'B' => TimeFormat::BUpper,
+            'b' | 'h' => TimeFormat::BLower,
+            'A' => TimeFormat::AUpper,
+            'a' => TimeFormat::ALower,
+            // Year, day-of-year, month and day-of-month fields.
+            'C' => TimeFormat::CUpper,
+            'Y' => TimeFormat::YUpper,
+            'y' => TimeFormat::YLower,
+            'j' => TimeFormat::JLower,
+            'm' => TimeFormat::MLower,
+            'd' => TimeFormat::DLower,
+            'e' => TimeFormat::ELower,
+            // Composite date/time layouts.
+            'R' => TimeFormat::RUpper,
+            'T' => TimeFormat::TUpper,
+            'r' => TimeFormat::RLower,
+            'D' => TimeFormat::DUpper,
+            'F' => TimeFormat::FUpper,
+            'c' => TimeFormat::CLower,
+            // Anything else is not a known suffix.
+            _ => return exec_err!("Invalid time format: {}", value),
+        };
+        Ok(format)
+    }
+}
+
+impl ConversionType {
+    /// Reports whether this is one of the two string conversions
+    /// (`StringLower` or `StringUpper`).
+    pub fn is_string(&self) -> bool {
+        matches!(self, Self::StringLower | Self::StringUpper)
+    }
+
+    /// Checks that `arg_type` is an acceptable argument type for this
+    /// conversion.
+    ///
+    /// # Errors
+    ///
+    /// Returns an execution error naming the conversion family and the
+    /// offending type when:
+    /// - a boolean conversion receives anything other than `Boolean`;
+    /// - a char conversion receives anything other than an 8- to 64-bit
+    ///   signed or unsigned integer;
+    /// - a decimal/octal/hex integer conversion receives a type for which
+    ///   `DataType::is_integer` is false;
+    /// - a float conversion receives a type for which
+    ///   `DataType::is_numeric` is false;
+    /// - a time conversion receives a type for which
+    ///   `DataType::is_temporal` is false.
+    ///
+    /// Every other conversion accepts any argument type.
+    pub fn validate(&self, arg_type: DataType) -> Result<()> {
+        // Only the fixed-width integer types qualify for char conversions.
+        let char_compatible = matches!(
+            arg_type,
+            DataType::Int8
+                | DataType::Int16
+                | DataType::Int32
+                | DataType::Int64
+                | DataType::UInt8
+                | DataType::UInt16
+                | DataType::UInt32
+                | DataType::UInt64
+        );
+
+        // Each guarded arm fires only on a type mismatch; a conversion whose
+        // guard does not fire falls through to the accepting catch-all.
+        match self {
+            Self::BooleanLower | Self::BooleanUpper
+                if !matches!(arg_type, DataType::Boolean) =>
+            {
+                exec_err!(
+                    "Invalid argument type for boolean conversion: {:?}",
+                    arg_type
+                )
+            }
+            Self::CharLower | Self::CharUpper if !char_compatible => {
+                exec_err!(
+                    "Invalid argument type for char conversion: {:?}",
+                    arg_type
+                )
+            }
+            Self::DecInt
+            | Self::OctInt
+            | Self::HexIntLower
+            | Self::HexIntUpper
+                if !arg_type.is_integer() =>
+            {
+                exec_err!(
+                    "Invalid argument type for integer conversion: {:?}",
+                    arg_type
+                )
+            }
+            Self::SciFloatLower
+            | Self::SciFloatUpper
+            | Self::DecFloatLower
+            | Self::CompactFloatLower
+            | Self::CompactFloatUpper
+            | Self::HexFloatLower
+            | Self::HexFloatUpper
+                if !arg_type.is_numeric() =>
+            {
+                exec_err!(
+                    "Invalid argument type for float conversion: {:?}",
+                    arg_type
+                )
+            }
+            Self::TimeLower(_) | Self::TimeUpper(_)
+                if !arg_type.is_temporal() =>
+            {
+                exec_err!(
+                    "Invalid argument type for time conversion: {:?}",
+                    arg_type
+                )
+            }
+            _ => Ok(()),
+        }
+    }
+}
+
+fn take_conversion_specifier(
+    s: &str,

Review Comment:
   ```suggestion
       mut s: &str,
   ```
   
   We can specify arguments as `mut` to avoid needing to rebind below as mut:
   
   ```rust
   let mut s = s;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to