This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 267ad4c56 feat: add support for datediff expression (#3145)
267ad4c56 is described below
commit 267ad4c56028919cd9d8bc60b4302eb08b918549
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 20 18:57:56 2026 -0700
feat: add support for datediff expression (#3145)
---
docs/source/user-guide/latest/configs.md | 1 +
native/spark-expr/src/comet_scalar_funcs.rs | 5 +-
native/spark-expr/src/datetime_funcs/date_diff.rs | 104 +++++++++++++++++++++
native/spark-expr/src/datetime_funcs/mod.rs | 2 +
native/spark-expr/src/lib.rs | 4 +-
.../org/apache/comet/serde/QueryPlanSerde.scala | 1 +
.../scala/org/apache/comet/serde/datetime.scala | 4 +-
.../comet/CometTemporalExpressionSuite.scala | 49 ++++++++++
8 files changed, 166 insertions(+), 4 deletions(-)
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 5eea5c4e5..d2fbca346 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -234,6 +234,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.expression.CreateArray.enabled` | Enable Comet acceleration for `CreateArray` | true |
| `spark.comet.expression.CreateNamedStruct.enabled` | Enable Comet acceleration for `CreateNamedStruct` | true |
| `spark.comet.expression.DateAdd.enabled` | Enable Comet acceleration for `DateAdd` | true |
+| `spark.comet.expression.DateDiff.enabled` | Enable Comet acceleration for `DateDiff` | true |
| `spark.comet.expression.DateFormatClass.enabled` | Enable Comet acceleration for `DateFormatClass` | true |
| `spark.comet.expression.DateSub.enabled` | Enable Comet acceleration for `DateSub` | true |
| `spark.comet.expression.DayOfMonth.enabled` | Enable Comet acceleration for `DayOfMonth` | true |
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 8384a4646..760dc3570 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -22,8 +22,8 @@ use crate::math_funcs::modulo_expr::spark_modulo;
use crate::{
    spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
    spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad,
-    spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateTrunc, SparkSizeFunc,
-    SparkStringSpace,
+    spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateDiff, SparkDateTrunc,
+    SparkSizeFunc, SparkStringSpace,
};
use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -192,6 +192,7 @@ pub fn create_comet_physical_fun_with_eval_mode(
fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
    vec![
        Arc::new(ScalarUDF::new_from_impl(SparkBitwiseCount::default())),
+        Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())),
        Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
        Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())),
        Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
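For context, registering the UDF via ScalarUDF::new_from_impl is what makes it resolvable by name in a DataFusion session. A minimal sketch (not part of this commit) of exercising the new function outside of Spark; it assumes the spark-expr crate is importable as datafusion_comet_spark_expr and that a tokio runtime is available:

    use datafusion::logical_expr::ScalarUDF;
    use datafusion::prelude::*;
    use datafusion_comet_spark_expr::SparkDateDiff; // crate name is an assumption

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // Same registration call used in all_scalar_functions() above
        ctx.register_udf(ScalarUDF::new_from_impl(SparkDateDiff::default()));

        // The function is now callable by name, or via its "datediff" alias
        let df = ctx
            .sql("SELECT date_diff(DATE '2009-07-31', DATE '2009-07-30') AS diff")
            .await?;
        df.show().await?; // one row, one column, value 1
        Ok(())
    }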
diff --git a/native/spark-expr/src/datetime_funcs/date_diff.rs b/native/spark-expr/src/datetime_funcs/date_diff.rs
new file mode 100644
index 000000000..6a593f0f8
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/date_diff.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, Date32Array, Int32Array};
+use arrow::compute::kernels::arity::binary;
+use arrow::datatypes::DataType;
+use datafusion::common::{utils::take_function_args, DataFusionError, Result};
+use datafusion::logical_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Spark-compatible date_diff function.
+/// Returns the number of days from startDate to endDate (endDate - startDate).
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkDateDiff {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl SparkDateDiff {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::exact(
+                vec![DataType::Date32, DataType::Date32],
+                Volatility::Immutable,
+            ),
+            aliases: vec!["datediff".to_string()],
+        }
+    }
+}
+
+impl Default for SparkDateDiff {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ScalarUDFImpl for SparkDateDiff {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "date_diff"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int32)
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let [end_date, start_date] = take_function_args(self.name(), args.args)?;
+
+        // Convert scalars to arrays for uniform processing
+        let end_arr = end_date.into_array(1)?;
+        let start_arr = start_date.into_array(1)?;
+
+        let end_date_array = end_arr
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .ok_or_else(|| {
+                DataFusionError::Execution("date_diff expects Date32Array for end_date".to_string())
+            })?;
+
+        let start_date_array = start_arr
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .ok_or_else(|| {
+                DataFusionError::Execution(
+                    "date_diff expects Date32Array for start_date".to_string(),
+                )
+            })?;
+
+        // Date32 stores days since epoch, so the difference is just subtraction
+        let result: Int32Array =
+            binary(end_date_array, start_date_array, |end, start| end - start)?;
+
+        Ok(ColumnarValue::Array(Arc::new(result)))
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
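The core of invoke_with_args above is the arity::binary kernel: Date32 values are days since 1970-01-01, so datediff reduces to element-wise i32 subtraction with null propagation. A standalone sketch of just that arithmetic (not part of the commit; the epoch-day numbers are computed by hand):

    use arrow::array::{Array, Date32Array, Int32Array};
    use arrow::compute::kernels::arity::binary;
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // 2009-07-31 is day 14456 and 2009-07-30 is day 14455 since the Unix epoch
        let end = Date32Array::from(vec![Some(14456), None]);
        let start = Date32Array::from(vec![Some(14455), Some(14455)]);

        // binary() applies the closure to valid pairs and propagates nulls,
        // which matches Spark's null semantics for datediff
        let diff: Int32Array = binary(&end, &start, |e, s| e - s)?;

        assert_eq!(diff.value(0), 1); // 2009-07-31 minus 2009-07-30 = 1 day
        assert!(diff.is_null(1)); // a null end date yields null
        Ok(())
    }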
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs
index ef8041e5f..c984e3a38 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+mod date_diff;
mod date_trunc;
mod extract_date_part;
mod timestamp_trunc;
+pub use date_diff::SparkDateDiff;
pub use date_trunc::SparkDateTrunc;
pub use extract_date_part::SparkHour;
pub use extract_date_part::SparkMinute;
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index f26fd911d..7f6d0b08a 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -69,7 +69,9 @@ pub use comet_scalar_funcs::{
    create_comet_physical_fun, create_comet_physical_fun_with_eval_mode,
    register_all_comet_functions,
};
-pub use datetime_funcs::{SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr};
+pub use datetime_funcs::{
+    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr,
+};
pub use error::{SparkError, SparkResult};
pub use hash_funcs::*;
pub use json_funcs::{FromJson, ToJson};
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 3569559df..abc76aa2e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -185,6 +185,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
  private val temporalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
    classOf[DateAdd] -> CometDateAdd,
+    classOf[DateDiff] -> CometDateDiff,
    classOf[DateFormatClass] -> CometDateFormat,
    classOf[DateSub] -> CometDateSub,
    classOf[UnixDate] -> CometUnixDate,
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index b191e8721..066c90c0f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
import java.util.Locale
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.types.{DateType, IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String
@@ -258,6 +258,8 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")
object CometDateSub extends CometScalarFunction[DateSub]("date_sub")
+object CometDateDiff extends CometScalarFunction[DateDiff]("date_diff")
+
/**
* Converts a date to the number of days since Unix epoch (1970-01-01). Since dates are internally
* stored as days since epoch, this is a simple cast to integer.
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 35976ffa9..0ed33bb9b 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -123,6 +123,55 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
    FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions())
  }
+ test("datediff") {
+ val r = new Random(42)
+ val schema = StructType(
+ Seq(
+ StructField("c0", DataTypes.DateType, true),
+ StructField("c1", DataTypes.DateType, true)))
+ val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000,
DataGenOptions())
+ df.createOrReplaceTempView("tbl")
+
+ // Basic test with random dates
+ checkSparkAnswerAndOperator("SELECT c0, c1, datediff(c0, c1) FROM tbl
ORDER BY c0, c1")
+
+ // Disable constant folding to ensure literal expressions are executed by
Comet
+ withSQLConf(
+ SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
+ "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
+ // Test positive difference (end date > start date)
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-31'),
DATE('2009-07-30'))")
+
+ // Test negative difference (end date < start date)
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-30'),
DATE('2009-07-31'))")
+
+ // Test same dates (should be 0)
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-30'),
DATE('2009-07-30'))")
+
+ // Test larger date differences
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('2024-01-01'),
DATE('2020-01-01'))")
+
+ // Test null handling
+ checkSparkAnswerAndOperator("SELECT datediff(NULL, DATE('2009-07-30'))")
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('2009-07-30'), NULL)")
+
+ // Test leap year edge cases
+ // 1900 was NOT a leap year (divisible by 100 but not 400)
+ // 2000 WAS a leap year (divisible by 400)
+ // So Feb 27 to Mar 1 spans different number of days:
+ // 1900: 2 days (Feb 28, Mar 1)
+ // 2000: 3 days (Feb 28, Feb 29, Mar 1)
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('1900-03-01'),
DATE('1900-02-27'))")
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('2000-03-01'),
DATE('2000-02-27'))")
+
+ // Additional leap year tests
+ // 2004 was a leap year (divisible by 4, not by 100)
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('2004-03-01'),
DATE('2004-02-28'))")
+ // 2100 will NOT be a leap year (divisible by 100 but not 400)
+ checkSparkAnswerAndOperator("SELECT datediff(DATE('2100-03-01'),
DATE('2100-02-28'))")
+ }
+ }
+
test("date_format with timestamp column") {
// Filter out formats with embedded quotes that need special handling
val supportedFormats = CometDateFormat.supportedFormats.keys.toSeq
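The leap-year expectations in the test comments above can be sanity-checked independently of Spark and Comet. A small sketch, assuming the chrono crate for calendar-aware date arithmetic:

    use chrono::NaiveDate;

    fn days_between(end: (i32, u32, u32), start: (i32, u32, u32)) -> i64 {
        let e = NaiveDate::from_ymd_opt(end.0, end.1, end.2).unwrap();
        let s = NaiveDate::from_ymd_opt(start.0, start.1, start.2).unwrap();
        (e - s).num_days() // subtracting NaiveDates yields a chrono Duration
    }

    fn main() {
        // 1900 is not a leap year (divisible by 100 but not 400): Feb 28, Mar 1
        assert_eq!(days_between((1900, 3, 1), (1900, 2, 27)), 2);
        // 2000 is a leap year (divisible by 400): Feb 28, Feb 29, Mar 1
        assert_eq!(days_between((2000, 3, 1), (2000, 2, 27)), 3);
        // 2004 is a leap year (divisible by 4, not by 100)
        assert_eq!(days_between((2004, 3, 1), (2004, 2, 28)), 2);
        // 2100 will not be a leap year (divisible by 100 but not 400)
        assert_eq!(days_between((2100, 3, 1), (2100, 2, 28)), 1);
    }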
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]