This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 139b0b4ac8 feat : support spark compatible int to timestamp cast 
(#20555)
139b0b4ac8 is described below

commit 139b0b4ac8119a4321d346837b9ac56e2eb441eb
Author: Bhargava Vadlamani <[email protected]>
AuthorDate: Wed Mar 25 14:03:39 2026 -0700

    feat : support spark compatible int to timestamp cast (#20555)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #20554
    
    Ref : https://github.com/apache/datafusion/pull/20555
    
    ## Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    
    1. Created a new cast scalar UDF to support int to timestamp casts
    (spark compatible). The goal is to leverage this as an entry point
    for all spark compatible / df incompatible casts
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    
    1. Yes (through a series of unit tests testing various edge cases like
    overflow, null input, etc.)
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    1. Yes. We are bringing in the ability to leverage spark compatible cast
    operations in DF through `spark_cast` as a first step. The next step would
    be to make changes to the planner to leverage spark compatible cast
    instead of regular cast operations
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 datafusion/spark/src/function/conversion/cast.rs   | 659 +++++++++++++++++++++
 datafusion/spark/src/function/conversion/mod.rs    |  19 +-
 .../spark/conversion/cast_int_to_timestamp.slt     | 252 ++++++++
 3 files changed, 928 insertions(+), 2 deletions(-)

diff --git a/datafusion/spark/src/function/conversion/cast.rs 
b/datafusion/spark/src/function/conversion/cast.rs
new file mode 100644
index 0000000000..cd6b7c7d33
--- /dev/null
+++ b/datafusion/spark/src/function/conversion/cast.rs
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, 
Int32Type,
+    Int64Type, TimeUnit,
+};
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::types::{
+    logical_int8, logical_int16, logical_int32, logical_int64, logical_string,
+};
+use datafusion_common::{Result, ScalarValue, exec_err, internal_err};
+use datafusion_expr::{Coercion, TypeSignatureClass};
+use datafusion_expr::{
+    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, 
ScalarUDFImpl,
+    Signature, TypeSignature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+const MICROS_PER_SECOND: i64 = 1_000_000;
+
+/// Convert seconds to microseconds with saturating overflow behavior (matches 
spark spec)
+#[inline]
+fn secs_to_micros(secs: i64) -> i64 {
+    secs.saturating_mul(MICROS_PER_SECOND)
+}
+
+/// Spark-compatible `cast` function for type conversions
+///
+/// This implements Spark's CAST expression with a target type parameter
+///
+/// # Usage
+/// ```sql
+/// SELECT spark_cast(value, 'timestamp')
+/// ```
+///
+/// # Currently supported conversions
+/// - Int8/Int16/Int32/Int64 -> Timestamp (target_type = 'timestamp')
+///
+/// The integer value is interpreted as seconds since the Unix epoch 
(1970-01-01 00:00:00 UTC)
+/// and converted to a timestamp with microsecond precision (matches spark's 
spec)
+///
+/// # Overflow behavior
+/// Uses saturating multiplication to handle overflow - values that would 
overflow
+/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN
+///
+/// # References
+/// - <https://spark.apache.org/docs/latest/api/sql/index.html#cast>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkCast {
+    signature: Signature,
+    timezone: Option<Arc<str>>,
+}
+
+impl Default for SparkCast {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkCast {
+    pub fn new() -> Self {
+        Self::new_with_config(&ConfigOptions::default())
+    }
+
+    pub fn new_with_config(config: &ConfigOptions) -> Self {
+        // First arg: value to cast (only signed ints - Spark doesn't have 
unsigned integers)
+        // Second arg: target datatype as Utf8 string literal (ex : 
'timestamp')
+        let string_arg =
+            Coercion::new_exact(TypeSignatureClass::Native(logical_string()));
+
+        // Spark only supports signed integers, so we explicitly list them
+        let signed_int_signatures = [
+            logical_int8(),
+            logical_int16(),
+            logical_int32(),
+            logical_int64(),
+        ]
+        .map(|int_type| {
+            TypeSignature::Coercible(vec![
+                Coercion::new_exact(TypeSignatureClass::Native(int_type)),
+                string_arg.clone(),
+            ])
+        });
+
+        Self {
+            signature: Signature::new(
+                TypeSignature::OneOf(Vec::from(signed_int_signatures)),
+                Volatility::Stable,
+            ),
+            timezone: config
+                .execution
+                .time_zone
+                .as_ref()
+                .map(|tz| Arc::from(tz.as_str()))
+                .or_else(|| Some(Arc::from("UTC"))),
+        }
+    }
+}
+
+/// Parse target type string into a DataType
+fn parse_target_type(type_str: &str, timezone: Option<Arc<str>>) -> 
Result<DataType> {
+    match type_str.to_lowercase().as_str() {
+        // further data type support in future
+        "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, 
timezone)),
+        other => exec_err!(
+            "Unsupported spark_cast target type '{}'. Supported types: 
timestamp",
+            other
+        ),
+    }
+}
+
+/// Extract target type string from scalar arguments
+fn get_target_type_from_scalar_args(
+    scalar_args: &[Option<&ScalarValue>],
+    timezone: Option<Arc<str>>,
+) -> Result<DataType> {
+    let type_arg = scalar_args.get(1).and_then(|opt| *opt);
+
+    match type_arg {
+        Some(ScalarValue::Utf8(Some(s)))
+        | Some(ScalarValue::LargeUtf8(Some(s)))
+        | Some(ScalarValue::Utf8View(Some(s))) => parse_target_type(s, 
timezone),
+        _ => exec_err!(
+            "spark_cast requires second argument to be a string of target data 
type ex: timestamp"
+        ),
+    }
+}
+
+fn cast_int_to_timestamp<T: ArrowPrimitiveType>(
+    array: &ArrayRef,
+    timezone: Option<Arc<str>>,
+) -> Result<ArrayRef>
+where
+    T::Native: Into<i64>,
+{
+    let arr = array.as_primitive::<T>();
+    let mut builder = TimestampMicrosecondBuilder::with_capacity(arr.len());
+
+    for i in 0..arr.len() {
+        if arr.is_null(i) {
+            builder.append_null();
+        } else {
+            // spark saturates to i64 min/max
+            let micros = secs_to_micros(arr.value(i).into());
+            builder.append_value(micros);
+        }
+    }
+
+    Ok(Arc::new(builder.finish().with_timezone_opt(timezone)))
+}
+
+impl ScalarUDFImpl for SparkCast {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "spark_cast"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be used instead")
+    }
+
+    fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> 
{
+        Some(ScalarUDF::from(Self::new_with_config(config)))
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
+        let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
+        let return_type = get_target_type_from_scalar_args(
+            args.scalar_arguments,
+            self.timezone.clone(),
+        )?;
+        Ok(Arc::new(Field::new(self.name(), return_type, nullable)))
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let target_type = args.return_field.data_type();
+        match target_type {
+            DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+                cast_to_timestamp(&args.args[0], tz.clone())
+            }
+            other => exec_err!("Unsupported spark_cast target type: {:?}", 
other),
+        }
+    }
+}
+
+/// Cast value to timestamp internal function
+fn cast_to_timestamp(
+    input: &ColumnarValue,
+    timezone: Option<Arc<str>>,
+) -> Result<ColumnarValue> {
+    match input {
+        ColumnarValue::Array(array) => match array.data_type() {
+            DataType::Null => Ok(ColumnarValue::Array(Arc::new(
+                arrow::array::TimestampMicrosecondArray::new_null(array.len())
+                    .with_timezone_opt(timezone),
+            ))),
+            DataType::Int8 => Ok(ColumnarValue::Array(
+                cast_int_to_timestamp::<Int8Type>(array, timezone)?,
+            )),
+            DataType::Int16 => Ok(ColumnarValue::Array(cast_int_to_timestamp::<
+                Int16Type,
+            >(array, timezone)?)),
+            DataType::Int32 => Ok(ColumnarValue::Array(cast_int_to_timestamp::<
+                Int32Type,
+            >(array, timezone)?)),
+            DataType::Int64 => Ok(ColumnarValue::Array(cast_int_to_timestamp::<
+                Int64Type,
+            >(array, timezone)?)),
+            other => exec_err!("Unsupported cast from {:?} to timestamp", 
other),
+        },
+        ColumnarValue::Scalar(scalar) => {
+            let micros = match scalar {
+                ScalarValue::Null
+                | ScalarValue::Int8(None)
+                | ScalarValue::Int16(None)
+                | ScalarValue::Int32(None)
+                | ScalarValue::Int64(None) => None,
+                ScalarValue::Int8(Some(v)) => 
Some(secs_to_micros((*v).into())),
+                ScalarValue::Int16(Some(v)) => 
Some(secs_to_micros((*v).into())),
+                ScalarValue::Int32(Some(v)) => 
Some(secs_to_micros((*v).into())),
+                ScalarValue::Int64(Some(v)) => Some(secs_to_micros(*v)),
+                other => {
+                    return exec_err!("Unsupported cast from {:?} to 
timestamp", other);
+                }
+            };
+            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
+                micros, timezone,
+            )))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int8Array, Int16Array, Int32Array, Int64Array};
+    use arrow::datatypes::TimestampMicrosecondType;
+
+    // helpers to make testing easier
+    fn make_args(input: ColumnarValue, target_type: &str) -> 
ScalarFunctionArgs {
+        make_args_with_timezone(input, target_type, Some("UTC"))
+    }
+
+    fn make_args_with_timezone(
+        input: ColumnarValue,
+        target_type: &str,
+        timezone: Option<&str>,
+    ) -> ScalarFunctionArgs {
+        let return_field = Arc::new(Field::new(
+            "result",
+            DataType::Timestamp(
+                TimeUnit::Microsecond,
+                Some(Arc::from(timezone.unwrap())),
+            ),
+            true,
+        ));
+        let mut config = ConfigOptions::default();
+        if let Some(tz) = timezone {
+            config.execution.time_zone = Some(tz.to_string());
+        }
+        ScalarFunctionArgs {
+            args: vec![
+                input,
+                
ColumnarValue::Scalar(ScalarValue::Utf8(Some(target_type.to_string()))),
+            ],
+            arg_fields: vec![],
+            number_rows: 0,
+            return_field,
+            config_options: Arc::new(config),
+        }
+    }
+
+    fn assert_scalar_timestamp(result: ColumnarValue, expected: i64) {
+        assert_scalar_timestamp_with_tz(result, expected, "UTC");
+    }
+
+    fn assert_scalar_timestamp_with_tz(
+        result: ColumnarValue,
+        expected: i64,
+        expected_tz: &str,
+    ) {
+        match result {
+            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
+                Some(val),
+                Some(tz),
+            )) => {
+                assert_eq!(val, expected);
+                assert_eq!(tz.as_ref(), expected_tz);
+            }
+            _ => {
+                panic!(
+                    "Expected scalar timestamp with value {expected} and 
{expected_tz} timezone"
+                )
+            }
+        }
+    }
+
+    fn assert_scalar_null(result: ColumnarValue) {
+        assert_scalar_null_with_tz(result, "UTC");
+    }
+
+    fn assert_scalar_null_with_tz(result: ColumnarValue, expected_tz: &str) {
+        match result {
+            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, 
Some(tz))) => {
+                assert_eq!(tz.as_ref(), expected_tz);
+            }
+            _ => panic!("Expected null scalar timestamp with {expected_tz} 
timezone"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int8_array_to_timestamp() {
+        let array: ArrayRef = Arc::new(Int8Array::from(vec![
+            Some(0),
+            Some(1),
+            Some(-1),
+            Some(127),
+            Some(-128),
+            None,
+        ]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = 
result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.value(0), 0);
+                assert_eq!(ts_array.value(1), 1_000_000);
+                assert_eq!(ts_array.value(2), -1_000_000);
+                assert_eq!(ts_array.value(3), 127_000_000);
+                assert_eq!(ts_array.value(4), -128_000_000);
+                assert!(ts_array.is_null(5));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int16_array_to_timestamp() {
+        let array: ArrayRef = Arc::new(Int16Array::from(vec![
+            Some(0),
+            Some(32767),
+            Some(-32768),
+            None,
+        ]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = 
result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.value(0), 0);
+                assert_eq!(ts_array.value(1), 32_767_000_000);
+                assert_eq!(ts_array.value(2), -32_768_000_000);
+                assert!(ts_array.is_null(3));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int32_array_to_timestamp() {
+        let array: ArrayRef =
+            Arc::new(Int32Array::from(vec![Some(0), Some(1704067200), None]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = 
result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.value(0), 0);
+                assert_eq!(ts_array.value(1), 1_704_067_200_000_000);
+                assert!(ts_array.is_null(2));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int64_array_overflow() {
+        let array: ArrayRef =
+            Arc::new(Int64Array::from(vec![Some(i64::MAX), Some(i64::MIN)]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = 
result_array.as_primitive::<TimestampMicrosecondType>();
+                // saturating_mul clamps to i64::MAX/MIN
+                assert_eq!(ts_array.value(0), i64::MAX);
+                assert_eq!(ts_array.value(1), i64::MIN);
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int64_array_to_timestamp() {
+        let array: ArrayRef = Arc::new(Int64Array::from(vec![
+            Some(0),
+            Some(1704067200),
+            Some(-86400),
+            None,
+        ]));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = 
result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.value(0), 0);
+                assert_eq!(ts_array.value(1), 1_704_067_200_000_000);
+                assert_eq!(ts_array.value(2), -86_400_000_000); // -1 day
+                assert!(ts_array.is_null(3));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_scalar_int8() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int8(Some(100))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_timestamp(result, 100_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_int16() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int16(Some(100))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_timestamp(result, 100_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_int32() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int32(Some(1704067200))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_timestamp(result, 1_704_067_200_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_int64() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int64(Some(1704067200))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_timestamp(result, 1_704_067_200_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_negative() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int32(Some(-86400))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        // -86400 seconds = -1 day before epoch
+        assert_scalar_timestamp(result, -86_400_000_000);
+    }
+
+    #[test]
+    fn test_cast_scalar_null() {
+        let cast = SparkCast::new();
+        let args =
+            make_args(ColumnarValue::Scalar(ScalarValue::Int64(None)), 
"timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_null(result);
+    }
+
+    #[test]
+    fn test_cast_scalar_int64_overflow() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int64(Some(i64::MAX))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        // saturating_mul clamps to i64::MAX
+        assert_scalar_timestamp(result, i64::MAX);
+    }
+
+    #[test]
+    fn test_unsupported_target_type() {
+        let cast = SparkCast::new();
+        // invoke_with_args uses return_field which would be set correctly by 
planning
+        // For this test, we need to check return_field_from_args
+        let arg_fields: Vec<FieldRef> =
+            vec![Arc::new(Field::new("a", DataType::Int64, true))];
+        let target_type = ScalarValue::Utf8(Some("string".to_string()));
+        let scalar_arguments: Vec<Option<&ScalarValue>> = vec![None, 
Some(&target_type)];
+        let return_field_args = ReturnFieldArgs {
+            arg_fields: &arg_fields,
+            scalar_arguments: &scalar_arguments,
+        };
+        let result = cast.return_field_from_args(return_field_args);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Unsupported spark_cast target type")
+        );
+    }
+
+    #[test]
+    fn test_unsupported_source_type() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            
ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024-01-01".to_string()))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Unsupported cast from")
+        );
+    }
+
+    #[test]
+    fn test_cast_null_to_timestamp() {
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Scalar(ScalarValue::Null), 
"timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+        assert_scalar_null(result);
+    }
+
+    #[test]
+    fn test_cast_null_array_to_timestamp() {
+        let array: ArrayRef = Arc::new(arrow::array::NullArray::new(3));
+
+        let cast = SparkCast::new();
+        let args = make_args(ColumnarValue::Array(array), "timestamp");
+        let result = cast.invoke_with_args(args).unwrap();
+
+        match result {
+            ColumnarValue::Array(result_array) => {
+                let ts_array = 
result_array.as_primitive::<TimestampMicrosecondType>();
+                assert_eq!(ts_array.len(), 3);
+                assert!(ts_array.is_null(0));
+                assert!(ts_array.is_null(1));
+                assert!(ts_array.is_null(2));
+            }
+            _ => panic!("Expected array result"),
+        }
+    }
+
+    #[test]
+    fn test_cast_int_to_timestamp_with_timezones() {
+        // Test with various timezones like Comet does
+        let timezones = [
+            "UTC",
+            "America/New_York",
+            "America/Los_Angeles",
+            "Europe/London",
+            "Asia/Tokyo",
+            "Australia/Sydney",
+        ];
+
+        let cast = SparkCast::new();
+        let test_value: i64 = 1704067200; // 2024-01-01 00:00:00 UTC
+        let expected_micros = test_value * MICROS_PER_SECOND;
+
+        for tz in timezones {
+            // scalar
+            let args = make_args_with_timezone(
+                ColumnarValue::Scalar(ScalarValue::Int64(Some(test_value))),
+                "timestamp",
+                Some(tz),
+            );
+            let result = cast.invoke_with_args(args).unwrap();
+            assert_scalar_timestamp_with_tz(result, expected_micros, tz);
+
+            // array input
+            let array: ArrayRef =
+                Arc::new(Int64Array::from(vec![Some(test_value), None]));
+            let args = make_args_with_timezone(
+                ColumnarValue::Array(array),
+                "timestamp",
+                Some(tz),
+            );
+            let result = cast.invoke_with_args(args).unwrap();
+
+            match result {
+                ColumnarValue::Array(result_array) => {
+                    let ts_array =
+                        
result_array.as_primitive::<TimestampMicrosecondType>();
+                    assert_eq!(ts_array.value(0), expected_micros);
+                    assert!(ts_array.is_null(1));
+                    assert_eq!(ts_array.timezone(), Some(tz));
+                }
+                _ => panic!("Expected array result for timezone {tz}"),
+            }
+        }
+    }
+
+    #[test]
+    fn test_cast_int_to_timestamp_default_timezone() {
+        let cast = SparkCast::new();
+        let args = make_args(
+            ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
+            "timestamp",
+        );
+        let result = cast.invoke_with_args(args).unwrap();
+        // Defaults to UTC
+        assert_scalar_timestamp_with_tz(result, 0, "UTC");
+    }
+}
diff --git a/datafusion/spark/src/function/conversion/mod.rs 
b/datafusion/spark/src/function/conversion/mod.rs
index a87df9a2c8..e8a89fa8c0 100644
--- a/datafusion/spark/src/function/conversion/mod.rs
+++ b/datafusion/spark/src/function/conversion/mod.rs
@@ -15,11 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod cast;
+
 use datafusion_expr::ScalarUDF;
+use datafusion_functions::make_udf_function_with_config;
 use std::sync::Arc;
 
-pub mod expr_fn {}
+make_udf_function_with_config!(cast::SparkCast, spark_cast);
+
+pub mod expr_fn {
+    use datafusion_functions::export_functions;
+
+    export_functions!((
+        spark_cast,
+        "Casts given value to the specified type following Spark-compatible 
semantics",
+        @config arg1 arg2
+    ));
+}
 
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![]
+    use datafusion_common::config::ConfigOptions;
+    let config = ConfigOptions::default();
+    vec![spark_cast(&config)]
 }
diff --git 
a/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt 
b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt
new file mode 100644
index 0000000000..8b77533729
--- /dev/null
+++ 
b/datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt
@@ -0,0 +1,252 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test spark_cast from int8 to timestamp
+query P
+SELECT spark_cast(arrow_cast(0, 'Int8'), 'timestamp');
+----
+1970-01-01T00:00:00Z
+
+query P
+SELECT spark_cast(arrow_cast(1, 'Int8'), 'timestamp');
+----
+1970-01-01T00:00:01Z
+
+query P
+SELECT spark_cast(arrow_cast(-1, 'Int8'), 'timestamp');
+----
+1969-12-31T23:59:59Z
+
+# Test spark_cast from int16 to timestamp
+query P
+SELECT spark_cast(arrow_cast(0, 'Int16'), 'timestamp');
+----
+1970-01-01T00:00:00Z
+
+query P
+SELECT spark_cast(arrow_cast(3600, 'Int16'), 'timestamp');
+----
+1970-01-01T01:00:00Z
+
+# Test spark_cast from int32 to timestamp
+query P
+SELECT spark_cast(arrow_cast(0, 'Int32'), 'timestamp');
+----
+1970-01-01T00:00:00Z
+
+query P
+SELECT spark_cast(arrow_cast(1704067200, 'Int32'), 'timestamp');
+----
+2024-01-01T00:00:00Z
+
+# Test spark_cast from int64 to timestamp
+query P
+SELECT spark_cast(0::bigint, 'timestamp');
+----
+1970-01-01T00:00:00Z
+
+query P
+SELECT spark_cast(1704067200::bigint, 'timestamp');
+----
+2024-01-01T00:00:00Z
+
+# Test NULL handling
+query P
+SELECT spark_cast(arrow_cast(NULL, 'Int8'), 'timestamp');
+----
+NULL
+
+query P
+SELECT spark_cast(arrow_cast(NULL, 'Int16'), 'timestamp');
+----
+NULL
+
+query P
+SELECT spark_cast(arrow_cast(NULL, 'Int32'), 'timestamp');
+----
+NULL
+
+query P
+SELECT spark_cast(NULL::bigint, 'timestamp');
+----
+NULL
+
+# Test untyped NULL
+query P
+SELECT spark_cast(NULL, 'timestamp');
+----
+NULL
+
+# Test Int8 boundary values
+query P
+SELECT spark_cast(arrow_cast(127, 'Int8'), 'timestamp');
+----
+1970-01-01T00:02:07Z
+
+query P
+SELECT spark_cast(arrow_cast(-128, 'Int8'), 'timestamp');
+----
+1969-12-31T23:57:52Z
+
+# Test Int16 boundary values
+query P
+SELECT spark_cast(arrow_cast(32767, 'Int16'), 'timestamp');
+----
+1970-01-01T09:06:07Z
+
+query P
+SELECT spark_cast(arrow_cast(-32768, 'Int16'), 'timestamp');
+----
+1969-12-31T14:53:52Z
+
+# Test Int64 negative value
+query P
+SELECT spark_cast(-86400::bigint, 'timestamp');
+----
+1969-12-31T00:00:00Z
+
+# Test unsupported source type - should error
+statement error
+SELECT spark_cast('2024-01-01', 'timestamp');
+
+# Test unsupported target type - should error
+statement error
+SELECT spark_cast(100, 'string');
+
+# Test with different session timezones to verify simplify() picks up config
+
+# America/Los_Angeles (PST/PDT - has DST)
+statement ok
+SET datafusion.execution.time_zone = 'America/Los_Angeles';
+
+# Epoch in PST (UTC-8)
+query P
+SELECT spark_cast(0::bigint, 'timestamp');
+----
+1969-12-31T16:00:00-08:00
+
+# 2024-01-01 00:00:00 UTC in PST (winter, UTC-8)
+query P
+SELECT spark_cast(1704067200::bigint, 'timestamp');
+----
+2023-12-31T16:00:00-08:00
+
+# America/Phoenix (MST - no DST, always UTC-7)
+statement ok
+SET datafusion.execution.time_zone = 'America/Phoenix';
+
+# Epoch in Phoenix (UTC-7)
+query P
+SELECT spark_cast(0::bigint, 'timestamp');
+----
+1969-12-31T17:00:00-07:00
+
+# 2024-01-01 00:00:00 UTC in Phoenix (still UTC-7, no DST)
+query P
+SELECT spark_cast(1704067200::bigint, 'timestamp');
+----
+2023-12-31T17:00:00-07:00
+
+# Test with different timezones - LA (has DST)
+statement ok
+SET datafusion.execution.time_zone = 'America/Los_Angeles';
+
+query P
+SELECT spark_cast(1710054000::bigint, 'timestamp');
+----
+2024-03-09T23:00:00-08:00
+
+query P
+SELECT spark_cast(1710057600::bigint, 'timestamp');
+----
+2024-03-10T00:00:00-08:00
+
+# Phoenix has no DST - always UTC-7
+statement ok
+SET datafusion.execution.time_zone = 'America/Phoenix';
+
+query P
+SELECT spark_cast(1710054000::bigint, 'timestamp');
+----
+2024-03-10T00:00:00-07:00
+
+query P
+SELECT spark_cast(1710057600::bigint, 'timestamp');
+----
+2024-03-10T01:00:00-07:00
+
+# Reset to default UTC
+statement ok
+SET datafusion.execution.time_zone = 'UTC';
+
+#############################
+# Array Tests
+#############################
+
+# Create test table with 4 int columns: null, min, max, regular value
+statement ok
+CREATE TABLE int_test AS SELECT
+    arrow_cast(column1, 'Int8') as i8_col,
+    arrow_cast(column2, 'Int16') as i16_col,
+    arrow_cast(column3, 'Int32') as i32_col,
+    column4::bigint as i64_col
+FROM (VALUES
+    (NULL, NULL, NULL, NULL),
+    (-128, -32768, -2147483648, -86400),
+    (127, 32767, 2147483647, 86400),
+    (100, 3600, 1710054000, 1710054000)
+);
+
+# Test in UTC
+query PPPP
+SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), 
spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM 
int_test;
+----
+NULL NULL NULL NULL
+1969-12-31T23:57:52Z 1969-12-31T14:53:52Z 1901-12-13T20:45:52Z 
1969-12-31T00:00:00Z
+1970-01-01T00:02:07Z 1970-01-01T09:06:07Z 2038-01-19T03:14:07Z 
1970-01-02T00:00:00Z
+1970-01-01T00:01:40Z 1970-01-01T01:00:00Z 2024-03-10T07:00:00Z 
2024-03-10T07:00:00Z
+
+# Test in America/Los_Angeles (PST - has DST)
+statement ok
+SET datafusion.execution.time_zone = 'America/Los_Angeles';
+
+query PPPP
+SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), 
spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM 
int_test;
+----
+NULL NULL NULL NULL
+1969-12-31T15:57:52-08:00 1969-12-31T06:53:52-08:00 1901-12-13T12:45:52-08:00 
1969-12-30T16:00:00-08:00
+1969-12-31T16:02:07-08:00 1970-01-01T01:06:07-08:00 2038-01-18T19:14:07-08:00 
1970-01-01T16:00:00-08:00
+1969-12-31T16:01:40-08:00 1969-12-31T17:00:00-08:00 2024-03-09T23:00:00-08:00 
2024-03-09T23:00:00-08:00
+
+# Test in America/Phoenix (MST - no DST, always UTC-7)
+statement ok
+SET datafusion.execution.time_zone = 'America/Phoenix';
+
+query PPPP
+SELECT spark_cast(i8_col, 'timestamp'), spark_cast(i16_col, 'timestamp'), 
spark_cast(i32_col, 'timestamp'), spark_cast(i64_col, 'timestamp') FROM 
int_test;
+----
+NULL NULL NULL NULL
+1969-12-31T16:57:52-07:00 1969-12-31T07:53:52-07:00 1901-12-13T13:45:52-07:00 
1969-12-30T17:00:00-07:00
+1969-12-31T17:02:07-07:00 1970-01-01T02:06:07-07:00 2038-01-18T20:14:07-07:00 
1970-01-01T17:00:00-07:00
+1969-12-31T17:01:40-07:00 1969-12-31T18:00:00-07:00 2024-03-10T00:00:00-07:00 
2024-03-10T00:00:00-07:00
+
+# Reset and cleanup
+statement ok
+SET datafusion.execution.time_zone = 'UTC';
+
+statement ok
+DROP TABLE int_test;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to