This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e4a014290 feat: Add support for `unix_timestamp` function (#2936)
e4a014290 is described below

commit e4a014290caa924ee7426e8885d3af8b0eed693d
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 20 21:08:27 2026 -0700

    feat: Add support for `unix_timestamp` function (#2936)
---
 docs/source/user-guide/latest/configs.md           |   1 +
 docs/spark_expressions_support.md                  |   2 +-
 native/core/src/execution/expressions/mod.rs       |   1 +
 native/core/src/execution/expressions/temporal.rs  | 162 ++++++++++++++
 native/core/src/execution/planner.rs               |  65 +-----
 .../src/execution/planner/expression_registry.rs   |  27 ++-
 native/proto/src/proto/expr.proto                  |   8 +-
 native/spark-expr/src/datetime_funcs/mod.rs        |   2 +
 .../src/datetime_funcs/unix_timestamp.rs           | 242 +++++++++++++++++++++
 native/spark-expr/src/lib.rs                       |   3 +-
 .../org/apache/comet/serde/QueryPlanSerde.scala    |   1 +
 .../scala/org/apache/comet/serde/datetime.scala    |  56 ++++-
 .../comet/CometTemporalExpressionSuite.scala       |  61 +++++-
 .../CometDatetimeExpressionBenchmark.scala         |  49 ++++-
 14 files changed, 605 insertions(+), 75 deletions(-)
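
For context, `unix_timestamp` returns the number of seconds since the Unix epoch (1970-01-01 00:00:00 UTC).
A minimal illustrative Spark snippet follows; the column name `ts` and the input path are assumptions and not
part of this commit. With `spark.comet.expression.UnixTimestamp.enabled=true` (the default), queries of this
shape on TimestampType or DateType inputs can now run on Comet's native engine, while string and TimestampNTZ
inputs still fall back to Spark.

    // Hypothetical example: `ts` is a TimestampType column in a Parquet dataset.
    val df = spark.read.parquet("/path/to/data")
    df.selectExpr("ts", "unix_timestamp(ts) AS secs").show()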

diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index d2fbca346..096521b97 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -336,6 +336,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true |
 | `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true |
 | `spark.comet.expression.UnixDate.enabled` | Enable Comet acceleration for `UnixDate` | true |
+| `spark.comet.expression.UnixTimestamp.enabled` | Enable Comet acceleration for `UnixTimestamp` | true |
 | `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true |
 | `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true |
 | `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true |
diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md
index 67eb519ea..fa6b3a43f 100644
--- a/docs/spark_expressions_support.md
+++ b/docs/spark_expressions_support.md
@@ -217,7 +217,7 @@
 - [ ] unix_micros
 - [ ] unix_millis
 - [ ] unix_seconds
-- [ ] unix_timestamp
+- [x] unix_timestamp
 - [ ] weekday
 - [ ] weekofyear
 - [ ] year
diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs
index a06b41b2c..563d62e91 100644
--- a/native/core/src/execution/expressions/mod.rs
+++ b/native/core/src/execution/expressions/mod.rs
@@ -24,6 +24,7 @@ pub mod logical;
 pub mod nullcheck;
 pub mod strings;
 pub mod subquery;
+pub mod temporal;
 
 pub use datafusion_comet_spark_expr::EvalMode;
 
diff --git a/native/core/src/execution/expressions/temporal.rs b/native/core/src/execution/expressions/temporal.rs
new file mode 100644
index 000000000..ae57cd3b2
--- /dev/null
+++ b/native/core/src/execution/expressions/temporal.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Temporal expression builders
+
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, SchemaRef};
+use datafusion::config::ConfigOptions;
+use datafusion::logical_expr::ScalarUDF;
+use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr};
+use datafusion_comet_proto::spark_expression::Expr;
+use datafusion_comet_spark_expr::{
+    SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
+};
+
+use crate::execution::{
+    expressions::extract_expr,
+    operators::ExecutionError,
+    planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
+};
+
+pub struct HourBuilder;
+
+impl ExpressionBuilder for HourBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, Hour);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
+        let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "hour",
+            comet_hour,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
+
+        Ok(Arc::new(expr))
+    }
+}
+
+pub struct MinuteBuilder;
+
+impl ExpressionBuilder for MinuteBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, Minute);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
+        let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "minute",
+            comet_minute,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
+
+        Ok(Arc::new(expr))
+    }
+}
+
+pub struct SecondBuilder;
+
+impl ExpressionBuilder for SecondBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, Second);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
+        let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "second",
+            comet_second,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
+
+        Ok(Arc::new(expr))
+    }
+}
+
+pub struct UnixTimestampBuilder;
+
+impl ExpressionBuilder for UnixTimestampBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, UnixTimestamp);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_unix_timestamp =
+            Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone)));
+        let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "unix_timestamp",
+            comet_unix_timestamp,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
+
+        Ok(Arc::new(expr))
+    }
+}
+
+pub struct TruncTimestampBuilder;
+
+impl ExpressionBuilder for TruncTimestampBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, TruncTimestamp);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let format = planner.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
+        let timezone = expr.timezone.clone();
+
+        Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
+    }
+}
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index b13fafe45..2f1f1f32b 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -70,8 +70,7 @@ use datafusion::{
 };
 use datafusion_comet_spark_expr::{
     create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
-    BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
-    SumInteger,
+    BloomFilterAgg, BloomFilterMightContain, EvalMode, SumInteger,
 };
 use iceberg::expr::Bind;
 
@@ -126,8 +125,7 @@ use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncre
 use datafusion_comet_spark_expr::{
     ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
     GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RandExpr,
-    RandnExpr, SparkCastOptions, Stddev, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn,
-    Variance,
+    RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
 };
 use itertools::Itertools;
 use jni::objects::GlobalRef;
@@ -375,65 +373,6 @@ impl PhysicalPlanner {
                     SparkCastOptions::new(eval_mode, &expr.timezone, expr.allow_incompat),
                 )))
             }
-            ExprStruct::Hour(expr) => {
-                let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
-                let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
-                let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
-                    "hour",
-                    comet_hour,
-                    args,
-                    field_ref,
-                    Arc::new(ConfigOptions::default()),
-                );
-
-                Ok(Arc::new(expr))
-            }
-            ExprStruct::Minute(expr) => {
-                let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
-                let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
-                let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
-                    "minute",
-                    comet_minute,
-                    args,
-                    field_ref,
-                    Arc::new(ConfigOptions::default()),
-                );
-
-                Ok(Arc::new(expr))
-            }
-            ExprStruct::Second(expr) => {
-                let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
-                let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
-                let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
-                    "second",
-                    comet_second,
-                    args,
-                    field_ref,
-                    Arc::new(ConfigOptions::default()),
-                );
-
-                Ok(Arc::new(expr))
-            }
-            ExprStruct::TruncTimestamp(expr) => {
-                let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
-                let timezone = expr.timezone.clone();
-
-                Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
-            }
             ExprStruct::CheckOverflow(expr) => {
                 let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
                 let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());
diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs
index e85fbe510..34aa3de17 100644
--- a/native/core/src/execution/planner/expression_registry.rs
+++ b/native/core/src/execution/planner/expression_registry.rs
@@ -109,6 +109,7 @@ pub enum ExpressionType {
     Minute,
     Second,
     TruncTimestamp,
+    UnixTimestamp,
 }
 
 /// Registry for expression builders
@@ -181,9 +182,8 @@ impl ExpressionRegistry {
         // Register string expressions
         self.register_string_expressions();
 
-        // TODO: Register other expression categories in future phases
-        // self.register_temporal_expressions();
-        // etc.
+        // Register temporal expressions
+        self.register_temporal_expressions();
     }
 
     /// Register arithmetic expression builders
@@ -286,6 +286,26 @@ impl ExpressionRegistry {
             .insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
     }
 
+    /// Register temporal expression builders
+    fn register_temporal_expressions(&mut self) {
+        use crate::execution::expressions::temporal::*;
+
+        self.builders
+            .insert(ExpressionType::Hour, Box::new(HourBuilder));
+        self.builders
+            .insert(ExpressionType::Minute, Box::new(MinuteBuilder));
+        self.builders
+            .insert(ExpressionType::Second, Box::new(SecondBuilder));
+        self.builders.insert(
+            ExpressionType::UnixTimestamp,
+            Box::new(UnixTimestampBuilder),
+        );
+        self.builders.insert(
+            ExpressionType::TruncTimestamp,
+            Box::new(TruncTimestampBuilder),
+        );
+    }
+
     /// Extract expression type from Spark protobuf expression
     fn get_expression_type(spark_expr: &Expr) -> Result<ExpressionType, ExecutionError> {
         match spark_expr.expr_struct.as_ref() {
@@ -355,6 +375,7 @@ impl ExpressionRegistry {
             Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute),
             Some(ExprStruct::Second(_)) => Ok(ExpressionType::Second),
             Some(ExprStruct::TruncTimestamp(_)) => Ok(ExpressionType::TruncTimestamp),
+            Some(ExprStruct::UnixTimestamp(_)) => Ok(ExpressionType::UnixTimestamp),
 
             Some(other) => Err(ExecutionError::GeneralError(format!(
                 "Unsupported expression type: {:?}",
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 5f258fd67..e2d0ca1c0 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -85,7 +85,8 @@ message Expr {
     Rand randn = 62;
     EmptyExpr spark_partition_id = 63;
     EmptyExpr monotonically_increasing_id = 64;
-    FromJson from_json = 89;
+    UnixTimestamp unix_timestamp = 65;
+    FromJson from_json = 66;
   }
 }
 
@@ -304,6 +305,11 @@ message Second {
   string timezone = 2;
 }
 
+message UnixTimestamp {
+  Expr child = 1;
+  string timezone = 2;
+}
+
 message CheckOverflow {
   Expr child = 1;
   DataType datatype = 2;
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs
index c984e3a38..183271147 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -19,6 +19,7 @@ mod date_diff;
 mod date_trunc;
 mod extract_date_part;
 mod timestamp_trunc;
+mod unix_timestamp;
 
 pub use date_diff::SparkDateDiff;
 pub use date_trunc::SparkDateTrunc;
@@ -26,3 +27,4 @@ pub use extract_date_part::SparkHour;
 pub use extract_date_part::SparkMinute;
 pub use extract_date_part::SparkSecond;
 pub use timestamp_trunc::TimestampTruncExpr;
+pub use unix_timestamp::SparkUnixTimestamp;
diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
new file mode 100644
index 000000000..c4f157629
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::utils::array_with_timezone;
+use arrow::array::{Array, AsArray, PrimitiveArray};
+use arrow::compute::cast;
+use arrow::datatypes::{DataType, Int64Type, TimeUnit::Microsecond};
+use datafusion::common::{internal_datafusion_err, DataFusionError};
+use datafusion::logical_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use num::integer::div_floor;
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+const MICROS_PER_SECOND: i64 = 1_000_000;
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkUnixTimestamp {
+    signature: Signature,
+    aliases: Vec<String>,
+    timezone: String,
+}
+
+impl SparkUnixTimestamp {
+    pub fn new(timezone: String) -> Self {
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable),
+            aliases: vec![],
+            timezone,
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkUnixTimestamp {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "unix_timestamp"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> datafusion::common::Result<DataType> {
+        Ok(match &arg_types[0] {
+            DataType::Dictionary(_, _) => {
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int64))
+            }
+            _ => DataType::Int64,
+        })
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: ScalarFunctionArgs,
+    ) -> datafusion::common::Result<ColumnarValue> {
+        let args: [ColumnarValue; 1] = args
+            .args
+            .try_into()
+            .map_err(|_| internal_datafusion_err!("unix_timestamp expects exactly one argument"))?;
+
+        match args {
+            [ColumnarValue::Array(array)] => match array.data_type() {
+                DataType::Timestamp(_, _) => {
+                    let is_utc = self.timezone == "UTC";
+                    let array = if is_utc
+                        && matches!(array.data_type(), DataType::Timestamp(Microsecond, Some(tz)) if tz.as_ref() == "UTC")
+                    {
+                        array
+                    } else {
+                        array_with_timezone(
+                            array,
+                            self.timezone.clone(),
+                            Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))),
+                        )?
+                    };
+
+                    let timestamp_array =
+                        array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();
+
+                    let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
+                        timestamp_array
+                            .values()
+                            .iter()
+                            .map(|&micros| micros / MICROS_PER_SECOND)
+                            .collect()
+                    } else {
+                        timestamp_array
+                            .iter()
+                            .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
+                            .collect()
+                    };
+
+                    Ok(ColumnarValue::Array(Arc::new(result)))
+                }
+                DataType::Date32 => {
+                    let timestamp_array = cast(&array, &DataType::Timestamp(Microsecond, None))?;
+
+                    let is_utc = self.timezone == "UTC";
+                    let array = if is_utc {
+                        timestamp_array
+                    } else {
+                        array_with_timezone(
+                            timestamp_array,
+                            self.timezone.clone(),
+                            Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))),
+                        )?
+                    };
+
+                    let timestamp_array =
+                        array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();
+
+                    let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
+                        timestamp_array
+                            .values()
+                            .iter()
+                            .map(|&micros| micros / MICROS_PER_SECOND)
+                            .collect()
+                    } else {
+                        timestamp_array
+                            .iter()
+                            .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
+                            .collect()
+                    };
+
+                    Ok(ColumnarValue::Array(Arc::new(result)))
+                }
+                _ => Err(DataFusionError::Execution(format!(
+                    "unix_timestamp does not support input type: {:?}",
+                    array.data_type()
+                ))),
+            },
+            _ => Err(DataFusionError::Execution(
+                "unix_timestamp(scalar) should be fold in Spark JVM side.".to_string(),
+            )),
+        }
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, Date32Array, TimestampMicrosecondArray};
+    use arrow::datatypes::Field;
+    use datafusion::config::ConfigOptions;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_unix_timestamp_from_timestamp() {
+        // Test with known timestamp value
+        // 2020-01-01 00:00:00 UTC = 1577836800 seconds = 1577836800000000 microseconds
+        let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000)]);
+        let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+        let args = ScalarFunctionArgs {
+            args: vec![ColumnarValue::Array(Arc::new(input))],
+            number_rows: 1,
+            return_field,
+            config_options: Arc::new(ConfigOptions::default()),
+            arg_fields: vec![],
+        };
+
+        let result = udf.invoke_with_args(args).unwrap();
+        if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<arrow::datatypes::Int64Type>();
+            assert_eq!(int64_array.value(0), 1577836800);
+        } else {
+            panic!("Expected array result");
+        }
+    }
+
+    #[test]
+    fn test_unix_timestamp_from_date() {
+        // Test with Date32
+        // Date32(18262) = 2020-01-01 = 1577836800 seconds
+        let input = Date32Array::from(vec![Some(18262)]);
+        let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+        let args = ScalarFunctionArgs {
+            args: vec![ColumnarValue::Array(Arc::new(input))],
+            number_rows: 1,
+            return_field,
+            config_options: Arc::new(ConfigOptions::default()),
+            arg_fields: vec![],
+        };
+
+        let result = udf.invoke_with_args(args).unwrap();
+        if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<arrow::datatypes::Int64Type>();
+            assert_eq!(int64_array.value(0), 1577836800);
+        } else {
+            panic!("Expected array result");
+        }
+    }
+
+    #[test]
+    fn test_unix_timestamp_with_nulls() {
+        let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000), None]);
+        let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+        let args = ScalarFunctionArgs {
+            args: vec![ColumnarValue::Array(Arc::new(input))],
+            number_rows: 2,
+            return_field,
+            config_options: Arc::new(ConfigOptions::default()),
+            arg_fields: vec![],
+        };
+
+        let result = udf.invoke_with_args(args).unwrap();
+        if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<arrow::datatypes::Int64Type>();
+            assert_eq!(int64_array.value(0), 1577836800);
+            assert!(int64_array.is_null(1));
+        } else {
+            panic!("Expected array result");
+        }
+    }
+}
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 7f6d0b08a..086c09730 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -70,7 +70,8 @@ pub use comet_scalar_funcs::{
     register_all_comet_functions,
 };
 pub use datetime_funcs::{
-    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr,
+    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp,
+    TimestampTruncExpr,
 };
 pub use error::{SparkError, SparkResult};
 pub use hash_funcs::*;
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index abc76aa2e..627a5e42c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -195,6 +195,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Second] -> CometSecond,
     classOf[TruncDate] -> CometTruncDate,
     classOf[TruncTimestamp] -> CometTruncTimestamp,
+    classOf[UnixTimestamp] -> CometUnixTimestamp,
     classOf[Year] -> CometYear,
     classOf[Month] -> CometMonth,
     classOf[DayOfMonth] -> CometDayOfMonth,
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index 066c90c0f..5e7e9c68b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,8 +21,8 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, WeekDay, WeekOfYear, Year}
-import org.apache.spark.sql.types.{DateType, IntegerType, StringType}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -254,6 +254,58 @@ object CometSecond extends CometExpressionSerde[Second] {
   }
 }
 
+object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
+
+  private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
+    // Note: TimestampNTZType is not supported because Comet incorrectly applies
+    // timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
+    // without timezone, so no conversion should be applied.
+    expr.children.head.dataType match {
+      case TimestampType | DateType => true
+      case _ => false
+    }
+  }
+
+  override def getSupportLevel(expr: UnixTimestamp): SupportLevel = {
+    if (isSupportedInputType(expr)) {
+      Compatible()
+    } else {
+      val inputType = expr.children.head.dataType
+      Unsupported(Some(s"unix_timestamp does not support input type: $inputType"))
+    }
+  }
+
+  override def convert(
+      expr: UnixTimestamp,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    if (!isSupportedInputType(expr)) {
+      val inputType = expr.children.head.dataType
+      withInfo(expr, s"unix_timestamp does not support input type: $inputType")
+      return None
+    }
+
+    val childExpr = exprToProtoInternal(expr.children.head, inputs, binding)
+
+    if (childExpr.isDefined) {
+      val builder = ExprOuterClass.UnixTimestamp.newBuilder()
+      builder.setChild(childExpr.get)
+
+      val timeZone = expr.timeZoneId.getOrElse("UTC")
+      builder.setTimezone(timeZone)
+
+      Some(
+        ExprOuterClass.Expr
+          .newBuilder()
+          .setUnixTimestamp(builder)
+          .build())
+    } else {
+      withInfo(expr, expr.children.head)
+      None
+    }
+  }
+}
+
 object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")
 
 object CometDateSub extends CometScalarFunction[DateSub]("date_sub")
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 0ed33bb9b..0ccc21077 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -21,7 +21,7 @@ package org.apache.comet
 
 import scala.util.Random
 
-import org.apache.spark.sql.{CometTestBase, SaveMode}
+import org.apache.spark.sql.{CometTestBase, Row, SaveMode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
@@ -114,6 +114,65 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
     }
   }
 
+  test("unix_timestamp - timestamp input") {
+    createTimestampTestData.createOrReplaceTempView("tbl")
+    for (timezone <- Seq("UTC", "America/Los_Angeles")) {
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) {
+        checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl order by c0")
+      }
+    }
+  }
+
+  test("unix_timestamp - date input") {
+    val r = new Random(42)
+    val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true)))
+    val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 100, DataGenOptions())
+    dateDF.createOrReplaceTempView("date_tbl")
+    for (timezone <- Seq("UTC", "America/Los_Angeles")) {
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) {
+        checkSparkAnswerAndOperator("SELECT d, unix_timestamp(d) from date_tbl order by d")
+      }
+    }
+  }
+
+  test("unix_timestamp - timestamp_ntz input falls back to Spark") {
+    // TimestampNTZ is not supported because Comet incorrectly applies timezone
+    // conversion. TimestampNTZ stores local time without timezone, so the unix
+    // timestamp should just be the value divided by microseconds per second.
+    val r = new Random(42)
+    val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
+    val ntzDF = FuzzDataGenerator.generateDataFrame(r, spark, ntzSchema, 100, DataGenOptions())
+    ntzDF.createOrReplaceTempView("ntz_tbl")
+    checkSparkAnswerAndFallbackReason(
+      "SELECT ts_ntz, unix_timestamp(ts_ntz) from ntz_tbl order by ts_ntz",
+      "unix_timestamp does not support input type: TimestampNTZType")
+  }
+
+  test("unix_timestamp - string input falls back to Spark") {
+    withTempView("string_tbl") {
+      // Create test data with timestamp strings
+      val schema = StructType(Seq(StructField("ts_str", DataTypes.StringType, true)))
+      val data = Seq(
+        Row("2020-01-01 00:00:00"),
+        Row("2021-06-15 12:30:45"),
+        Row("2022-12-31 23:59:59"),
+        Row(null))
+      spark
+        .createDataFrame(spark.sparkContext.parallelize(data), schema)
+        .createOrReplaceTempView("string_tbl")
+
+      // String input should fall back to Spark
+      checkSparkAnswerAndFallbackReason(
+        "SELECT ts_str, unix_timestamp(ts_str) from string_tbl order by ts_str",
+        "unix_timestamp does not support input type: StringType")
+
+      // String input with custom format should also fall back
+      checkSparkAnswerAndFallbackReason(
+        "SELECT ts_str, unix_timestamp(ts_str, 'yyyy-MM-dd HH:mm:ss') from string_tbl",
+        "unix_timestamp does not support input type: StringType")
+    }
+  }
+
   private def createTimestampTestData = {
     val r = new Random(42)
     val schema = StructType(
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
index 6b6cd8cfa..b7a14d408 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
@@ -22,12 +22,13 @@ package org.apache.spark.sql.benchmark
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA}
 import org.apache.spark.sql.internal.SQLConf
 
+// spotless:off
 /**
  * Benchmark to measure Comet execution performance. To run this benchmark:
- * `SPARK_GENERATE_BENCHMARK_FILES=1 make
- * benchmark-org.apache.spark.sql.benchmark.CometDatetimeExpressionBenchmark` Results will be
- * written to "spark/benchmarks/CometDatetimeExpressionBenchmark-**results.txt".
+ * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometDatetimeExpressionBenchmark`
+ * Results will be written to "spark/benchmarks/CometDatetimeExpressionBenchmark-**results.txt".
  */
+// spotless:on
 object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
 
   def dateTruncExprBenchmark(values: Int): Unit = {
@@ -71,9 +72,51 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
     }
   }
 
+  def unixTimestampBenchmark(values: Int, timeZone: String): Unit = {
+    withTempPath { dir =>
+      withTempTable("parquetV1Table") {
+        prepareTable(
+          dir,
+          spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl"))
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+          val name = s"Unix Timestamp from Timestamp ($timeZone)"
+          val query = "select unix_timestamp(ts) from parquetV1Table"
+          runExpressionBenchmark(name, values, query)
+        }
+      }
+    }
+  }
+
+  def unixTimestampFromDateBenchmark(values: Int, timeZone: String): Unit = {
+    withTempPath { dir =>
+      withTempTable("parquetV1Table") {
+        prepareTable(
+          dir,
+          spark.sql(
+            s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl"))
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+          val name = s"Unix Timestamp from Date ($timeZone)"
+          val query = "select unix_timestamp(dt) from parquetV1Table"
+          runExpressionBenchmark(name, values, query)
+        }
+      }
+    }
+  }
+
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
     val values = 1024 * 1024;
 
+    for (timeZone <- Seq("UTC", "America/Los_Angeles")) {
+      withSQLConf("spark.sql.parquet.datetimeRebaseModeInWrite" -> "CORRECTED") {
+        runBenchmarkWithTable(s"UnixTimestamp(timestamp) - $timeZone", values) { v =>
+          unixTimestampBenchmark(v, timeZone)
+        }
+        runBenchmarkWithTable(s"UnixTimestamp(date) - $timeZone", values) { v =>
+          unixTimestampFromDateBenchmark(v, timeZone)
+        }
+      }
+    }
+
     withDefaultTimeZone(LA) {
       withSQLConf(
         SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

