This is an automated email from the ASF dual-hosted git repository. mbutrovich pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 8f94c2558 Chore: implement datetime funcs as ScalarUDFImpl (#1874) 8f94c2558 is described below commit 8f94c255861161e44654ccad9c91b6f47ef3ea23 Author: trompa <tro...@gmail.com> AuthorDate: Tue Jun 17 22:29:41 2025 +0200 Chore: implement datetime funcs as ScalarUDFImpl (#1874) --- native/core/src/execution/planner.rs | 39 +++++-- .../src/datetime_funcs/extract_date_part.rs | 104 +++++++++++++++++ native/spark-expr/src/datetime_funcs/hour.rs | 126 --------------------- native/spark-expr/src/datetime_funcs/minute.rs | 126 --------------------- native/spark-expr/src/datetime_funcs/mod.rs | 10 +- native/spark-expr/src/datetime_funcs/second.rs | 126 --------------------- native/spark-expr/src/lib.rs | 2 +- .../org/apache/comet/CometExpressionSuite.scala | 24 ++++ 8 files changed, 161 insertions(+), 396 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 09853b6d4..886e6912d 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -66,7 +66,7 @@ use datafusion::{ }; use datafusion_comet_spark_expr::{ create_comet_physical_fun, create_negate_expr, SparkBitwiseCount, SparkBitwiseNot, - SparkDateTrunc, + SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, }; use crate::execution::operators::ExecutionError::GeneralError; @@ -106,10 +106,9 @@ use datafusion_comet_proto::{ }; use datafusion_comet_spark_expr::{ ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Contains, Correlation, Covariance, - CreateNamedStruct, EndsWith, GetArrayStructFields, GetStructField, HourExpr, IfExpr, Like, - ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, StartsWith, - Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, - Variance, + CreateNamedStruct, EndsWith, GetArrayStructFields, GetStructField, IfExpr, Like, ListExtract, + NormalizeNaNAndZero, RLike, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, + SubstringExpr, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, Variance, }; use datafusion_spark::function::math::expm1::SparkExpm1; use itertools::Itertools; @@ -460,22 +459,40 @@ impl PhysicalPlanner { ))) } ExprStruct::Hour(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; + let child = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; let timezone = expr.timezone.clone(); + let args = vec![child]; + let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone))); + let field_ref = Arc::new(Field::new("hour", DataType::Int32, true)); + let expr: ScalarFunctionExpr = + ScalarFunctionExpr::new("hour", comet_hour, args, field_ref); - Ok(Arc::new(HourExpr::new(child, timezone))) + Ok(Arc::new(expr)) } ExprStruct::Minute(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; + let child = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; let timezone = expr.timezone.clone(); + let args = vec![child]; + let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone))); + let field_ref = Arc::new(Field::new("minute", DataType::Int32, true)); + let expr: ScalarFunctionExpr = + ScalarFunctionExpr::new("minute", comet_minute, args, field_ref); - Ok(Arc::new(MinuteExpr::new(child, timezone))) + Ok(Arc::new(expr)) } ExprStruct::Second(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; + let child = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; let timezone = expr.timezone.clone(); + let args = vec![child]; + let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone))); + let field_ref = Arc::new(Field::new("second", DataType::Int32, true)); + let expr: ScalarFunctionExpr = + ScalarFunctionExpr::new("second", comet_second, args, field_ref); - Ok(Arc::new(SecondExpr::new(child, timezone))) + Ok(Arc::new(expr)) } ExprStruct::TruncTimestamp(expr) => { let child = diff --git a/native/spark-expr/src/datetime_funcs/extract_date_part.rs b/native/spark-expr/src/datetime_funcs/extract_date_part.rs new file mode 100644 index 000000000..4f4debd2b --- /dev/null +++ b/native/spark-expr/src/datetime_funcs/extract_date_part.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::utils::array_with_timezone; +use arrow::compute::{date_part, DatePart}; +use arrow::datatypes::{DataType, TimeUnit::Microsecond}; +use datafusion::common::{internal_datafusion_err, DataFusionError}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use std::{any::Any, fmt::Debug}; + +macro_rules! extract_date_part { + ($struct_name:ident, $fn_name:expr, $date_part_variant:ident) => { + #[derive(Debug)] + pub struct $struct_name { + signature: Signature, + aliases: Vec<String>, + timezone: String, + } + + impl $struct_name { + pub fn new(timezone: String) -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + aliases: vec![], + timezone, + } + } + } + + impl ScalarUDFImpl for $struct_name { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + $fn_name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion::common::Result<DataType> { + Ok(match &arg_types[0] { + DataType::Dictionary(_, _) => { + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32)) + } + _ => DataType::Int32, + }) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion::common::Result<ColumnarValue> { + let args: [ColumnarValue; 1] = args.args.try_into().map_err(|_| { + internal_datafusion_err!(concat!($fn_name, " expects exactly one argument")) + })?; + + match args { + [ColumnarValue::Array(array)] => { + let array = array_with_timezone( + array, + self.timezone.clone(), + Some(&DataType::Timestamp( + Microsecond, + Some(self.timezone.clone().into()), + )), + )?; + let result = date_part(&array, DatePart::$date_part_variant)?; + Ok(ColumnarValue::Array(result)) + } + _ => Err(DataFusionError::Execution( + concat!($fn_name, "(scalar) should be fold in Spark JVM side.").to_string(), + )), + } + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + } + }; +} + +extract_date_part!(SparkHour, "hour", Hour); +extract_date_part!(SparkMinute, "minute", Minute); +extract_date_part!(SparkSecond, "second", Second); diff --git a/native/spark-expr/src/datetime_funcs/hour.rs b/native/spark-expr/src/datetime_funcs/hour.rs deleted file mode 100644 index bd89d3f35..000000000 --- a/native/spark-expr/src/datetime_funcs/hour.rs +++ /dev/null @@ -1,126 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::utils::array_with_timezone; -use arrow::datatypes::{DataType, Schema, TimeUnit::Microsecond}; -use arrow::{ - compute::{date_part, DatePart}, - record_batch::RecordBatch, -}; -use datafusion::common::DataFusionError; -use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr::PhysicalExpr; -use std::hash::Hash; -use std::{ - any::Any, - fmt::{Debug, Display, Formatter}, - sync::Arc, -}; - -#[derive(Debug, Eq)] -pub struct HourExpr { - /// An array with DataType::Timestamp(TimeUnit::Microsecond, None) - child: Arc<dyn PhysicalExpr>, - timezone: String, -} - -impl Hash for HourExpr { - fn hash<H: std::hash::Hasher>(&self, state: &mut H) { - self.child.hash(state); - self.timezone.hash(state); - } -} -impl PartialEq for HourExpr { - fn eq(&self, other: &Self) -> bool { - self.child.eq(&other.child) && self.timezone.eq(&other.timezone) - } -} - -impl HourExpr { - pub fn new(child: Arc<dyn PhysicalExpr>, timezone: String) -> Self { - HourExpr { child, timezone } - } -} - -impl Display for HourExpr { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Hour [timezone:{}, child: {}]", - self.timezone, self.child - ) - } -} - -impl PhysicalExpr for HourExpr { - fn as_any(&self) -> &dyn Any { - self - } - - fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - unimplemented!() - } - - fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> { - match self.child.data_type(input_schema).unwrap() { - DataType::Dictionary(key_type, _) => { - Ok(DataType::Dictionary(key_type, Box::new(DataType::Int32))) - } - _ => Ok(DataType::Int32), - } - } - - fn nullable(&self, _: &Schema) -> datafusion::common::Result<bool> { - Ok(true) - } - - fn evaluate(&self, batch: &RecordBatch) -> datafusion::common::Result<ColumnarValue> { - let arg = self.child.evaluate(batch)?; - match arg { - ColumnarValue::Array(array) => { - let array = array_with_timezone( - array, - self.timezone.clone(), - Some(&DataType::Timestamp( - Microsecond, - Some(self.timezone.clone().into()), - )), - )?; - let result = date_part(&array, DatePart::Hour)?; - - Ok(ColumnarValue::Array(result)) - } - _ => Err(DataFusionError::Execution( - "Hour(scalar) should be fold in Spark JVM side.".to_string(), - )), - } - } - - fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { - vec![&self.child] - } - - fn with_new_children( - self: Arc<Self>, - children: Vec<Arc<dyn PhysicalExpr>>, - ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> { - Ok(Arc::new(HourExpr::new( - Arc::clone(&children[0]), - self.timezone.clone(), - ))) - } -} diff --git a/native/spark-expr/src/datetime_funcs/minute.rs b/native/spark-expr/src/datetime_funcs/minute.rs deleted file mode 100644 index 36f845272..000000000 --- a/native/spark-expr/src/datetime_funcs/minute.rs +++ /dev/null @@ -1,126 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::utils::array_with_timezone; -use arrow::datatypes::{DataType, Schema, TimeUnit::Microsecond}; -use arrow::{ - compute::{date_part, DatePart}, - record_batch::RecordBatch, -}; -use datafusion::common::DataFusionError; -use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr::PhysicalExpr; -use std::hash::Hash; -use std::{ - any::Any, - fmt::{Debug, Display, Formatter}, - sync::Arc, -}; - -#[derive(Debug, Eq)] -pub struct MinuteExpr { - /// An array with DataType::Timestamp(TimeUnit::Microsecond, None) - child: Arc<dyn PhysicalExpr>, - timezone: String, -} - -impl Hash for MinuteExpr { - fn hash<H: std::hash::Hasher>(&self, state: &mut H) { - self.child.hash(state); - self.timezone.hash(state); - } -} -impl PartialEq for MinuteExpr { - fn eq(&self, other: &Self) -> bool { - self.child.eq(&other.child) && self.timezone.eq(&other.timezone) - } -} - -impl MinuteExpr { - pub fn new(child: Arc<dyn PhysicalExpr>, timezone: String) -> Self { - MinuteExpr { child, timezone } - } -} - -impl Display for MinuteExpr { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Minute [timezone:{}, child: {}]", - self.timezone, self.child - ) - } -} - -impl PhysicalExpr for MinuteExpr { - fn as_any(&self) -> &dyn Any { - self - } - - fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - unimplemented!() - } - - fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> { - match self.child.data_type(input_schema).unwrap() { - DataType::Dictionary(key_type, _) => { - Ok(DataType::Dictionary(key_type, Box::new(DataType::Int32))) - } - _ => Ok(DataType::Int32), - } - } - - fn nullable(&self, _: &Schema) -> datafusion::common::Result<bool> { - Ok(true) - } - - fn evaluate(&self, batch: &RecordBatch) -> datafusion::common::Result<ColumnarValue> { - let arg = self.child.evaluate(batch)?; - match arg { - ColumnarValue::Array(array) => { - let array = array_with_timezone( - array, - self.timezone.clone(), - Some(&DataType::Timestamp( - Microsecond, - Some(self.timezone.clone().into()), - )), - )?; - let result = date_part(&array, DatePart::Minute)?; - - Ok(ColumnarValue::Array(result)) - } - _ => Err(DataFusionError::Execution( - "Minute(scalar) should be fold in Spark JVM side.".to_string(), - )), - } - } - - fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { - vec![&self.child] - } - - fn with_new_children( - self: Arc<Self>, - children: Vec<Arc<dyn PhysicalExpr>>, - ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> { - Ok(Arc::new(MinuteExpr::new( - Arc::clone(&children[0]), - self.timezone.clone(), - ))) - } -} diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index e0baa1fce..0ca7bb940 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -17,14 +17,12 @@ mod date_arithmetic; mod date_trunc; -mod hour; -mod minute; -mod second; +mod extract_date_part; mod timestamp_trunc; pub use date_arithmetic::{spark_date_add, spark_date_sub}; pub use date_trunc::SparkDateTrunc; -pub use hour::HourExpr; -pub use minute::MinuteExpr; -pub use second::SecondExpr; +pub use extract_date_part::SparkHour; +pub use extract_date_part::SparkMinute; +pub use extract_date_part::SparkSecond; pub use timestamp_trunc::TimestampTruncExpr; diff --git a/native/spark-expr/src/datetime_funcs/second.rs b/native/spark-expr/src/datetime_funcs/second.rs deleted file mode 100644 index 865e6f09d..000000000 --- a/native/spark-expr/src/datetime_funcs/second.rs +++ /dev/null @@ -1,126 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::utils::array_with_timezone; -use arrow::datatypes::{DataType, Schema, TimeUnit::Microsecond}; -use arrow::{ - compute::{date_part, DatePart}, - record_batch::RecordBatch, -}; -use datafusion::common::DataFusionError; -use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr::PhysicalExpr; -use std::hash::Hash; -use std::{ - any::Any, - fmt::{Debug, Display, Formatter}, - sync::Arc, -}; - -#[derive(Debug, Eq)] -pub struct SecondExpr { - /// An array with DataType::Timestamp(TimeUnit::Microsecond, None) - child: Arc<dyn PhysicalExpr>, - timezone: String, -} - -impl Hash for SecondExpr { - fn hash<H: std::hash::Hasher>(&self, state: &mut H) { - self.child.hash(state); - self.timezone.hash(state); - } -} -impl PartialEq for SecondExpr { - fn eq(&self, other: &Self) -> bool { - self.child.eq(&other.child) && self.timezone.eq(&other.timezone) - } -} - -impl SecondExpr { - pub fn new(child: Arc<dyn PhysicalExpr>, timezone: String) -> Self { - SecondExpr { child, timezone } - } -} - -impl Display for SecondExpr { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "Second (timezone:{}, child: {}]", - self.timezone, self.child - ) - } -} - -impl PhysicalExpr for SecondExpr { - fn as_any(&self) -> &dyn Any { - self - } - - fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - unimplemented!() - } - - fn data_type(&self, input_schema: &Schema) -> datafusion::common::Result<DataType> { - match self.child.data_type(input_schema).unwrap() { - DataType::Dictionary(key_type, _) => { - Ok(DataType::Dictionary(key_type, Box::new(DataType::Int32))) - } - _ => Ok(DataType::Int32), - } - } - - fn nullable(&self, _: &Schema) -> datafusion::common::Result<bool> { - Ok(true) - } - - fn evaluate(&self, batch: &RecordBatch) -> datafusion::common::Result<ColumnarValue> { - let arg = self.child.evaluate(batch)?; - match arg { - ColumnarValue::Array(array) => { - let array = array_with_timezone( - array, - self.timezone.clone(), - Some(&DataType::Timestamp( - Microsecond, - Some(self.timezone.clone().into()), - )), - )?; - let result = date_part(&array, DatePart::Second)?; - - Ok(ColumnarValue::Array(result)) - } - _ => Err(DataFusionError::Execution( - "Second(scalar) should be fold in Spark JVM side.".to_string(), - )), - } - } - - fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { - vec![&self.child] - } - - fn with_new_children( - self: Arc<Self>, - children: Vec<Arc<dyn PhysicalExpr>>, - ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> { - Ok(Arc::new(SecondExpr::new( - Arc::clone(&children[0]), - self.timezone.clone(), - ))) - } -} diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index c2aac93e2..f43fde9f4 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -60,7 +60,7 @@ pub use conversion_funcs::*; pub use comet_scalar_funcs::create_comet_physical_fun; pub use datetime_funcs::{ - spark_date_add, spark_date_sub, HourExpr, MinuteExpr, SecondExpr, SparkDateTrunc, + spark_date_add, spark_date_sub, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr, }; pub use error::{SparkError, SparkResult}; diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index c5451e27f..606c1a525 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -31,6 +31,7 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.sql.{CometTestBase, DataFrame, Row} +import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec} import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec} @@ -490,6 +491,29 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("time expressions folded on jvm") { + val ts = "1969-12-31 16:23:45" + + val functions = Map("hour" -> 16, "minute" -> 23, "second" -> 45) + + functions.foreach { case (func, expectedValue) => + val query = s"SELECT $func('$ts') AS result" + val df = spark.sql(query) + val optimizedPlan = df.queryExecution.optimizedPlan + + val isFolded = optimizedPlan.expressions.exists { + case alias: Alias => + alias.child match { + case Literal(value, _) => value == expectedValue + case _ => false + } + case _ => false + } + + assert(isFolded, s"Expected '$func(...)' to be constant-folded to Literal($expectedValue)") + } + } + test("hour on int96 timestamp column") { import testImplicits._ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org