This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new e4a014290 feat: Add support for `unix_timestamp` function (#2936)
e4a014290 is described below
commit e4a014290caa924ee7426e8885d3af8b0eed693d
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 20 21:08:27 2026 -0700
feat: Add support for `unix_timestamp` function (#2936)
---
docs/source/user-guide/latest/configs.md | 1 +
docs/spark_expressions_support.md | 2 +-
native/core/src/execution/expressions/mod.rs | 1 +
native/core/src/execution/expressions/temporal.rs | 162 ++++++++++++++
native/core/src/execution/planner.rs | 65 +-----
.../src/execution/planner/expression_registry.rs | 27 ++-
native/proto/src/proto/expr.proto | 8 +-
native/spark-expr/src/datetime_funcs/mod.rs | 2 +
.../src/datetime_funcs/unix_timestamp.rs | 242 +++++++++++++++++++++
native/spark-expr/src/lib.rs | 3 +-
.../org/apache/comet/serde/QueryPlanSerde.scala | 1 +
.../scala/org/apache/comet/serde/datetime.scala | 56 ++++-
.../comet/CometTemporalExpressionSuite.scala | 61 +++++-
.../CometDatetimeExpressionBenchmark.scala | 49 ++++-
14 files changed, 605 insertions(+), 75 deletions(-)
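
At its core, the new kernel converts microseconds since the Unix epoch into whole seconds, using floor division so that pre-epoch (negative) values round toward negative infinity the way Spark does, rather than toward zero. A minimal standalone sketch of that arithmetic, reusing the same num::integer::div_floor helper and MICROS_PER_SECOND constant as the kernel (the to_unix_seconds wrapper is illustrative only, not part of the commit):

use num::integer::div_floor;

const MICROS_PER_SECOND: i64 = 1_000_000;

// Illustrative wrapper (not in the commit): microseconds since epoch -> seconds.
fn to_unix_seconds(micros: i64) -> i64 {
    div_floor(micros, MICROS_PER_SECOND)
}

fn main() {
    // 2020-01-01T00:00:00Z
    assert_eq!(to_unix_seconds(1_577_836_800_000_000), 1_577_836_800);
    // 1.5 seconds before the epoch floors to -2; truncating division would give -1.
    assert_eq!(to_unix_seconds(-1_500_000), -2);
}
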
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index d2fbca346..096521b97 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -336,6 +336,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true |
| `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true |
| `spark.comet.expression.UnixDate.enabled` | Enable Comet acceleration for `UnixDate` | true |
+| `spark.comet.expression.UnixTimestamp.enabled` | Enable Comet acceleration for `UnixTimestamp` | true |
| `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true |
| `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true |
| `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true |
diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md
index 67eb519ea..fa6b3a43f 100644
--- a/docs/spark_expressions_support.md
+++ b/docs/spark_expressions_support.md
@@ -217,7 +217,7 @@
- [ ] unix_micros
- [ ] unix_millis
- [ ] unix_seconds
-- [ ] unix_timestamp
+- [x] unix_timestamp
- [ ] weekday
- [ ] weekofyear
- [ ] year
diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs
index a06b41b2c..563d62e91 100644
--- a/native/core/src/execution/expressions/mod.rs
+++ b/native/core/src/execution/expressions/mod.rs
@@ -24,6 +24,7 @@ pub mod logical;
pub mod nullcheck;
pub mod strings;
pub mod subquery;
+pub mod temporal;
pub use datafusion_comet_spark_expr::EvalMode;
diff --git a/native/core/src/execution/expressions/temporal.rs b/native/core/src/execution/expressions/temporal.rs
new file mode 100644
index 000000000..ae57cd3b2
--- /dev/null
+++ b/native/core/src/execution/expressions/temporal.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Temporal expression builders
+
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, SchemaRef};
+use datafusion::config::ConfigOptions;
+use datafusion::logical_expr::ScalarUDF;
+use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr};
+use datafusion_comet_proto::spark_expression::Expr;
+use datafusion_comet_spark_expr::{
+    SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
+};
+
+use crate::execution::{
+ expressions::extract_expr,
+ operators::ExecutionError,
+ planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
+};
+
+pub struct HourBuilder;
+
+impl ExpressionBuilder for HourBuilder {
+ fn build(
+ &self,
+ spark_expr: &Expr,
+ input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ let expr = extract_expr!(spark_expr, Hour);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
+ let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
+ let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+ "hour",
+ comet_hour,
+ args,
+ field_ref,
+ Arc::new(ConfigOptions::default()),
+ );
+
+ Ok(Arc::new(expr))
+ }
+}
+
+pub struct MinuteBuilder;
+
+impl ExpressionBuilder for MinuteBuilder {
+ fn build(
+ &self,
+ spark_expr: &Expr,
+ input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ let expr = extract_expr!(spark_expr, Minute);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
+ let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
+ let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+ "minute",
+ comet_minute,
+ args,
+ field_ref,
+ Arc::new(ConfigOptions::default()),
+ );
+
+ Ok(Arc::new(expr))
+ }
+}
+
+pub struct SecondBuilder;
+
+impl ExpressionBuilder for SecondBuilder {
+ fn build(
+ &self,
+ spark_expr: &Expr,
+ input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ let expr = extract_expr!(spark_expr, Second);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
+ let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
+ let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+ "second",
+ comet_second,
+ args,
+ field_ref,
+ Arc::new(ConfigOptions::default()),
+ );
+
+ Ok(Arc::new(expr))
+ }
+}
+
+pub struct UnixTimestampBuilder;
+
+impl ExpressionBuilder for UnixTimestampBuilder {
+ fn build(
+ &self,
+ spark_expr: &Expr,
+ input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ let expr = extract_expr!(spark_expr, UnixTimestamp);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_unix_timestamp =
+            Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone)));
+        let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+ let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+ "unix_timestamp",
+ comet_unix_timestamp,
+ args,
+ field_ref,
+ Arc::new(ConfigOptions::default()),
+ );
+
+ Ok(Arc::new(expr))
+ }
+}
+
+pub struct TruncTimestampBuilder;
+
+impl ExpressionBuilder for TruncTimestampBuilder {
+ fn build(
+ &self,
+ spark_expr: &Expr,
+ input_schema: SchemaRef,
+ planner: &PhysicalPlanner,
+ ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+ let expr = extract_expr!(spark_expr, TruncTimestamp);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let format = planner.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
+ let timezone = expr.timezone.clone();
+
+ Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
+ }
+}
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index b13fafe45..2f1f1f32b 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -70,8 +70,7 @@ use datafusion::{
};
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
-    BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
-    SumInteger,
+    BloomFilterAgg, BloomFilterMightContain, EvalMode, SumInteger,
};
use iceberg::expr::Bind;
@@ -126,8 +125,7 @@ use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncre
use datafusion_comet_spark_expr::{
ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RandExpr,
-    RandnExpr, SparkCastOptions, Stddev, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn,
-    Variance,
+    RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
};
use itertools::Itertools;
use jni::objects::GlobalRef;
@@ -375,65 +373,6 @@ impl PhysicalPlanner {
SparkCastOptions::new(eval_mode, &expr.timezone, expr.allow_incompat),
)))
}
- ExprStruct::Hour(expr) => {
- let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
-                let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
- let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
- "hour",
- comet_hour,
- args,
- field_ref,
- Arc::new(ConfigOptions::default()),
- );
-
- Ok(Arc::new(expr))
- }
- ExprStruct::Minute(expr) => {
- let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
-                let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
- let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
- "minute",
- comet_minute,
- args,
- field_ref,
- Arc::new(ConfigOptions::default()),
- );
-
- Ok(Arc::new(expr))
- }
- ExprStruct::Second(expr) => {
- let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
-                let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
- let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
- "second",
- comet_second,
- args,
- field_ref,
- Arc::new(ConfigOptions::default()),
- );
-
- Ok(Arc::new(expr))
- }
- ExprStruct::TruncTimestamp(expr) => {
- let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
- let timezone = expr.timezone.clone();
-
- Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
- }
ExprStruct::CheckOverflow(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());
diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs
index e85fbe510..34aa3de17 100644
--- a/native/core/src/execution/planner/expression_registry.rs
+++ b/native/core/src/execution/planner/expression_registry.rs
@@ -109,6 +109,7 @@ pub enum ExpressionType {
Minute,
Second,
TruncTimestamp,
+ UnixTimestamp,
}
/// Registry for expression builders
@@ -181,9 +182,8 @@ impl ExpressionRegistry {
// Register string expressions
self.register_string_expressions();
- // TODO: Register other expression categories in future phases
- // self.register_temporal_expressions();
- // etc.
+ // Register temporal expressions
+ self.register_temporal_expressions();
}
/// Register arithmetic expression builders
@@ -286,6 +286,26 @@ impl ExpressionRegistry {
.insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
}
+ /// Register temporal expression builders
+ fn register_temporal_expressions(&mut self) {
+ use crate::execution::expressions::temporal::*;
+
+ self.builders
+ .insert(ExpressionType::Hour, Box::new(HourBuilder));
+ self.builders
+ .insert(ExpressionType::Minute, Box::new(MinuteBuilder));
+ self.builders
+ .insert(ExpressionType::Second, Box::new(SecondBuilder));
+ self.builders.insert(
+ ExpressionType::UnixTimestamp,
+ Box::new(UnixTimestampBuilder),
+ );
+ self.builders.insert(
+ ExpressionType::TruncTimestamp,
+ Box::new(TruncTimestampBuilder),
+ );
+ }
+
/// Extract expression type from Spark protobuf expression
fn get_expression_type(spark_expr: &Expr) -> Result<ExpressionType, ExecutionError> {
match spark_expr.expr_struct.as_ref() {
@@ -355,6 +375,7 @@ impl ExpressionRegistry {
Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute),
Some(ExprStruct::Second(_)) => Ok(ExpressionType::Second),
Some(ExprStruct::TruncTimestamp(_)) => Ok(ExpressionType::TruncTimestamp),
+            Some(ExprStruct::UnixTimestamp(_)) => Ok(ExpressionType::UnixTimestamp),
Some(other) => Err(ExecutionError::GeneralError(format!(
"Unsupported expression type: {:?}",
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 5f258fd67..e2d0ca1c0 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -85,7 +85,8 @@ message Expr {
Rand randn = 62;
EmptyExpr spark_partition_id = 63;
EmptyExpr monotonically_increasing_id = 64;
- FromJson from_json = 89;
+ UnixTimestamp unix_timestamp = 65;
+ FromJson from_json = 66;
}
}
@@ -304,6 +305,11 @@ message Second {
string timezone = 2;
}
+message UnixTimestamp {
+ Expr child = 1;
+ string timezone = 2;
+}
+
message CheckOverflow {
Expr child = 1;
DataType datatype = 2;
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs
index c984e3a38..183271147 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -19,6 +19,7 @@ mod date_diff;
mod date_trunc;
mod extract_date_part;
mod timestamp_trunc;
+mod unix_timestamp;
pub use date_diff::SparkDateDiff;
pub use date_trunc::SparkDateTrunc;
@@ -26,3 +27,4 @@ pub use extract_date_part::SparkHour;
pub use extract_date_part::SparkMinute;
pub use extract_date_part::SparkSecond;
pub use timestamp_trunc::TimestampTruncExpr;
+pub use unix_timestamp::SparkUnixTimestamp;
diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
new file mode 100644
index 000000000..c4f157629
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::utils::array_with_timezone;
+use arrow::array::{Array, AsArray, PrimitiveArray};
+use arrow::compute::cast;
+use arrow::datatypes::{DataType, Int64Type, TimeUnit::Microsecond};
+use datafusion::common::{internal_datafusion_err, DataFusionError};
+use datafusion::logical_expr::{
+ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use num::integer::div_floor;
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+const MICROS_PER_SECOND: i64 = 1_000_000;
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkUnixTimestamp {
+ signature: Signature,
+ aliases: Vec<String>,
+ timezone: String,
+}
+
+impl SparkUnixTimestamp {
+ pub fn new(timezone: String) -> Self {
+ Self {
+ signature: Signature::user_defined(Volatility::Immutable),
+ aliases: vec![],
+ timezone,
+ }
+ }
+}
+
+impl ScalarUDFImpl for SparkUnixTimestamp {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "unix_timestamp"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+    fn return_type(&self, arg_types: &[DataType]) -> datafusion::common::Result<DataType> {
+ Ok(match &arg_types[0] {
+ DataType::Dictionary(_, _) => {
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int64))
+ }
+ _ => DataType::Int64,
+ })
+ }
+
+ fn invoke_with_args(
+ &self,
+ args: ScalarFunctionArgs,
+ ) -> datafusion::common::Result<ColumnarValue> {
+ let args: [ColumnarValue; 1] = args
+ .args
+ .try_into()
+            .map_err(|_| internal_datafusion_err!("unix_timestamp expects exactly one argument"))?;
+
+ match args {
+ [ColumnarValue::Array(array)] => match array.data_type() {
+ DataType::Timestamp(_, _) => {
+ let is_utc = self.timezone == "UTC";
+ let array = if is_utc
+                        && matches!(array.data_type(), DataType::Timestamp(Microsecond, Some(tz)) if tz.as_ref() == "UTC")
+ {
+ array
+ } else {
+ array_with_timezone(
+ array,
+ self.timezone.clone(),
+                            Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))),
+ )?
+ };
+
+                    let timestamp_array =
+                        array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();
+
+                    let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
+                        timestamp_array
+                            .values()
+                            .iter()
+                            .map(|&micros| div_floor(micros, MICROS_PER_SECOND))
+                            .collect()
+                    } else {
+                        timestamp_array
+                            .iter()
+                            .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
+                            .collect()
+                    };
+
+ Ok(ColumnarValue::Array(Arc::new(result)))
+ }
+ DataType::Date32 => {
+                    let timestamp_array = cast(&array, &DataType::Timestamp(Microsecond, None))?;
+
+ let is_utc = self.timezone == "UTC";
+ let array = if is_utc {
+ timestamp_array
+ } else {
+ array_with_timezone(
+ timestamp_array,
+ self.timezone.clone(),
+                            Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))),
+ )?
+ };
+
+                    let timestamp_array =
+                        array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();
+
+                    let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
+                        timestamp_array
+                            .values()
+                            .iter()
+                            .map(|&micros| div_floor(micros, MICROS_PER_SECOND))
+                            .collect()
+                    } else {
+                        timestamp_array
+                            .iter()
+                            .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
+                            .collect()
+                    };
+
+ Ok(ColumnarValue::Array(Arc::new(result)))
+ }
+ _ => Err(DataFusionError::Execution(format!(
+ "unix_timestamp does not support input type: {:?}",
+ array.data_type()
+ ))),
+ },
+ _ => Err(DataFusionError::Execution(
+ "unix_timestamp(scalar) should be fold in Spark JVM
side.".to_string(),
+ )),
+ }
+ }
+
+ fn aliases(&self) -> &[String] {
+ &self.aliases
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::array::{Array, Date32Array, TimestampMicrosecondArray};
+ use arrow::datatypes::Field;
+ use datafusion::config::ConfigOptions;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_unix_timestamp_from_timestamp() {
+ // Test with known timestamp value
+        // 2020-01-01 00:00:00 UTC = 1577836800 seconds = 1577836800000000 microseconds
+        let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000)]);
+ let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+ let args = ScalarFunctionArgs {
+ args: vec![ColumnarValue::Array(Arc::new(input))],
+ number_rows: 1,
+ return_field,
+ config_options: Arc::new(ConfigOptions::default()),
+ arg_fields: vec![],
+ };
+
+ let result = udf.invoke_with_args(args).unwrap();
+ if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<arrow::datatypes::Int64Type>();
+ assert_eq!(int64_array.value(0), 1577836800);
+ } else {
+ panic!("Expected array result");
+ }
+ }
+
+ #[test]
+ fn test_unix_timestamp_from_date() {
+ // Test with Date32
+ // Date32(18262) = 2020-01-01 = 1577836800 seconds
+ let input = Date32Array::from(vec![Some(18262)]);
+ let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+ let args = ScalarFunctionArgs {
+ args: vec![ColumnarValue::Array(Arc::new(input))],
+ number_rows: 1,
+ return_field,
+ config_options: Arc::new(ConfigOptions::default()),
+ arg_fields: vec![],
+ };
+
+ let result = udf.invoke_with_args(args).unwrap();
+ if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<arrow::datatypes::Int64Type>();
+ assert_eq!(int64_array.value(0), 1577836800);
+ } else {
+ panic!("Expected array result");
+ }
+ }
+
+ #[test]
+ fn test_unix_timestamp_with_nulls() {
+        let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000), None]);
+ let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+ let args = ScalarFunctionArgs {
+ args: vec![ColumnarValue::Array(Arc::new(input))],
+ number_rows: 2,
+ return_field,
+ config_options: Arc::new(ConfigOptions::default()),
+ arg_fields: vec![],
+ };
+
+ let result = udf.invoke_with_args(args).unwrap();
+ if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<arrow::datatypes::Int64Type>();
+ assert_eq!(int64_array.value(0), 1577836800);
+ assert!(int64_array.is_null(1));
+ } else {
+ panic!("Expected array result");
+ }
+ }
+}
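
A note on the kernel above: both the null-free fast path and the nullable path floor-divide, so pre-epoch timestamps produce the same answer regardless of null density. A hedged sketch exercising that edge through the same invoke_with_args API the unit tests use (the -1_500_000 input value is illustrative and not taken from the commit's tests):

use std::sync::Arc;

use arrow::array::{AsArray, TimestampMicrosecondArray};
use arrow::datatypes::{DataType, Field};
use datafusion::config::ConfigOptions;
use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
use datafusion_comet_spark_expr::SparkUnixTimestamp;

fn main() -> datafusion::common::Result<()> {
    // 1.5 seconds before the epoch: floor division yields -2, not -1.
    let input = TimestampMicrosecondArray::from(vec![Some(-1_500_000)]);
    let udf = SparkUnixTimestamp::new("UTC".to_string());
    let args = ScalarFunctionArgs {
        args: vec![ColumnarValue::Array(Arc::new(input))],
        number_rows: 1,
        return_field: Arc::new(Field::new("unix_timestamp", DataType::Int64, true)),
        config_options: Arc::new(ConfigOptions::default()),
        arg_fields: vec![],
    };
    if let ColumnarValue::Array(out) = udf.invoke_with_args(args)? {
        assert_eq!(out.as_primitive::<arrow::datatypes::Int64Type>().value(0), -2);
    }
    Ok(())
}
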
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 7f6d0b08a..086c09730 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -70,7 +70,8 @@ pub use comet_scalar_funcs::{
register_all_comet_functions,
};
pub use datetime_funcs::{
-    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr,
+    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp,
+    TimestampTruncExpr,
};
pub use error::{SparkError, SparkResult};
pub use hash_funcs::*;
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index abc76aa2e..627a5e42c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -195,6 +195,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Second] -> CometSecond,
classOf[TruncDate] -> CometTruncDate,
classOf[TruncTimestamp] -> CometTruncTimestamp,
+ classOf[UnixTimestamp] -> CometUnixTimestamp,
classOf[Year] -> CometYear,
classOf[Month] -> CometMonth,
classOf[DayOfMonth] -> CometDayOfMonth,
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index 066c90c0f..5e7e9c68b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,8 +21,8 @@ package org.apache.comet.serde
import java.util.Locale
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, WeekDay, WeekOfYear, Year}
-import org.apache.spark.sql.types.{DateType, IntegerType, StringType}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -254,6 +254,58 @@ object CometSecond extends CometExpressionSerde[Second] {
}
}
+object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
+
+ private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
+    // Note: TimestampNTZType is not supported because Comet incorrectly applies
+    // timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
+ // without timezone, so no conversion should be applied.
+ expr.children.head.dataType match {
+ case TimestampType | DateType => true
+ case _ => false
+ }
+ }
+
+ override def getSupportLevel(expr: UnixTimestamp): SupportLevel = {
+ if (isSupportedInputType(expr)) {
+ Compatible()
+ } else {
+ val inputType = expr.children.head.dataType
+      Unsupported(Some(s"unix_timestamp does not support input type: $inputType"))
+ }
+ }
+
+ override def convert(
+ expr: UnixTimestamp,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ if (!isSupportedInputType(expr)) {
+ val inputType = expr.children.head.dataType
+ withInfo(expr, s"unix_timestamp does not support input type: $inputType")
+ return None
+ }
+
+ val childExpr = exprToProtoInternal(expr.children.head, inputs, binding)
+
+ if (childExpr.isDefined) {
+ val builder = ExprOuterClass.UnixTimestamp.newBuilder()
+ builder.setChild(childExpr.get)
+
+ val timeZone = expr.timeZoneId.getOrElse("UTC")
+ builder.setTimezone(timeZone)
+
+ Some(
+ ExprOuterClass.Expr
+ .newBuilder()
+ .setUnixTimestamp(builder)
+ .build())
+ } else {
+ withInfo(expr, expr.children.head)
+ None
+ }
+ }
+}
+
object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")
object CometDateSub extends CometScalarFunction[DateSub]("date_sub")
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 0ed33bb9b..0ccc21077 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -21,7 +21,7 @@ package org.apache.comet
import scala.util.Random
-import org.apache.spark.sql.{CometTestBase, SaveMode}
+import org.apache.spark.sql.{CometTestBase, Row, SaveMode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
@@ -114,6 +114,65 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
}
}
+ test("unix_timestamp - timestamp input") {
+ createTimestampTestData.createOrReplaceTempView("tbl")
+ for (timezone <- Seq("UTC", "America/Los_Angeles")) {
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) {
+ checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl
order by c0")
+ }
+ }
+ }
+
+ test("unix_timestamp - date input") {
+ val r = new Random(42)
+    val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true)))
+    val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 100, DataGenOptions())
+    dateDF.createOrReplaceTempView("date_tbl")
+    for (timezone <- Seq("UTC", "America/Los_Angeles")) {
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) {
+        checkSparkAnswerAndOperator("SELECT d, unix_timestamp(d) from date_tbl order by d")
+ }
+ }
+ }
+
+ test("unix_timestamp - timestamp_ntz input falls back to Spark") {
+ // TimestampNTZ is not supported because Comet incorrectly applies timezone
+ // conversion. TimestampNTZ stores local time without timezone, so the unix
+ // timestamp should just be the value divided by microseconds per second.
+ val r = new Random(42)
+    val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
+    val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
+ ntzDF.createOrReplaceTempView("ntz_tbl")
+ checkSparkAnswerAndFallbackReason(
+ "SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz",
+ "unix_timestamp does not support input type: TimestampNTZType")
+ }
+
+ test("unix_timestamp - string input falls back to Spark") {
+ withTempView("string_tbl") {
+ // Create test data with timestamp strings
+      val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, true)))
+ val data = Seq(
+ Row("2020-01-01 00:00:00"),
+ Row("2021-06-15 12:30:45"),
+ Row("2022-12-31 23:59:59"),
+ Row(null))
+ spark
+ .createDataFrame(spark.sparkContext.parallelize(data), schema)
+ .createOrReplaceTempView("string_tbl")
+
+ // String input should fall back to Spark
+ checkSparkAnswerAndFallbackReason(
+ "SELECT ts_str, unix_timestamp(ts_str) from string_tbl order by
ts_str",
+ "unix_timestamp does not support input type: StringType")
+
+ // String input with custom format should also fall back
+ checkSparkAnswerAndFallbackReason(
+ "SELECT ts_str, unix_timestamp(ts_str, 'yyyy-MM-dd HH:mm:ss') from
string_tbl",
+ "unix_timestamp does not support input type: StringType")
+ }
+ }
+
private def createTimestampTestData = {
val r = new Random(42)
val schema = StructType(
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
index 6b6cd8cfa..b7a14d408 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
@@ -22,12 +22,13 @@ package org.apache.spark.sql.benchmark
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA}
import org.apache.spark.sql.internal.SQLConf
+// spotless:off
/**
* Benchmark to measure Comet execution performance. To run this benchmark:
- * `SPARK_GENERATE_BENCHMARK_FILES=1 make
- * benchmark-org.apache.spark.sql.benchmark.CometDatetimeExpressionBenchmark` Results will be
- * written to "spark/benchmarks/CometDatetimeExpressionBenchmark-**results.txt".
+ * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometDatetimeExpressionBenchmark`
+ * Results will be written to "spark/benchmarks/CometDatetimeExpressionBenchmark-**results.txt".
*/
+// spotless:on
object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
def dateTruncExprBenchmark(values: Int): Unit = {
@@ -71,9 +72,51 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
}
}
+ def unixTimestampBenchmark(values: Int, timeZone: String): Unit = {
+ withTempPath { dir =>
+ withTempTable("parquetV1Table") {
+ prepareTable(
+ dir,
+ spark.sql(s"select timestamp_micros(cast(value/100000 as integer))
as ts FROM $tbl"))
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val name = s"Unix Timestamp from Timestamp ($timeZone)"
+ val query = "select unix_timestamp(ts) from parquetV1Table"
+ runExpressionBenchmark(name, values, query)
+ }
+ }
+ }
+ }
+
+ def unixTimestampFromDateBenchmark(values: Int, timeZone: String): Unit = {
+ withTempPath { dir =>
+ withTempTable("parquetV1Table") {
+ prepareTable(
+ dir,
+ spark.sql(
+ s"select cast(timestamp_micros(cast(value/100000 as integer)) as
date) as dt FROM $tbl"))
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ val name = s"Unix Timestamp from Date ($timeZone)"
+ val query = "select unix_timestamp(dt) from parquetV1Table"
+ runExpressionBenchmark(name, values, query)
+ }
+ }
+ }
+ }
+
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
val values = 1024 * 1024;
+ for (timeZone <- Seq("UTC", "America/Los_Angeles")) {
+ withSQLConf("spark.sql.parquet.datetimeRebaseModeInWrite" ->
"CORRECTED") {
+ runBenchmarkWithTable(s"UnixTimestamp(timestamp) - $timeZone", values)
{ v =>
+ unixTimestampBenchmark(v, timeZone)
+ }
+ runBenchmarkWithTable(s"UnixTimestamp(date) - $timeZone", values) { v
=>
+ unixTimestampFromDateBenchmark(v, timeZone)
+ }
+ }
+ }
+
withDefaultTimeZone(LA) {
withSQLConf(
SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId,