This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 28e042dd47 feat(spark): implement spark datetime function date_add/date_sub (#17024)
28e042dd47 is described below

commit 28e042dd47610f65b37fcbd53390aef423f1a517
Author: Chen Chongchen <chenkov...@qq.com>
AuthorDate: Tue Aug 12 03:21:36 2025 +0800

    feat(spark): implement spark datetime function date_add/date_sub (#17024)

    * feat: spark date_add/date_sub

    * fix
---
 datafusion/spark/src/function/datetime/date_add.rs | 129 +++++++++++++++
 datafusion/spark/src/function/datetime/date_sub.rs | 123 ++++++++++++++
 datafusion/spark/src/function/datetime/mod.rs      |  16 ++-
 .../test_files/spark/datetime/date_add.slt         |  45 ++++++-
 .../test_files/spark/datetime/date_sub.slt         |  50 +++++++-
 .../test_files/spark/datetime/dateadd.slt          |  36 +++++-
 6 files changed, 392 insertions(+), 7 deletions(-)
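For context, a minimal sketch of exercising the new functions end to end. It is not part of the commit; it assumes the `datafusion` and `datafusion-spark` crates (plus tokio) are available as dependencies and that the `datafusion_spark::function::datetime` module path shown in the diff below is publicly exported.

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;
    use datafusion_spark::function::datetime;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();

        // `functions()` (added in this commit) returns Arc<ScalarUDF> handles for
        // date_add, date_sub, last_day and next_day; register all of them.
        for udf in datetime::functions() {
            ctx.register_udf(udf.as_ref().clone());
        }

        // Spark semantics: Date32 plus/minus an integer number of days.
        ctx.sql("SELECT date_add('2016-07-30'::date, 1::int), date_sub('2016-07-30'::date, 1::int)")
            .await?
            .show()
            .await?;
        Ok(())
    }

The integer argument is cast to `INT` explicitly because the new signatures accept `Int8`/`Int16`/`Int32` alongside `Date32`, mirroring the `.slt` tests further down.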
diff --git a/datafusion/spark/src/function/datetime/date_add.rs b/datafusion/spark/src/function/datetime/date_add.rs
new file mode 100644
index 0000000000..817faed5a6
--- /dev/null
+++ b/datafusion/spark/src/function/datetime/date_add.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::ArrayRef;
+use arrow::compute;
+use arrow::datatypes::{DataType, Date32Type};
+use datafusion_common::cast::{
+    as_date32_array, as_int16_array, as_int32_array, as_int8_array,
+};
+use datafusion_common::{internal_err, Result};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+
+#[derive(Debug)]
+pub struct SparkDateAdd {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl Default for SparkDateAdd {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkDateAdd {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Exact(vec![DataType::Date32, DataType::Int8]),
+                    TypeSignature::Exact(vec![DataType::Date32, DataType::Int16]),
+                    TypeSignature::Exact(vec![DataType::Date32, DataType::Int32]),
+                ],
+                Volatility::Immutable,
+            ),
+            aliases: vec!["dateadd".to_string()],
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkDateAdd {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "date_add"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Date32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(spark_date_add, vec![])(&args.args)
+    }
+}
+
+fn spark_date_add(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [date_arg, days_arg] = args else {
+        return internal_err!(
+            "Spark `date_add` function requires 2 arguments, got {}",
+            args.len()
+        );
+    };
+    let date_array = as_date32_array(date_arg)?;
+    let result = match days_arg.data_type() {
+        DataType::Int8 => {
+            let days_array = as_int8_array(days_arg)?;
+            compute::binary::<_, _, _, Date32Type>(
+                date_array,
+                days_array,
+                |date, days| date + days as i32,
+            )?
+        }
+        DataType::Int16 => {
+            let days_array = as_int16_array(days_arg)?;
+            compute::binary::<_, _, _, Date32Type>(
+                date_array,
+                days_array,
+                |date, days| date + days as i32,
+            )?
+        }
+        DataType::Int32 => {
+            let days_array = as_int32_array(days_arg)?;
+            compute::binary::<_, _, _, Date32Type>(
+                date_array,
+                days_array,
+                |date, days| date + days,
+            )?
+        }
+        _ => {
+            return internal_err!(
+                "Spark `date_add` function: argument must be int8, int16, int32, got {:?}",
+                days_arg.data_type()
+            );
+        }
+    };
+    Ok(Arc::new(result))
+}
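The heart of the implementation above is a single `arrow::compute::binary` call: `Date32` stores days since the Unix epoch, so adding a day count is plain `i32` arithmetic, and a null in either input propagates to the output. A standalone sketch of that kernel (not part of the commit; the day numbers are arbitrary and chosen only for illustration):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Date32Array, Int32Array};
    use arrow::compute;
    use arrow::datatypes::Date32Type;
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // Date32 values are days since 1970-01-01; the concrete numbers are arbitrary.
        let dates = Date32Array::from(vec![Some(17_000), None, Some(17_001)]);
        let days = Int32Array::from(vec![Some(1), Some(2), None]);

        // Same kernel the UDF uses: element-wise add, with null slots in either
        // input producing a null output slot.
        let shifted =
            compute::binary::<_, _, _, Date32Type>(&dates, &days, |d, n| d + n)?;

        let result: ArrayRef = Arc::new(shifted);
        assert_eq!(result.null_count(), 2);
        Ok(())
    }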
diff --git a/datafusion/spark/src/function/datetime/date_sub.rs b/datafusion/spark/src/function/datetime/date_sub.rs
new file mode 100644
index 0000000000..d350e94d80
--- /dev/null
+++ b/datafusion/spark/src/function/datetime/date_sub.rs
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::ArrayRef;
+use arrow::compute;
+use arrow::datatypes::{DataType, Date32Type};
+use datafusion_common::cast::{
+    as_date32_array, as_int16_array, as_int32_array, as_int8_array,
+};
+use datafusion_common::{internal_err, Result};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+
+#[derive(Debug)]
+pub struct SparkDateSub {
+    signature: Signature,
+}
+
+impl Default for SparkDateSub {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkDateSub {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Exact(vec![DataType::Date32, DataType::Int8]),
+                    TypeSignature::Exact(vec![DataType::Date32, DataType::Int16]),
+                    TypeSignature::Exact(vec![DataType::Date32, DataType::Int32]),
+                ],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkDateSub {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "date_sub"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Date32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        make_scalar_function(spark_date_sub, vec![])(&args.args)
+    }
+}
+
+fn spark_date_sub(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let [date_arg, days_arg] = args else {
+        return internal_err!(
+            "Spark `date_sub` function requires 2 arguments, got {}",
+            args.len()
+        );
+    };
+    let date_array = as_date32_array(date_arg)?;
+    let result = match days_arg.data_type() {
+        DataType::Int8 => {
+            let days_array = as_int8_array(days_arg)?;
+            compute::binary::<_, _, _, Date32Type>(
+                date_array,
+                days_array,
+                |date, days| date - days as i32,
+            )?
+        }
+        DataType::Int16 => {
+            let days_array = as_int16_array(days_arg)?;
+            compute::binary::<_, _, _, Date32Type>(
+                date_array,
+                days_array,
+                |date, days| date - days as i32,
+            )?
+        }
+        DataType::Int32 => {
+            let days_array = as_int32_array(days_arg)?;
+            compute::binary::<_, _, _, Date32Type>(
+                date_array,
+                days_array,
+                |date, days| date - days,
+            )?
+        }
+        _ => {
+            return internal_err!(
+                "Spark `date_sub` function: argument must be int8, int16, int32, got {:?}",
+                days_arg.data_type()
+            );
+        }
+    };
+    Ok(Arc::new(result))
+}
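`date_sub` mirrors `date_add` with subtraction; the commit relies on the `.slt` files below for coverage rather than Rust unit tests. If in-module tests are wanted later, a minimal sketch of one that could sit at the bottom of `date_sub.rs` might look like this (hypothetical, not part of the commit; the day numbers are arbitrary):

    // Hypothetical in-module test for date_sub.rs (not part of this commit).
    #[cfg(test)]
    mod tests {
        use super::*;
        use arrow::array::{Date32Array, Int32Array};

        #[test]
        fn date_sub_subtracts_days_and_propagates_nulls() -> Result<()> {
            // Arbitrary Date32 day numbers; only the arithmetic matters here.
            let dates: ArrayRef = Arc::new(Date32Array::from(vec![Some(100), None]));
            let days: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(3)]));

            let result = spark_date_sub(&[dates, days])?;
            let result = as_date32_array(&result)?;

            assert_eq!(result.value(0), 97);
            assert!(result.is_null(1));
            Ok(())
        }
    }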
diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs
index 7d0bfcdde1..0e37284cc6 100644
--- a/datafusion/spark/src/function/datetime/mod.rs
+++ b/datafusion/spark/src/function/datetime/mod.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod date_add;
+pub mod date_sub;
 pub mod last_day;
 pub mod next_day;
 
@@ -22,12 +24,24 @@ use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
+make_udf_function!(date_add::SparkDateAdd, date_add);
+make_udf_function!(date_sub::SparkDateSub, date_sub);
 make_udf_function!(last_day::SparkLastDay, last_day);
 make_udf_function!(next_day::SparkNextDay, next_day);
 
 pub mod expr_fn {
     use datafusion_functions::export_functions;
 
+    export_functions!((
+        date_add,
+        "Returns the date that is `days` days after `start`. The function returns NULL if at least one of the input parameters is NULL.",
+        arg1 arg2
+    ));
+    export_functions!((
+        date_sub,
+        "Returns the date that is `days` days before `start`. The function returns NULL if at least one of the input parameters is NULL.",
+        arg1 arg2
+    ));
     export_functions!((
         last_day,
         "Returns the last day of the month which the date belongs to.",
@@ -43,5 +57,5 @@ pub mod expr_fn {
 }
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![last_day(), next_day()]
+    vec![date_add(), date_sub(), last_day(), next_day()]
 }
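The `expr_fn` wrappers generated by `export_functions!` above can also be used from the DataFrame API. A sketch, not part of the commit, assuming the `expr_fn` module is reachable at the crate path below; the table name, column name, and day values are made up for illustration:

    use std::sync::Arc;

    use arrow::array::Date32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::prelude::*;
    use datafusion_spark::function::datetime::expr_fn::{date_add, date_sub};

    #[tokio::main]
    async fn main() -> Result<()> {
        // One Date32 column with arbitrary days-since-epoch values.
        let schema = Arc::new(Schema::new(vec![Field::new("d", DataType::Date32, true)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![Some(17_000), None]))],
        )?;

        let ctx = SessionContext::new();
        ctx.register_batch("t", batch)?;

        // The generated expr_fn helpers produce Exprs that reference the UDFs directly.
        let df = ctx
            .table("t")
            .await?
            .select(vec![
                date_add(col("d"), lit(7i32)), // seven days later
                date_sub(col("d"), lit(7i32)), // seven days earlier
            ])?;
        df.show().await?;
        Ok(())
    }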
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/date_add.slt b/datafusion/sqllogictest/test_files/spark/datetime/date_add.slt
index be0b1c8524..146f970166 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/date_add.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/date_add.slt
@@ -23,5 +23,46 @@
 ## Original Query: SELECT date_add('2016-07-30', 1);
 ## PySpark 3.5.5 Result: {'date_add(2016-07-30, 1)': datetime.date(2016, 7, 31), 'typeof(date_add(2016-07-30, 1))': 'date', 'typeof(2016-07-30)': 'string', 'typeof(1)': 'int'}
-#query
-#SELECT date_add('2016-07-30'::string, 1::int);
+
+# Basic date_add tests
+query D
+SELECT date_add('2016-07-30'::date, 1::int);
+----
+2016-07-31
+
+query D
+SELECT date_add('2016-07-30'::date, arrow_cast(1, 'Int8'));
+----
+2016-07-31
+
+query D
+SELECT date_add('2016-07-30'::date, arrow_cast(1, 'Int16'));
+----
+2016-07-31
+
+query D
+SELECT date_add('2016-07-30'::date, 0::int);
+----
+2016-07-30
+
+# Test with negative day values (should subtract days)
+query D
+SELECT date_add('2016-07-30'::date, -5::int);
+----
+2016-07-25
+
+# Test with NULL values
+query D
+SELECT date_add(NULL::date, 1::int);
+----
+NULL
+
+query D
+SELECT date_add('2016-07-30'::date, NULL::int);
+----
+NULL
+
+query D
+SELECT date_add(NULL::date, NULL::int);
+----
+NULL
 
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/date_sub.slt b/datafusion/sqllogictest/test_files/spark/datetime/date_sub.slt
index 4cc0a91550..cb5e77c3b4 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/date_sub.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/date_sub.slt
@@ -23,5 +23,51 @@
 ## Original Query: SELECT date_sub('2016-07-30', 1);
 ## PySpark 3.5.5 Result: {'date_sub(2016-07-30, 1)': datetime.date(2016, 7, 29), 'typeof(date_sub(2016-07-30, 1))': 'date', 'typeof(2016-07-30)': 'string', 'typeof(1)': 'int'}
-#query
-#SELECT date_sub('2016-07-30'::string, 1::int);
+
+# Basic date_sub tests
+query D
+SELECT date_sub('2016-07-30'::date, 1::int);
+----
+2016-07-29
+
+query D
+SELECT date_sub('2016-07-30'::date, arrow_cast(1, 'Int8'));
+----
+2016-07-29
+
+query D
+SELECT date_sub('2016-07-30'::date, arrow_cast(1, 'Int16'));
+----
+2016-07-29
+
+query D
+SELECT date_sub('2016-07-30'::date, 0::int);
+----
+2016-07-30
+
+# Test with negative day values (should add days)
+query D
+SELECT date_sub('2016-07-30'::date, -1::int);
+----
+2016-07-31
+
+query D
+SELECT date_sub('2016-07-30'::date, -5::int);
+----
+2016-08-04
+
+# Test with NULL values
+query D
+SELECT date_sub(NULL::date, 1::int);
+----
+NULL
+
+query D
+SELECT date_sub('2016-07-30'::date, NULL::int);
+----
+NULL
+
+query D
+SELECT date_sub(NULL::date, NULL::int);
+----
+NULL
 
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/dateadd.slt b/datafusion/sqllogictest/test_files/spark/datetime/dateadd.slt
index dd2da11908..c369989616 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/dateadd.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/dateadd.slt
@@ -23,5 +23,37 @@
 ## Original Query: SELECT dateadd('2016-07-30', 1);
 ## PySpark 3.5.5 Result: {'date_add(2016-07-30, 1)': datetime.date(2016, 7, 31), 'typeof(date_add(2016-07-30, 1))': 'date', 'typeof(2016-07-30)': 'string', 'typeof(1)': 'int'}
-#query
-#SELECT dateadd('2016-07-30'::string, 1::int);
+
+# Basic dateadd tests (alias for date_add)
+query D
+SELECT dateadd('2016-07-30'::date, 1::int);
+----
+2016-07-31
+
+query D
+SELECT dateadd('2016-07-30'::date, 0::int);
+----
+2016-07-30
+
+# Test with negative day values (should subtract days)
+
+query D
+SELECT dateadd('2016-07-30'::date, -5::int);
+----
+2016-07-25
+
+# Test with NULL values
+query D
+SELECT dateadd(NULL::date, 1::int);
+----
+NULL
+
+query D
+SELECT dateadd('2016-07-30'::date, NULL::int);
+----
+NULL
+
+query D
+SELECT dateadd(NULL::date, NULL::int);
+----
+NULL

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org