This is an automated email from the ASF dual-hosted git repository.

ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git

The following commit(s) were added to refs/heads/main by this push:
     new aab44fdaae  feat: [datafusion-spark] Implement `next_day` function (#16780)
aab44fdaae is described below

commit aab44fdaaec28fd13b544ccff239c8b8fe6cee2f
Author: Peter Nguyen <petern0...@gmail.com>
AuthorDate: Mon Jul 28 19:40:06 2025 -0700

    feat: [datafusion-spark] Implement `next_day` function (#16780)

    * Implement next_day

    * cargo fmt

    * Update datafusion/spark/Cargo.toml

    Co-authored-by: Bruce Ritchie <bruce.ritc...@veeva.com>

    * PR feedback

    * Add more tests

    * Move ansi mode part of doc to a comment and cleanup

    * cargo fmt

    * Add test

    * Remove commentted tests and duplicate tests

    ---------

    Co-authored-by: Bruce Ritchie <bruce.ritc...@veeva.com>
---
 datafusion/spark/src/function/datetime/mod.rs      |  11 +-
 datafusion/spark/src/function/datetime/next_day.rs | 226 +++++++++++++++++++++
 .../test_files/spark/datetime/next_day.slt         |  62 +++++-
 3 files changed, 294 insertions(+), 5 deletions(-)

diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs
index 3bde960ae0..7d0bfcdde1 100644
--- a/datafusion/spark/src/function/datetime/mod.rs
+++ b/datafusion/spark/src/function/datetime/mod.rs
@@ -16,12 +16,14 @@
 // under the License.
 
 pub mod last_day;
+pub mod next_day;
 
 use datafusion_expr::ScalarUDF;
 use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
 make_udf_function!(last_day::SparkLastDay, last_day);
+make_udf_function!(next_day::SparkNextDay, next_day);
 
 pub mod expr_fn {
     use datafusion_functions::export_functions;
@@ -31,8 +33,15 @@ pub mod expr_fn {
         "Returns the last day of the month which the date belongs to.",
         arg1
     ));
+    // TODO: add once ANSI support is added:
+    // "When both of the input parameters are not NULL and day_of_week is an invalid input, the function throws SparkIllegalArgumentException if spark.sql.ansi.enabled is set to true, otherwise NULL."
+    export_functions!((
+        next_day,
+        "Returns the first date which is later than start_date and named as indicated. The function returns NULL if at least one of the input parameters is NULL.",
+        arg1 arg2
+    ));
 }
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![last_day()]
+    vec![last_day(), next_day()]
 }
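
The new `next_day.rs` below reduces each lookup to one piece of chrono weekday
arithmetic: the input date is always advanced by 1 to 7 days, so the result is
strictly later than start_date even when the input already falls on the
requested weekday. Here is a minimal standalone sketch of that arithmetic; it
is not part of this patch and assumes the same chrono version as the workspace
(one that provides Weekday::days_since, as the committed code relies on).

    use chrono::{Datelike, Duration, NaiveDate, Weekday};

    fn main() {
        // 2015-01-14 is a Wednesday; the next Tuesday after it is 2015-01-20,
        // the same answer as `SELECT next_day('2015-01-14', 'TU')` in the
        // sqllogictest additions further down.
        let date = NaiveDate::from_ymd_opt(2015, 1, 14).unwrap();
        let target = Weekday::Tue;

        // Wednesday is 1 day past Tuesday, so days_since() returns 1 and the
        // date is advanced by 7 - 1 = 6 days. If the input already falls on
        // the target weekday, days_since() returns 0 and a full 7 days are
        // added, keeping the result strictly later than the input.
        let delta = 7 - date.weekday().days_since(target);
        let next = date + Duration::days(delta as i64);

        assert_eq!(next, NaiveDate::from_ymd_opt(2015, 1, 20).unwrap());
        println!("{next}"); // prints 2015-01-20
    }
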
diff --git a/datafusion/spark/src/function/datetime/next_day.rs b/datafusion/spark/src/function/datetime/next_day.rs
new file mode 100644
index 0000000000..6d7f62ea3a
--- /dev/null
+++ b/datafusion/spark/src/function/datetime/next_day.rs
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{new_null_array, ArrayRef, AsArray, Date32Array, StringArrayType};
+use arrow::datatypes::{DataType, Date32Type};
+use chrono::{Datelike, Duration, Weekday};
+use datafusion_common::{exec_err, Result, ScalarValue};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#next_day>
+#[derive(Debug)]
+pub struct SparkNextDay {
+    signature: Signature,
+}
+
+impl Default for SparkNextDay {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkNextDay {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::exact(
+                vec![DataType::Date32, DataType::Utf8],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkNextDay {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "next_day"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Date32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let ScalarFunctionArgs { args, .. } = args;
+        let [date, day_of_week] = args.as_slice() else {
+            return exec_err!(
+                "Spark `next_day` function requires 2 arguments, got {}",
+                args.len()
+            );
+        };
+
+        match (date, day_of_week) {
+            (ColumnarValue::Scalar(date), ColumnarValue::Scalar(day_of_week)) => {
+                match (date, day_of_week) {
+                    (ScalarValue::Date32(days), ScalarValue::Utf8(day_of_week) | ScalarValue::LargeUtf8(day_of_week) | ScalarValue::Utf8View(day_of_week)) => {
+                        if let Some(days) = days {
+                            if let Some(day_of_week) = day_of_week {
+                                Ok(ColumnarValue::Scalar(ScalarValue::Date32(
+                                    spark_next_day(*days, day_of_week.as_str()),
+                                )))
+                            } else {
+                                // TODO: if spark.sql.ansi.enabled is false,
+                                // returns NULL instead of an error for a malformed dayOfWeek.
+                                Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
+                            }
+                        } else {
+                            Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
+                        }
+                    }
+                    _ => exec_err!("Spark `next_day` function: first arg must be date, second arg must be string. Got {args:?}"),
+                }
+            }
+            (ColumnarValue::Array(date_array), ColumnarValue::Scalar(day_of_week)) => {
+                match (date_array.data_type(), day_of_week) {
+                    (DataType::Date32, ScalarValue::Utf8(day_of_week) | ScalarValue::LargeUtf8(day_of_week) | ScalarValue::Utf8View(day_of_week)) => {
+                        if let Some(day_of_week) = day_of_week {
+                            let result: Date32Array = date_array
+                                .as_primitive::<Date32Type>()
+                                .unary_opt(|days| spark_next_day(days, day_of_week.as_str()))
+                                .with_data_type(DataType::Date32);
+                            Ok(ColumnarValue::Array(Arc::new(result) as ArrayRef))
+                        } else {
+                            // TODO: if spark.sql.ansi.enabled is false,
+                            // returns NULL instead of an error for a malformed dayOfWeek.
+                            Ok(ColumnarValue::Array(Arc::new(new_null_array(&DataType::Date32, date_array.len()))))
+                        }
+                    }
+                    _ => exec_err!("Spark `next_day` function: first arg must be date, second arg must be string. Got {args:?}"),
+                }
+            }
+            (
+                ColumnarValue::Array(date_array),
+                ColumnarValue::Array(day_of_week_array),
+            ) => {
+                let result = match (date_array.data_type(), day_of_week_array.data_type())
+                {
+                    (
+                        DataType::Date32,
+                        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View,
+                    ) => {
+                        let date_array: &Date32Array =
+                            date_array.as_primitive::<Date32Type>();
+                        match day_of_week_array.data_type() {
+                            DataType::Utf8 => {
+                                let day_of_week_array =
+                                    day_of_week_array.as_string::<i32>();
+                                process_next_day_arrays(date_array, day_of_week_array)
+                            }
+                            DataType::LargeUtf8 => {
+                                let day_of_week_array =
+                                    day_of_week_array.as_string::<i64>();
+                                process_next_day_arrays(date_array, day_of_week_array)
+                            }
+                            DataType::Utf8View => {
+                                let day_of_week_array =
+                                    day_of_week_array.as_string_view();
+                                process_next_day_arrays(date_array, day_of_week_array)
+                            }
+                            other => {
+                                exec_err!("Spark `next_day` function: second arg must be string. Got {other:?}")
+                            }
+                        }
+                    }
+                    (left, right) => {
+                        exec_err!(
+                            "Spark `next_day` function: first arg must be date, second arg must be string. Got {left:?}, {right:?}"
+                        )
+                    }
+                }?;
+                Ok(ColumnarValue::Array(result))
+            }
+            _ => exec_err!("Unsupported args {args:?} for Spark function `next_day`"),
+        }
+    }
+}
+
+fn process_next_day_arrays<'a, S>(
+    date_array: &Date32Array,
+    day_of_week_array: &'a S,
+) -> Result<ArrayRef>
+where
+    &'a S: StringArrayType<'a>,
+{
+    let result = date_array
+        .iter()
+        .zip(day_of_week_array.iter())
+        .map(|(days, day_of_week)| {
+            if let Some(days) = days {
+                if let Some(day_of_week) = day_of_week {
+                    spark_next_day(days, day_of_week)
+                } else {
+                    // TODO: if spark.sql.ansi.enabled is false,
+                    // returns NULL instead of an error for a malformed dayOfWeek.
+                    None
+                }
+            } else {
+                None
+            }
+        })
+        .collect::<Date32Array>();
+    Ok(Arc::new(result) as ArrayRef)
+}
+
+fn spark_next_day(days: i32, day_of_week: &str) -> Option<i32> {
+    let date = Date32Type::to_naive_date(days);
+
+    let day_of_week = day_of_week.trim().to_uppercase();
+    let day_of_week = match day_of_week.as_str() {
+        "MO" | "MON" | "MONDAY" => Some("MONDAY"),
+        "TU" | "TUE" | "TUESDAY" => Some("TUESDAY"),
+        "WE" | "WED" | "WEDNESDAY" => Some("WEDNESDAY"),
+        "TH" | "THU" | "THURSDAY" => Some("THURSDAY"),
+        "FR" | "FRI" | "FRIDAY" => Some("FRIDAY"),
+        "SA" | "SAT" | "SATURDAY" => Some("SATURDAY"),
+        "SU" | "SUN" | "SUNDAY" => Some("SUNDAY"),
+        _ => {
+            // TODO: if spark.sql.ansi.enabled is false,
+            // returns NULL instead of an error for a malformed dayOfWeek.
+            None
+        }
+    };
+
+    if let Some(day_of_week) = day_of_week {
+        let day_of_week = day_of_week.parse::<Weekday>();
+        match day_of_week {
+            Ok(day_of_week) => Some(Date32Type::from_naive_date(
+                date + Duration::days(
+                    (7 - date.weekday().days_since(day_of_week)) as i64,
+                ),
+            )),
+            Err(_) => {
+                // TODO: if spark.sql.ansi.enabled is false,
+                // returns NULL instead of an error for a malformed dayOfWeek.
+                None
+            }
+        }
+    } else {
+        None
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/spark/datetime/next_day.slt b/datafusion/sqllogictest/test_files/spark/datetime/next_day.slt
index ffc7040f47..872d1f2b58 100644
--- a/datafusion/sqllogictest/test_files/spark/datetime/next_day.slt
+++ b/datafusion/sqllogictest/test_files/spark/datetime/next_day.slt
@@ -21,7 +21,61 @@
 # For more information, please see:
 # https://github.com/apache/datafusion/issues/15914
 
-## Original Query: SELECT next_day('2015-01-14', 'TU');
-## PySpark 3.5.5 Result: {'next_day(2015-01-14, TU)': datetime.date(2015, 1, 20), 'typeof(next_day(2015-01-14, TU))': 'date', 'typeof(2015-01-14)': 'string', 'typeof(TU)': 'string'}
-#query
-#SELECT next_day('2015-01-14'::string, 'TU'::string);
+query D
+SELECT next_day('2015-01-14'::DATE, 'TU'::string);
+----
+2015-01-20
+
+query D
+SELECT next_day('2015-07-27'::DATE, 'Sun'::string);
+----
+2015-08-02
+
+query D
+SELECT next_day('2015-07-27'::DATE, 'Sat'::string);
+----
+2015-08-01
+
+query error Failed to coerce arguments to satisfy a call to 'next_day' function
+SELECT next_day('2015-07-27'::DATE);
+
+query error Failed to coerce arguments to satisfy a call to 'next_day' function
+SELECT next_day('Sun'::string);
+
+query error 'next_day' does not support zero arguments
+SELECT next_day();
+
+query error Failed to coerce arguments to satisfy a call to 'next_day' function
+SELECT next_day(1::int, 'Sun'::string);
+
+query error Failed to coerce arguments to satisfy a call to 'next_day' function
+SELECT next_day('2015-07-27'::DATE, 'Sat'::string, 'Sun'::string);
+
+query error Failed to coerce arguments to satisfy a call to 'next_day' function
+SELECT next_day('invalid_date'::string, 'Mon'::string);
+
+query D
+SELECT next_day('2000-01-01'::DATE, 2.0::float);
+----
+NULL
+
+query D
+SELECT next_day('2020-01-01'::DATE, 'invalid_day'::string);
+----
+NULL
+
+query error Cast error: Cannot cast string '2015-13-32' to value of Date32 type
+SELECT next_day('2015-13-32'::DATE, 'Sun'::string);
+
+query D
+SELECT next_day(a, b)
+FROM VALUES
+  ('2000-01-01'::DATE, 'Mon'::string),
+  (NULL::DATE, NULL::string),
+  (NULL::DATE, 'Mon'::string),
+  ('2015-01-14'::DATE, NULL::string) as t(a, b);
+----
+2000-01-03
+NULL
+NULL
+NULL

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org