martin-g commented on code in PR #20555: URL: https://github.com/apache/datafusion/pull/20555#discussion_r2910708740
########## datafusion/spark/src/function/conversion/cast.rs: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder}; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type, + Int64Type, TimeUnit, +}; +use datafusion::logical_expr::{Coercion, TypeSignatureClass}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::types::logical_string; +use datafusion_common::{ + Result as DataFusionResult, ScalarValue, exec_err, internal_err, +}; +use datafusion_expr::TypeSignatureClass::Integer; +use datafusion_expr::{ + ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, + Signature, TypeSignature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +const MICROS_PER_SECOND: i64 = 1_000_000; + +/// Convert seconds to microseconds with saturating overflow behavior (matches spark spec) +#[inline] +fn secs_to_micros(secs: i64) -> i64 { + secs.saturating_mul(MICROS_PER_SECOND) +} + +/// Spark-compatible `cast` function for type conversions +/// +/// This implements Spark's CAST expression with a target type parameter +/// +/// # Usage +/// ```sql +/// SELECT spark_cast(value, 'timestamp') +/// ``` +/// +/// # Currently supported conversions +/// - Int8/Int16/Int32/Int64 -> Timestamp (target_type = 'timestamp') +/// +/// The integer value is interpreted as seconds since the Unix epoch (1970-01-01 00:00:00 UTC) +/// and converted to a timestamp with microsecond precision (matches spark's spec) +/// +/// # Overflow behavior +/// Uses saturating multiplication to handle overflow - values that would overflow +/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN +/// +/// # References +/// - <https://spark.apache.org/docs/latest/api/sql/index.html#cast> +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkCast { + signature: Signature, + timezone: Option<Arc<str>>, +} + +impl Default for SparkCast { + fn default() -> Self { + Self::new() + } +} + +impl SparkCast { + pub fn new() -> Self { + Self::new_with_config(&ConfigOptions::default()) + } + + pub fn new_with_config(config: &ConfigOptions) -> Self { + // First arg: value to cast (only ints for now with potential to add further support later) + // Second arg: target datatype as Utf8 string literal (ex : 'timestamp') + let int_arg = Coercion::new_exact(Integer); + let string_arg = + Coercion::new_exact(TypeSignatureClass::Native(logical_string())); + Self { + signature: Signature::new( + TypeSignature::Coercible(vec![int_arg.clone(), string_arg.clone()]), Review Comment: ```suggestion TypeSignature::Coercible(vec![int_arg, string_arg]), ``` Those are not used below, so they could be _moved_. 
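If it helps to see the suggestion in context, here is a minimal sketch of the signature construction with the `Coercion` values moved instead of cloned (a hypothetical free function `spark_cast_signature`, assuming the same `datafusion_expr`/`datafusion_common` items the file already imports; not part of the PR):

```rust
use datafusion_common::types::logical_string;
use datafusion_expr::{Coercion, Signature, TypeSignature, TypeSignatureClass, Volatility};

/// Hypothetical helper illustrating the suggested change: `int_arg` and
/// `string_arg` are not referenced again after this point, so they can be
/// moved into the `vec!` rather than cloned.
fn spark_cast_signature() -> Signature {
    let int_arg = Coercion::new_exact(TypeSignatureClass::Integer);
    let string_arg = Coercion::new_exact(TypeSignatureClass::Native(logical_string()));
    Signature::new(
        TypeSignature::Coercible(vec![int_arg, string_arg]),
        Volatility::Stable,
    )
}
```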
########## datafusion/spark/src/function/conversion/cast.rs: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder}; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type, + Int64Type, TimeUnit, +}; +use datafusion::logical_expr::{Coercion, TypeSignatureClass}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::types::logical_string; +use datafusion_common::{ + Result as DataFusionResult, ScalarValue, exec_err, internal_err, +}; +use datafusion_expr::TypeSignatureClass::Integer; +use datafusion_expr::{ + ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, + Signature, TypeSignature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +const MICROS_PER_SECOND: i64 = 1_000_000; + +/// Convert seconds to microseconds with saturating overflow behavior (matches spark spec) +#[inline] +fn secs_to_micros(secs: i64) -> i64 { + secs.saturating_mul(MICROS_PER_SECOND) +} + +/// Spark-compatible `cast` function for type conversions +/// +/// This implements Spark's CAST expression with a target type parameter +/// +/// # Usage +/// ```sql +/// SELECT spark_cast(value, 'timestamp') +/// ``` +/// +/// # Currently supported conversions +/// - Int8/Int16/Int32/Int64 -> Timestamp (target_type = 'timestamp') +/// +/// The integer value is interpreted as seconds since the Unix epoch (1970-01-01 00:00:00 UTC) +/// and converted to a timestamp with microsecond precision (matches spark's spec) +/// +/// # Overflow behavior +/// Uses saturating multiplication to handle overflow - values that would overflow +/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN +/// +/// # References +/// - <https://spark.apache.org/docs/latest/api/sql/index.html#cast> +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkCast { + signature: Signature, + timezone: Option<Arc<str>>, +} + +impl Default for SparkCast { + fn default() -> Self { + Self::new() + } +} + +impl SparkCast { + pub fn new() -> Self { + Self::new_with_config(&ConfigOptions::default()) + } + + pub fn new_with_config(config: &ConfigOptions) -> Self { + // First arg: value to cast (only ints for now with potential to add further support later) + // Second arg: target datatype as Utf8 string literal (ex : 'timestamp') + let int_arg = Coercion::new_exact(Integer); + let string_arg = + Coercion::new_exact(TypeSignatureClass::Native(logical_string())); + Self { + signature: Signature::new( + TypeSignature::Coercible(vec![int_arg.clone(), string_arg.clone()]), + Volatility::Stable, + ), + timezone: config + .execution + .time_zone + .as_ref() + .map(|tz| Arc::from(tz.as_str())) + .or_else(|| 
Some(Arc::from("UTC"))), + } + } +} + +/// Parse target type string into a DataType +fn parse_target_type( + type_str: &str, + timezone: Option<Arc<str>>, +) -> DataFusionResult<DataType> { + match type_str.to_lowercase().as_str() { + // further data type support in future + "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, timezone)), + other => exec_err!( + "Unsupported spark_cast target type '{}'. Supported types: timestamp", + other + ), + } +} + +/// Extract target type string from scalar arguments +fn get_target_type_from_scalar_args( + scalar_args: &[Option<&ScalarValue>], + timezone: Option<Arc<str>>, +) -> DataFusionResult<DataType> { + let type_arg = scalar_args.get(1).and_then(|opt| *opt); + + match type_arg { + Some(ScalarValue::Utf8(Some(s))) + | Some(ScalarValue::LargeUtf8(Some(s))) + | Some(ScalarValue::Utf8View(Some(s))) => parse_target_type(s, timezone), + _ => exec_err!( + "spark_cast requires second argument to be a string of target data type ex: timestamp" + ), + } +} + +fn cast_int_to_timestamp<T: ArrowPrimitiveType>( + array: &ArrayRef, + timezone: Option<Arc<str>>, +) -> DataFusionResult<ArrayRef> +where + T::Native: Into<i64>, +{ + let arr = array.as_primitive::<T>(); + let mut builder = TimestampMicrosecondBuilder::with_capacity(arr.len()); + + for i in 0..arr.len() { + if arr.is_null(i) { + builder.append_null(); + } else { + // spark saturates to i64 min/max + let micros = secs_to_micros(arr.value(i).into()); + builder.append_value(micros); + } + } + + Ok(Arc::new(builder.finish().with_timezone_opt(timezone))) +} + +impl ScalarUDFImpl for SparkCast { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "spark_cast" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult<DataType> { + internal_err!("return_field_from_args should be used instead") + } + + fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> { + Some(ScalarUDF::from(Self::new_with_config(config))) + } + + fn return_field_from_args( + &self, + args: ReturnFieldArgs, + ) -> DataFusionResult<FieldRef> { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + let return_type = get_target_type_from_scalar_args( + args.scalar_arguments, + self.timezone.clone(), + )?; + Ok(Arc::new(Field::new(self.name(), return_type, nullable))) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> DataFusionResult<ColumnarValue> { + let target_type = args.return_field.data_type(); + match target_type { + DataType::Timestamp(TimeUnit::Microsecond, tz) => { + cast_to_timestamp(&args.args[0], tz.clone()) + } + other => exec_err!("Unsupported spark_cast target type: {:?}", other), + } + } +} + +/// Cast value to timestamp internal function +fn cast_to_timestamp( + input: &ColumnarValue, + timezone: Option<Arc<str>>, +) -> DataFusionResult<ColumnarValue> { + match input { + ColumnarValue::Array(array) => match array.data_type() { + DataType::Null => Ok(ColumnarValue::Array(Arc::new( + arrow::array::TimestampMicrosecondArray::new_null(array.len()) + .with_timezone_opt(timezone), + ))), + DataType::Int8 => Ok(ColumnarValue::Array( + cast_int_to_timestamp::<Int8Type>(array, timezone)?, + )), + DataType::Int16 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int16Type, + >(array, timezone)?)), + DataType::Int32 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int32Type, + >(array, timezone)?)), + DataType::Int64 => 
Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int64Type, Review Comment: The signature also allows unsigned integers but here they would fail as unsupported. ########## datafusion/spark/src/function/conversion/cast.rs: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder}; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type, + Int64Type, TimeUnit, +}; +use datafusion::logical_expr::{Coercion, TypeSignatureClass}; Review Comment: Let's use `use datafusion_expr::` here too, for consistency. ########## datafusion/spark/src/function/conversion/cast.rs: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder}; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type, + Int64Type, TimeUnit, +}; +use datafusion::logical_expr::{Coercion, TypeSignatureClass}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::types::logical_string; +use datafusion_common::{ + Result as DataFusionResult, ScalarValue, exec_err, internal_err, +}; +use datafusion_expr::TypeSignatureClass::Integer; +use datafusion_expr::{ + ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, + Signature, TypeSignature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +const MICROS_PER_SECOND: i64 = 1_000_000; + +/// Convert seconds to microseconds with saturating overflow behavior (matches spark spec) +#[inline] +fn secs_to_micros(secs: i64) -> i64 { + secs.saturating_mul(MICROS_PER_SECOND) +} + +/// Spark-compatible `cast` function for type conversions +/// +/// This implements Spark's CAST expression with a target type parameter +/// +/// # Usage +/// ```sql +/// SELECT spark_cast(value, 'timestamp') +/// ``` +/// +/// # Currently supported conversions +/// - Int8/Int16/Int32/Int64 -> Timestamp (target_type = 'timestamp') +/// +/// The integer value is interpreted as seconds since the Unix epoch (1970-01-01 00:00:00 UTC) +/// and converted to a timestamp with microsecond precision (matches spark's spec) +/// +/// # Overflow behavior +/// Uses saturating multiplication to handle overflow - values that would overflow +/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN +/// +/// # References +/// - <https://spark.apache.org/docs/latest/api/sql/index.html#cast> +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkCast { + signature: Signature, + timezone: Option<Arc<str>>, +} + +impl Default for SparkCast { + fn default() -> Self { + Self::new() + } +} + +impl SparkCast { + pub fn new() -> Self { + Self::new_with_config(&ConfigOptions::default()) + } + + pub fn new_with_config(config: &ConfigOptions) -> Self { + // First arg: value to cast (only ints for now with potential to add further support later) + // Second arg: target datatype as Utf8 string literal (ex : 'timestamp') + let int_arg = Coercion::new_exact(Integer); + let string_arg = + Coercion::new_exact(TypeSignatureClass::Native(logical_string())); + Self { + signature: Signature::new( + TypeSignature::Coercible(vec![int_arg.clone(), string_arg.clone()]), + Volatility::Stable, + ), + timezone: config + .execution + .time_zone + .as_ref() + .map(|tz| Arc::from(tz.as_str())) + .or_else(|| Some(Arc::from("UTC"))), + } + } +} + +/// Parse target type string into a DataType +fn parse_target_type( + type_str: &str, + timezone: Option<Arc<str>>, +) -> DataFusionResult<DataType> { + match type_str.to_lowercase().as_str() { + // further data type support in future + "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, timezone)), + other => exec_err!( + "Unsupported spark_cast target type '{}'. 
Supported types: timestamp", + other + ), + } +} + +/// Extract target type string from scalar arguments +fn get_target_type_from_scalar_args( + scalar_args: &[Option<&ScalarValue>], + timezone: Option<Arc<str>>, +) -> DataFusionResult<DataType> { + let type_arg = scalar_args.get(1).and_then(|opt| *opt); + + match type_arg { + Some(ScalarValue::Utf8(Some(s))) + | Some(ScalarValue::LargeUtf8(Some(s))) + | Some(ScalarValue::Utf8View(Some(s))) => parse_target_type(s, timezone), + _ => exec_err!( + "spark_cast requires second argument to be a string of target data type ex: timestamp" + ), + } +} + +fn cast_int_to_timestamp<T: ArrowPrimitiveType>( + array: &ArrayRef, + timezone: Option<Arc<str>>, +) -> DataFusionResult<ArrayRef> +where + T::Native: Into<i64>, +{ + let arr = array.as_primitive::<T>(); + let mut builder = TimestampMicrosecondBuilder::with_capacity(arr.len()); + + for i in 0..arr.len() { + if arr.is_null(i) { + builder.append_null(); + } else { + // spark saturates to i64 min/max + let micros = secs_to_micros(arr.value(i).into()); + builder.append_value(micros); + } + } + + Ok(Arc::new(builder.finish().with_timezone_opt(timezone))) +} + +impl ScalarUDFImpl for SparkCast { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "spark_cast" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult<DataType> { + internal_err!("return_field_from_args should be used instead") + } + + fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> { + Some(ScalarUDF::from(Self::new_with_config(config))) + } + + fn return_field_from_args( + &self, + args: ReturnFieldArgs, + ) -> DataFusionResult<FieldRef> { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + let return_type = get_target_type_from_scalar_args( + args.scalar_arguments, + self.timezone.clone(), + )?; + Ok(Arc::new(Field::new(self.name(), return_type, nullable))) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> DataFusionResult<ColumnarValue> { + let target_type = args.return_field.data_type(); + match target_type { + DataType::Timestamp(TimeUnit::Microsecond, tz) => { + cast_to_timestamp(&args.args[0], tz.clone()) + } + other => exec_err!("Unsupported spark_cast target type: {:?}", other), + } + } +} + +/// Cast value to timestamp internal function +fn cast_to_timestamp( + input: &ColumnarValue, + timezone: Option<Arc<str>>, +) -> DataFusionResult<ColumnarValue> { + match input { + ColumnarValue::Array(array) => match array.data_type() { + DataType::Null => Ok(ColumnarValue::Array(Arc::new( + arrow::array::TimestampMicrosecondArray::new_null(array.len()) + .with_timezone_opt(timezone), + ))), + DataType::Int8 => Ok(ColumnarValue::Array( + cast_int_to_timestamp::<Int8Type>(array, timezone)?, + )), + DataType::Int16 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int16Type, + >(array, timezone)?)), + DataType::Int32 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int32Type, + >(array, timezone)?)), + DataType::Int64 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int64Type, + >(array, timezone)?)), + other => exec_err!("Unsupported cast from {:?} to timestamp", other), + }, + ColumnarValue::Scalar(scalar) => { + let micros = match scalar { + ScalarValue::Null + | ScalarValue::Int8(None) + | ScalarValue::Int16(None) + | ScalarValue::Int32(None) + | ScalarValue::Int64(None) => None, + ScalarValue::Int8(Some(v)) => Some(secs_to_micros((*v).into())), + 
ScalarValue::Int16(Some(v)) => Some(secs_to_micros((*v).into())), + ScalarValue::Int32(Some(v)) => Some(secs_to_micros((*v).into())), + ScalarValue::Int64(Some(v)) => Some(secs_to_micros(*v)), + other => { + return exec_err!("Unsupported cast from {:?} to timestamp", other); + } + }; + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + micros, timezone, + ))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Int8Array, Int16Array, Int32Array, Int64Array}; + use arrow::datatypes::TimestampMicrosecondType; + + // helpers to make testing easier + fn make_args(input: ColumnarValue, target_type: &str) -> ScalarFunctionArgs { + make_args_with_timezone(input, target_type, Some("UTC")) + } + + fn make_args_with_timezone( + input: ColumnarValue, + target_type: &str, + timezone: Option<&str>, + ) -> ScalarFunctionArgs { + let return_field = Arc::new(Field::new( + "result", + DataType::Timestamp( + TimeUnit::Microsecond, + Some(Arc::from(timezone.unwrap())), Review Comment: `timezone` is an Option. Unwrapping it may panic. I think it would be safer to go thru `SparkCast::new_with_config(&config).return_field_from_args(...)` ########## datafusion/sqllogictest/test_files/spark/conversion/cast_int_to_timestamp.slt: ########## @@ -0,0 +1,194 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Test spark_cast from int8 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int8'), 'timestamp'); +---- +1970-01-01T00:00:00Z + +query P +SELECT spark_cast(arrow_cast(1, 'Int8'), 'timestamp'); +---- +1970-01-01T00:00:01Z + +query P +SELECT spark_cast(arrow_cast(-1, 'Int8'), 'timestamp'); +---- +1969-12-31T23:59:59Z + +# Test spark_cast from int16 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int16'), 'timestamp'); +---- +1970-01-01T00:00:00Z + +query P +SELECT spark_cast(arrow_cast(3600, 'Int16'), 'timestamp'); +---- +1970-01-01T01:00:00Z + +# Test spark_cast from int32 to timestamp +query P +SELECT spark_cast(arrow_cast(0, 'Int32'), 'timestamp'); +---- +1970-01-01T00:00:00Z + +query P +SELECT spark_cast(arrow_cast(1704067200, 'Int32'), 'timestamp'); +---- +2024-01-01T00:00:00Z + +# Test spark_cast from int64 to timestamp +query P +SELECT spark_cast(0::bigint, 'timestamp'); +---- +1970-01-01T00:00:00Z + +query P +SELECT spark_cast(1704067200::bigint, 'timestamp'); +---- +2024-01-01T00:00:00Z + +# Test NULL handling +query P +SELECT spark_cast(arrow_cast(NULL, 'Int8'), 'timestamp'); +---- +NULL + +query P +SELECT spark_cast(arrow_cast(NULL, 'Int16'), 'timestamp'); +---- +NULL + +query P +SELECT spark_cast(arrow_cast(NULL, 'Int32'), 'timestamp'); +---- +NULL + +query P +SELECT spark_cast(NULL::bigint, 'timestamp'); +---- +NULL + +# Test untyped NULL +query P +SELECT spark_cast(NULL, 'timestamp'); +---- +NULL + +# Test Int8 boundary values +query P +SELECT spark_cast(arrow_cast(127, 'Int8'), 'timestamp'); +---- +1970-01-01T00:02:07Z + +query P +SELECT spark_cast(arrow_cast(-128, 'Int8'), 'timestamp'); +---- +1969-12-31T23:57:52Z + +# Test Int16 boundary values +query P +SELECT spark_cast(arrow_cast(32767, 'Int16'), 'timestamp'); +---- +1970-01-01T09:06:07Z + +query P +SELECT spark_cast(arrow_cast(-32768, 'Int16'), 'timestamp'); +---- +1969-12-31T14:53:52Z + +# Test Int64 negative value +query P +SELECT spark_cast(-86400::bigint, 'timestamp'); +---- +1969-12-31T00:00:00Z + +# Test unsupported source type - should error +statement error +SELECT spark_cast('2024-01-01', 'timestamp'); + +# Test unsupported target type - should error +statement error +SELECT spark_cast(100, 'string'); + +# Test with different session timezones to verify simplify() picks up config + +# America/Los_Angeles (PST/PDT - has DST) +statement ok +SET datafusion.execution.time_zone = 'America/Los_Angeles'; + +# Epoch in PST (UTC-8) +query P +SELECT spark_cast(0::bigint, 'timestamp'); +---- +1969-12-31T16:00:00-08:00 + +# 2024-01-01 00:00:00 UTC in PST (winter, UTC-8) +query P +SELECT spark_cast(1704067200::bigint, 'timestamp'); +---- +2023-12-31T16:00:00-08:00 + +# America/Phoenix (MST - no DST, always UTC-7) +statement ok +SET datafusion.execution.time_zone = 'America/Phoenix'; + +# Epoch in Phoenix (UTC-7) +query P +SELECT spark_cast(0::bigint, 'timestamp'); +---- +1969-12-31T17:00:00-07:00 + +# 2024-01-01 00:00:00 UTC in Phoenix (still UTC-7, no DST) +query P +SELECT spark_cast(1704067200::bigint, 'timestamp'); +---- +2023-12-31T17:00:00-07:00 + +# Test with different timezones - LA (has DST) +statement ok +SET datafusion.execution.time_zone = 'America/Los_Angeles'; + +query P +SELECT spark_cast(1710054000::bigint, 'timestamp'); +---- +2024-03-09T23:00:00-08:00 Review Comment: What is the idea here ? 
DST in LA has started at Mar 8 2026 02:00am ########## datafusion/spark/src/function/conversion/cast.rs: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder}; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type, + Int64Type, TimeUnit, +}; +use datafusion::logical_expr::{Coercion, TypeSignatureClass}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::types::logical_string; +use datafusion_common::{ + Result as DataFusionResult, ScalarValue, exec_err, internal_err, +}; +use datafusion_expr::TypeSignatureClass::Integer; +use datafusion_expr::{ + ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, + Signature, TypeSignature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +const MICROS_PER_SECOND: i64 = 1_000_000; + +/// Convert seconds to microseconds with saturating overflow behavior (matches spark spec) +#[inline] +fn secs_to_micros(secs: i64) -> i64 { + secs.saturating_mul(MICROS_PER_SECOND) +} + +/// Spark-compatible `cast` function for type conversions +/// +/// This implements Spark's CAST expression with a target type parameter +/// +/// # Usage +/// ```sql +/// SELECT spark_cast(value, 'timestamp') +/// ``` +/// +/// # Currently supported conversions +/// - Int8/Int16/Int32/Int64 -> Timestamp (target_type = 'timestamp') +/// +/// The integer value is interpreted as seconds since the Unix epoch (1970-01-01 00:00:00 UTC) +/// and converted to a timestamp with microsecond precision (matches spark's spec) +/// +/// # Overflow behavior +/// Uses saturating multiplication to handle overflow - values that would overflow +/// i64 when multiplied by 1,000,000 are clamped to i64::MAX or i64::MIN +/// +/// # References +/// - <https://spark.apache.org/docs/latest/api/sql/index.html#cast> +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkCast { + signature: Signature, + timezone: Option<Arc<str>>, +} + +impl Default for SparkCast { + fn default() -> Self { + Self::new() + } +} + +impl SparkCast { + pub fn new() -> Self { + Self::new_with_config(&ConfigOptions::default()) + } + + pub fn new_with_config(config: &ConfigOptions) -> Self { + // First arg: value to cast (only ints for now with potential to add further support later) + // Second arg: target datatype as Utf8 string literal (ex : 'timestamp') + let int_arg = Coercion::new_exact(Integer); + let string_arg = + Coercion::new_exact(TypeSignatureClass::Native(logical_string())); + Self { + signature: Signature::new( + TypeSignature::Coercible(vec![int_arg.clone(), string_arg.clone()]), + Volatility::Stable, + ), + timezone: config + .execution + .time_zone + .as_ref() + .map(|tz| 
Arc::from(tz.as_str())) + .or_else(|| Some(Arc::from("UTC"))), + } + } +} + +/// Parse target type string into a DataType +fn parse_target_type( + type_str: &str, + timezone: Option<Arc<str>>, +) -> DataFusionResult<DataType> { + match type_str.to_lowercase().as_str() { + // further data type support in future + "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, timezone)), + other => exec_err!( + "Unsupported spark_cast target type '{}'. Supported types: timestamp", + other + ), + } +} + +/// Extract target type string from scalar arguments +fn get_target_type_from_scalar_args( + scalar_args: &[Option<&ScalarValue>], + timezone: Option<Arc<str>>, +) -> DataFusionResult<DataType> { + let type_arg = scalar_args.get(1).and_then(|opt| *opt); + + match type_arg { + Some(ScalarValue::Utf8(Some(s))) + | Some(ScalarValue::LargeUtf8(Some(s))) + | Some(ScalarValue::Utf8View(Some(s))) => parse_target_type(s, timezone), + _ => exec_err!( + "spark_cast requires second argument to be a string of target data type ex: timestamp" + ), + } +} + +fn cast_int_to_timestamp<T: ArrowPrimitiveType>( + array: &ArrayRef, + timezone: Option<Arc<str>>, +) -> DataFusionResult<ArrayRef> +where + T::Native: Into<i64>, +{ + let arr = array.as_primitive::<T>(); + let mut builder = TimestampMicrosecondBuilder::with_capacity(arr.len()); + + for i in 0..arr.len() { + if arr.is_null(i) { + builder.append_null(); + } else { + // spark saturates to i64 min/max + let micros = secs_to_micros(arr.value(i).into()); + builder.append_value(micros); + } + } + + Ok(Arc::new(builder.finish().with_timezone_opt(timezone))) +} + +impl ScalarUDFImpl for SparkCast { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "spark_cast" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult<DataType> { + internal_err!("return_field_from_args should be used instead") + } + + fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> { + Some(ScalarUDF::from(Self::new_with_config(config))) + } + + fn return_field_from_args( + &self, + args: ReturnFieldArgs, + ) -> DataFusionResult<FieldRef> { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + let return_type = get_target_type_from_scalar_args( + args.scalar_arguments, + self.timezone.clone(), + )?; + Ok(Arc::new(Field::new(self.name(), return_type, nullable))) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> DataFusionResult<ColumnarValue> { + let target_type = args.return_field.data_type(); + match target_type { + DataType::Timestamp(TimeUnit::Microsecond, tz) => { + cast_to_timestamp(&args.args[0], tz.clone()) + } + other => exec_err!("Unsupported spark_cast target type: {:?}", other), + } + } +} + +/// Cast value to timestamp internal function +fn cast_to_timestamp( + input: &ColumnarValue, + timezone: Option<Arc<str>>, +) -> DataFusionResult<ColumnarValue> { + match input { + ColumnarValue::Array(array) => match array.data_type() { + DataType::Null => Ok(ColumnarValue::Array(Arc::new( + arrow::array::TimestampMicrosecondArray::new_null(array.len()) + .with_timezone_opt(timezone), + ))), + DataType::Int8 => Ok(ColumnarValue::Array( + cast_int_to_timestamp::<Int8Type>(array, timezone)?, + )), + DataType::Int16 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int16Type, + >(array, timezone)?)), + DataType::Int32 => Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int32Type, + >(array, timezone)?)), + DataType::Int64 => 
Ok(ColumnarValue::Array(cast_int_to_timestamp::< + Int64Type, + >(array, timezone)?)), + other => exec_err!("Unsupported cast from {:?} to timestamp", other), + }, + ColumnarValue::Scalar(scalar) => { + let micros = match scalar { + ScalarValue::Null + | ScalarValue::Int8(None) + | ScalarValue::Int16(None) + | ScalarValue::Int32(None) + | ScalarValue::Int64(None) => None, + ScalarValue::Int8(Some(v)) => Some(secs_to_micros((*v).into())), + ScalarValue::Int16(Some(v)) => Some(secs_to_micros((*v).into())), + ScalarValue::Int32(Some(v)) => Some(secs_to_micros((*v).into())), + ScalarValue::Int64(Some(v)) => Some(secs_to_micros(*v)), + other => { + return exec_err!("Unsupported cast from {:?} to timestamp", other); + } + }; + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + micros, timezone, + ))) + } + } +} + +#[cfg(test)] Review Comment: SLT tests are preferred over unit tests because: - they are closer to how end users would use a UDF. - they are easier to write and read. - their dev cycle is much faster - it is much quicker to edit a .slt file and re-run it than to edit a .rs file, re-compile it, and re-run it. cast_int_to_timestamp.slt is a good one, but it only has tests for scalars. It would be good to add tests for arrays too (or even move the unit tests there). ########## datafusion/spark/src/function/conversion/mod.rs: ########## @@ -15,11 +15,24 @@ // specific language governing permissions and limitations // under the License. +mod cast; + use datafusion_expr::ScalarUDF; +use datafusion_functions::make_udf_function; use std::sync::Arc; -pub mod expr_fn {} +make_udf_function!(cast::SparkCast, spark_cast); + +pub mod expr_fn { + use datafusion_functions::export_functions; + + export_functions!(( + spark_cast, + "Casts given value to the specified type following Spark-compatible semantics", + arg1 arg2 Review Comment: ```suggestion @config arg1 arg2 ``` See https://github.com/apache/datafusion/blob/fd97799ddc06347e25b0a2285a6b76e1c0d887c6/datafusion/functions/src/macros.rs#L43 and https://github.com/apache/datafusion/blob/fd97799ddc06347e25b0a2285a6b76e1c0d887c6/datafusion/functions/src/datetime/mod.rs#L129 ########## datafusion/spark/src/function/conversion/cast.rs: ########## @@ -0,0 +1,655 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondBuilder}; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Field, FieldRef, Int8Type, Int16Type, Int32Type, + Int64Type, TimeUnit, +}; +use datafusion::logical_expr::{Coercion, TypeSignatureClass}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::types::logical_string; +use datafusion_common::{ + Result as DataFusionResult, ScalarValue, exec_err, internal_err, Review Comment: nit: Is there any reason to alias this as `DataFusionResult`? I don't see any possibility of a name clash below, and most of the rest of the code just uses `Result`.
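For illustration, a minimal sketch of how `parse_target_type` would read with the plain `Result` import instead of the alias (same logic as in the PR, only the import changes; shown as a standalone function, not the final code):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::{exec_err, Result};

/// Same logic as the PR's `parse_target_type`, returning the un-aliased
/// `datafusion_common::Result` instead of `DataFusionResult`.
fn parse_target_type(type_str: &str, timezone: Option<Arc<str>>) -> Result<DataType> {
    match type_str.to_lowercase().as_str() {
        "timestamp" => Ok(DataType::Timestamp(TimeUnit::Microsecond, timezone)),
        other => exec_err!(
            "Unsupported spark_cast target type '{}'. Supported types: timestamp",
            other
        ),
    }
}
```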
