This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7783b394 chore: Move `cast` to `spark-expr` crate (#654)
7783b394 is described below
commit 7783b394bb9c830ee1466bf729dd9544964f75f5
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jul 12 13:21:50 2024 -0600
chore: Move `cast` to `spark-expr` crate (#654)
* refactor in preparation for moving cast to spark-expr crate
* errors
* move cast to spark-expr crate
* machete
* refactor errors
* clean up imports
---
native/Cargo.lock | 35 +--
native/Cargo.toml | 4 +
native/core/Cargo.toml | 7 +-
.../src/execution/datafusion/expressions/mod.rs | 2 +-
.../src/execution/datafusion/expressions/utils.rs | 249 +--------------------
native/core/src/execution/mod.rs | 2 +-
native/spark-expr/Cargo.toml | 5 +
.../expressions => spark-expr/src}/cast.rs | 119 ++++++++--
native/spark-expr/src/lib.rs | 1 +
native/utils/Cargo.toml | 5 +
native/utils/src/lib.rs | 162 ++++++++++++++
.../{core/src/execution => utils/src}/timezone.rs | 0
12 files changed, 309 insertions(+), 282 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 6bba0a8e..f64b7b63 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -482,9 +482,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
-version = "1.0.106"
+version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "066fce287b1d4eafef758e89e09d724a24808a9196fe9756b8ca90e86d0719a2"
+checksum = "907d8581360765417f8f2e0e7d602733bbed60156b4465b7617243689ef9b83d"
dependencies = [
"jobserver",
"libc",
@@ -588,18 +588,18 @@ dependencies = [
[[package]]
name = "clap"
-version = "4.5.8"
+version = "4.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "84b3edb18336f4df585bc9aa31dd99c036dfa5dc5e9a2939a722a188f3a8970d"
+checksum = "64acc1846d54c1fe936a78dc189c34e28d3f5afc348403f28ecf53660b9b8462"
dependencies = [
"clap_builder",
]
[[package]]
name = "clap_builder"
-version = "4.5.8"
+version = "4.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1c09dd5ada6c6c78075d6fd0da3f90d8080651e2d6cc8eb2f1aaa4034ced708"
+checksum = "6fb8393d67ba2e7bfaf28a23458e4e2b543cc73a99595511eb207fdb8aede942"
dependencies = [
"anstyle",
"clap_lex",
@@ -862,7 +862,6 @@ dependencies = [
"brotli",
"bytes",
"chrono",
- "chrono-tz 0.8.6",
"crc32fast",
"criterion",
"datafusion",
@@ -908,12 +907,17 @@ name = "datafusion-comet-spark-expr"
version = "0.1.0"
dependencies = [
"arrow",
+ "arrow-array",
"arrow-schema",
+ "chrono",
"datafusion",
"datafusion-comet-utils",
"datafusion-common",
+ "datafusion-expr",
"datafusion-functions",
"datafusion-physical-expr",
+ "num",
+ "regex",
"thiserror",
]
@@ -921,6 +925,11 @@ dependencies = [
name = "datafusion-comet-utils"
version = "0.1.0"
dependencies = [
+ "arrow",
+ "arrow-array",
+ "arrow-schema",
+ "chrono",
+ "chrono-tz 0.8.6",
"datafusion-physical-plan",
]
@@ -2723,18 +2732,18 @@ dependencies = [
[[package]]
name = "thiserror"
-version = "1.0.61"
+version = "1.0.62"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709"
+checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.61"
+version = "1.0.62"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533"
+checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c"
dependencies = [
"proc-macro2",
"quote",
@@ -2938,9 +2947,9 @@ dependencies = [
[[package]]
name = "uuid"
-version = "1.9.1"
+version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439"
+checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
dependencies = [
"getrandom",
]
diff --git a/native/Cargo.toml b/native/Cargo.toml
index 0b392833..09865742 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -48,6 +48,10 @@ datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "40.0.0", default-features = false }
datafusion-comet-spark-expr = { path = "spark-expr", version = "0.1.0" }
datafusion-comet-utils = { path = "utils", version = "0.1.0" }
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+chrono-tz = { version = "0.8" }
+num = "0.4"
+regex = "1.9.6"
thiserror = "1"
[profile.release]
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 50c1ce2b..8e02324c 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -59,13 +59,12 @@ flate2 = "1.0"
lz4 = "1.24"
zstd = "0.11"
rand = "0.8"
-num = "0.4"
+num = { workspace = true }
bytes = "1.5.0"
tempfile = "3.8.0"
ahash = { version = "0.8", default-features = false }
itertools = "0.11.0"
-chrono = { version = "0.4", default-features = false, features = ["clock"] }
-chrono-tz = { version = "0.8" }
+chrono = { workspace = true }
paste = "1.0.14"
datafusion-common = { workspace = true }
datafusion = { workspace = true }
@@ -74,7 +73,7 @@ datafusion-physical-expr-common = { workspace = true }
datafusion-physical-expr = { workspace = true }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
-regex = "1.9.6"
+regex = { workspace = true }
crc32fast = "1.3.2"
simd-adler32 = "0.3.7"
datafusion-comet-spark-expr = { workspace = true }
diff --git a/native/core/src/execution/datafusion/expressions/mod.rs b/native/core/src/execution/datafusion/expressions/mod.rs
index c61266ce..f6fb26b6 100644
--- a/native/core/src/execution/datafusion/expressions/mod.rs
+++ b/native/core/src/execution/datafusion/expressions/mod.rs
@@ -18,7 +18,7 @@
//! Native DataFusion expressions
pub mod bitwise_not;
-pub mod cast;
+pub use datafusion_comet_spark_expr::cast;
pub mod checkoverflow;
mod normalize_nan;
pub mod scalar_funcs;
diff --git a/native/core/src/execution/datafusion/expressions/utils.rs b/native/core/src/execution/datafusion/expressions/utils.rs
index 6a7ec2e1..04e41e0b 100644
--- a/native/core/src/execution/datafusion/expressions/utils.rs
+++ b/native/core/src/execution/datafusion/expressions/utils.rs
@@ -15,250 +15,5 @@
// specific language governing permissions and limitations
// under the License.
-use crate::execution::timezone::Tz;
-use arrow::{
- array::{
- as_dictionary_array, as_primitive_array, Array, ArrayRef,
GenericStringArray,
- PrimitiveArray,
- },
- compute::unary,
- datatypes::{Int32Type, Int64Type, TimestampMicrosecondType},
- error::ArrowError,
- temporal_conversions::as_datetime,
-};
-use arrow_array::{cast::AsArray, types::ArrowPrimitiveType};
-use arrow_schema::DataType;
-use chrono::{DateTime, Offset, TimeZone};
-use datafusion_common::cast::as_generic_string_array;
-use num::integer::div_floor;
-use std::sync::Arc;
-
-pub use datafusion_comet_utils::down_cast_any_ref;
-
-/// Preprocesses input arrays to add timezone information from Spark to Arrow
array datatype or
-/// to apply timezone offset.
-//
-// We consider the following cases:
-//
-// | --------------------- | ------------ | ----------------- |
-------------------------------- |
-// | Conversion | Input array | Timezone | Output array
|
-// | --------------------- | ------------ | ----------------- |
-------------------------------- |
-// | Timestamp -> | Array in UTC | Timezone of input | A timestamp
with the timezone |
-// | Utf8 or Date32 | | | offset
applied and timezone |
-// | | | | removed
|
-// | --------------------- | ------------ | ----------------- |
-------------------------------- |
-// | Timestamp -> | Array in UTC | Timezone of input | Same as input
array |
-// | Timestamp w/Timezone| | |
|
-// | --------------------- | ------------ | ----------------- |
-------------------------------- |
-// | Timestamp_ntz -> | Array in | Timezone of input | Same as input
array |
-// | Utf8 or Date32 | timezone | |
|
-// | | session local| |
|
-// | | timezone | |
|
-// | --------------------- | ------------ | ----------------- |
-------------------------------- |
-// | Timestamp_ntz -> | Array in | Timezone of input | Array in UTC
and timezone |
-// | Timestamp w/Timezone | session local| | specified in
input |
-// | | timezone | |
|
-// | --------------------- | ------------ | ----------------- |
-------------------------------- |
-// | Timestamp(_ntz) -> |
|
-// | Any other type | Not Supported
|
-// | --------------------- | ------------ | ----------------- |
-------------------------------- |
-//
-pub fn array_with_timezone(
- array: ArrayRef,
- timezone: String,
- to_type: Option<&DataType>,
-) -> Result<ArrayRef, ArrowError> {
- match array.data_type() {
- DataType::Timestamp(_, None) => {
- assert!(!timezone.is_empty());
- match to_type {
- Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
- Some(DataType::Timestamp(_, Some(_))) => {
- timestamp_ntz_to_timestamp(array, timezone.as_str(),
Some(timezone.as_str()))
- }
- _ => {
- // Not supported
- panic!(
- "Cannot convert from {:?} to {:?}",
- array.data_type(),
- to_type.unwrap()
- )
- }
- }
- }
- DataType::Timestamp(_, Some(_)) => {
- assert!(!timezone.is_empty());
- let array = as_primitive_array::<TimestampMicrosecondType>(&array);
- let array_with_timezone =
array.clone().with_timezone(timezone.clone());
- let array = Arc::new(array_with_timezone) as ArrayRef;
- match to_type {
- Some(DataType::Utf8) | Some(DataType::Date32) => {
- pre_timestamp_cast(array, timezone)
- }
- _ => Ok(array),
- }
- }
- DataType::Dictionary(_, value_type)
- if matches!(value_type.as_ref(), &DataType::Timestamp(_, _)) =>
- {
- let dict = as_dictionary_array::<Int32Type>(&array);
- let array =
as_primitive_array::<TimestampMicrosecondType>(dict.values());
- let array_with_timezone =
- array_with_timezone(Arc::new(array.clone()) as ArrayRef,
timezone, to_type)?;
- let dict = dict.with_values(array_with_timezone);
- Ok(Arc::new(dict))
- }
- _ => Ok(array),
- }
-}
-
-fn datetime_cast_err(value: i64) -> ArrowError {
- ArrowError::CastError(format!(
- "Cannot convert TimestampMicrosecondType {value} to datetime. Comet
only supports dates between Jan 1, 262145 BCE and Dec 31, 262143 CE",
- ))
-}
-
-/// Takes in a Timestamp(Microsecond, None) array and a timezone id, and
returns
-/// a Timestamp(Microsecond, Some<_>) array.
-/// The understanding is that the input array has time in the timezone
specified in the second
-/// argument.
-/// Parameters:
-/// array - input array of timestamp without timezone
-/// tz - timezone of the values in the input array
-/// to_timezone - timezone to change the input values to
-fn timestamp_ntz_to_timestamp(
- array: ArrayRef,
- tz: &str,
- to_timezone: Option<&str>,
-) -> Result<ArrayRef, ArrowError> {
- assert!(!tz.is_empty());
- match array.data_type() {
- DataType::Timestamp(_, None) => {
- let array = as_primitive_array::<TimestampMicrosecondType>(&array);
- let tz: Tz = tz.parse()?;
- let array: PrimitiveArray<TimestampMicrosecondType> =
array.try_unary(|value| {
- as_datetime::<TimestampMicrosecondType>(value)
- .ok_or_else(|| datetime_cast_err(value))
- .map(|local_datetime| {
- let datetime: DateTime<Tz> =
- tz.from_local_datetime(&local_datetime).unwrap();
- datetime.timestamp_micros()
- })
- })?;
- let array_with_tz = if let Some(to_tz) = to_timezone {
- array.with_timezone(to_tz)
- } else {
- array
- };
- Ok(Arc::new(array_with_tz))
- }
- _ => Ok(array),
- }
-}
-
-const MICROS_PER_SECOND: i64 = 1000000;
-
-/// This takes for special pre-casting cases of Spark. E.g., Timestamp to
String.
-fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef,
ArrowError> {
- assert!(!timezone.is_empty());
- match array.data_type() {
- DataType::Timestamp(_, _) => {
- // Spark doesn't output timezone while casting timestamp to
string, but arrow's cast
- // kernel does if timezone exists. So we need to apply offset of
timezone to array
- // timestamp value and remove timezone from array datatype.
- let array = as_primitive_array::<TimestampMicrosecondType>(&array);
-
- let tz: Tz = timezone.parse()?;
- let array: PrimitiveArray<TimestampMicrosecondType> =
array.try_unary(|value| {
- as_datetime::<TimestampMicrosecondType>(value)
- .ok_or_else(|| datetime_cast_err(value))
- .map(|datetime| {
- let offset =
tz.offset_from_utc_datetime(&datetime).fix();
- let datetime = datetime + offset;
- datetime.and_utc().timestamp_micros()
- })
- })?;
-
- Ok(Arc::new(array))
- }
- _ => Ok(array),
- }
-}
-
-/// This takes for special casting cases of Spark. E.g., Timestamp to Long.
-/// This function runs as a post process of the DataFusion cast(). By the time
it arrives here,
-/// Dictionary arrays are already unpacked by the DataFusion cast() since
Spark cannot specify
-/// Dictionary as to_type. The from_type is taken before the DataFusion cast()
runs in
-/// expressions/cast.rs, so it can be still Dictionary.
-pub(crate) fn spark_cast(array: ArrayRef, from_type: &DataType, to_type:
&DataType) -> ArrayRef {
- match (from_type, to_type) {
- (DataType::Timestamp(_, _), DataType::Int64) => {
- // See Spark's `Cast` expression
- unary_dyn::<_, Int64Type>(&array, |v| div_floor(v,
MICROS_PER_SECOND)).unwrap()
- }
- (DataType::Dictionary(_, value_type), DataType::Int64)
- if matches!(value_type.as_ref(), &DataType::Timestamp(_, _)) =>
- {
- // See Spark's `Cast` expression
- unary_dyn::<_, Int64Type>(&array, |v| div_floor(v,
MICROS_PER_SECOND)).unwrap()
- }
- (DataType::Timestamp(_, _), DataType::Utf8) =>
remove_trailing_zeroes(array),
- (DataType::Dictionary(_, value_type), DataType::Utf8)
- if matches!(value_type.as_ref(), &DataType::Timestamp(_, _)) =>
- {
- remove_trailing_zeroes(array)
- }
- _ => array,
- }
-}
-
-/// A fork & modified version of Arrow's `unary_dyn` which is being deprecated
-fn unary_dyn<F, T>(array: &ArrayRef, op: F) -> Result<ArrayRef, ArrowError>
-where
- T: ArrowPrimitiveType,
- F: Fn(T::Native) -> T::Native,
-{
- if let Some(d) = array.as_any_dictionary_opt() {
- let new_values = unary_dyn::<F, T>(d.values(), op)?;
- return Ok(Arc::new(d.with_values(Arc::new(new_values))));
- }
-
- match array.as_primitive_opt::<T>() {
- Some(a) if PrimitiveArray::<T>::is_compatible(a.data_type()) => {
- Ok(Arc::new(unary::<T, F, T>(
- array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap(),
- op,
- )))
- }
- _ => Err(ArrowError::NotYetImplemented(format!(
- "Cannot perform unary operation of type {} on array of type {}",
- T::DATA_TYPE,
- array.data_type()
- ))),
- }
-}
-
-/// Remove any trailing zeroes in the string if they occur after in the
fractional seconds,
-/// to match Spark behavior
-/// example:
-/// "1970-01-01 05:29:59.900" => "1970-01-01 05:29:59.9"
-/// "1970-01-01 05:29:59.990" => "1970-01-01 05:29:59.99"
-/// "1970-01-01 05:29:59.999" => "1970-01-01 05:29:59.999"
-/// "1970-01-01 05:30:00" => "1970-01-01 05:30:00"
-/// "1970-01-01 05:30:00.001" => "1970-01-01 05:30:00.001"
-fn remove_trailing_zeroes(array: ArrayRef) -> ArrayRef {
- let string_array = as_generic_string_array::<i32>(&array).unwrap();
- let result = string_array
- .iter()
- .map(|s| s.map(trim_end))
- .collect::<GenericStringArray<i32>>();
- Arc::new(result) as ArrayRef
-}
-
-fn trim_end(s: &str) -> &str {
- if s.rfind('.').is_some() {
- s.trim_end_matches('0')
- } else {
- s
- }
-}
+// re-export for legacy reasons
+pub use datafusion_comet_utils::{array_with_timezone, down_cast_any_ref};
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index b3be83b5..a13a1bc8 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -26,7 +26,7 @@ pub mod operators;
pub mod serde;
pub mod shuffle;
pub(crate) mod sort;
-mod timezone;
+pub use datafusion_comet_utils::timezone;
pub(crate) mod utils;
mod memory_pool;
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 4a9b9408..220417fe 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -28,12 +28,17 @@ edition = { workspace = true }
[dependencies]
arrow = { workspace = true }
+arrow-array = { workspace = true }
arrow-schema = { workspace = true }
+chrono = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-functions = { workspace = true }
+datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-comet-utils = { workspace = true }
+num = { workspace = true }
+regex = { workspace = true }
thiserror = { workspace = true }
[lib]
diff --git a/native/core/src/execution/datafusion/expressions/cast.rs b/native/spark-expr/src/cast.rs
similarity index 94%
rename from native/core/src/execution/datafusion/expressions/cast.rs
rename to native/spark-expr/src/cast.rs
index 0b513e77..b9cf2790 100644
--- a/native/core/src/execution/datafusion/expressions/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -24,36 +24,45 @@ use std::{
};
use arrow::{
- compute::{cast_with_options, CastOptions},
+ array::{
+ cast::AsArray,
+ types::{Date32Type, Int16Type, Int32Type, Int8Type},
+ Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array,
Float64Array,
+ GenericStringArray, Int16Array, Int32Array, Int64Array, Int8Array,
OffsetSizeTrait,
+ PrimitiveArray,
+ },
+ compute::{cast_with_options, unary, CastOptions},
datatypes::{
- ArrowPrimitiveType, Decimal128Type, DecimalType, Float32Type,
Float64Type,
+ ArrowPrimitiveType, Decimal128Type, DecimalType, Float32Type,
Float64Type, Int64Type,
TimestampMicrosecondType,
},
+ error::ArrowError,
record_batch::RecordBatch,
util::display::FormatOptions,
};
-use arrow_array::{
- types::{Date32Type, Int16Type, Int32Type, Int64Type, Int8Type},
- Array, ArrayRef, BooleanArray, Decimal128Array, Float32Array,
Float64Array, GenericStringArray,
- Int16Array, Int32Array, Int64Array, Int8Array, OffsetSizeTrait,
PrimitiveArray,
-};
use arrow_schema::{DataType, Schema};
-use chrono::{NaiveDate, NaiveDateTime, TimeZone, Timelike};
-use datafusion::logical_expr::ColumnarValue;
-use datafusion_comet_spark_expr::{SparkError, SparkResult};
-use datafusion_common::{internal_err, Result as DataFusionResult, ScalarValue};
+
+use datafusion_common::{
+ cast::as_generic_string_array, internal_err, Result as DataFusionResult,
ScalarValue,
+};
+use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::PhysicalExpr;
-use num::{cast::AsPrimitive, traits::CheckedNeg, CheckedSub, Integer, Num,
ToPrimitive};
-use regex::Regex;
-use crate::execution::datafusion::expressions::utils::{
- array_with_timezone, down_cast_any_ref, spark_cast,
+use chrono::{NaiveDate, NaiveDateTime, TimeZone, Timelike};
+use num::{
+ cast::AsPrimitive, integer::div_floor, traits::CheckedNeg, CheckedSub,
Integer, Num,
+ ToPrimitive,
};
+use regex::Regex;
+
+use datafusion_comet_utils::{array_with_timezone, down_cast_any_ref};
-use super::EvalMode;
+use crate::{EvalMode, SparkError, SparkResult};
static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
+const MICROS_PER_SECOND: i64 = 1000000;
+
static CAST_OPTIONS: CastOptions = CastOptions {
safe: true,
format_options: FormatOptions::new()
@@ -1633,6 +1642,84 @@ fn date_parser(date_str: &str, eval_mode: EvalMode) -> SparkResult<Option<i32>>
}
}
+/// This takes for special casting cases of Spark. E.g., Timestamp to Long.
+/// This function runs as a post process of the DataFusion cast(). By the time it arrives here,
+/// Dictionary arrays are already unpacked by the DataFusion cast() since Spark cannot specify
+/// Dictionary as to_type. The from_type is taken before the DataFusion cast() runs in
+/// expressions/cast.rs, so it can be still Dictionary.
+fn spark_cast(array: ArrayRef, from_type: &DataType, to_type: &DataType) -> ArrayRef {
+ match (from_type, to_type) {
+ (DataType::Timestamp(_, _), DataType::Int64) => {
+ // See Spark's `Cast` expression
+ unary_dyn::<_, Int64Type>(&array, |v| div_floor(v,
MICROS_PER_SECOND)).unwrap()
+ }
+ (DataType::Dictionary(_, value_type), DataType::Int64)
+ if matches!(value_type.as_ref(), &DataType::Timestamp(_, _)) =>
+ {
+ // See Spark's `Cast` expression
+ unary_dyn::<_, Int64Type>(&array, |v| div_floor(v,
MICROS_PER_SECOND)).unwrap()
+ }
+ (DataType::Timestamp(_, _), DataType::Utf8) =>
remove_trailing_zeroes(array),
+ (DataType::Dictionary(_, value_type), DataType::Utf8)
+ if matches!(value_type.as_ref(), &DataType::Timestamp(_, _)) =>
+ {
+ remove_trailing_zeroes(array)
+ }
+ _ => array,
+ }
+}
+
+/// A fork & modified version of Arrow's `unary_dyn` which is being deprecated
+fn unary_dyn<F, T>(array: &ArrayRef, op: F) -> Result<ArrayRef, ArrowError>
+where
+ T: ArrowPrimitiveType,
+ F: Fn(T::Native) -> T::Native,
+{
+ if let Some(d) = array.as_any_dictionary_opt() {
+ let new_values = unary_dyn::<F, T>(d.values(), op)?;
+ return Ok(Arc::new(d.with_values(Arc::new(new_values))));
+ }
+
+ match array.as_primitive_opt::<T>() {
+ Some(a) if PrimitiveArray::<T>::is_compatible(a.data_type()) => {
+ Ok(Arc::new(unary::<T, F, T>(
+ array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap(),
+ op,
+ )))
+ }
+ _ => Err(ArrowError::NotYetImplemented(format!(
+ "Cannot perform unary operation of type {} on array of type {}",
+ T::DATA_TYPE,
+ array.data_type()
+ ))),
+ }
+}
+
+/// Remove any trailing zeroes in the string if they occur after in the fractional seconds,
+/// to match Spark behavior
+/// example:
+/// "1970-01-01 05:29:59.900" => "1970-01-01 05:29:59.9"
+/// "1970-01-01 05:29:59.990" => "1970-01-01 05:29:59.99"
+/// "1970-01-01 05:29:59.999" => "1970-01-01 05:29:59.999"
+/// "1970-01-01 05:30:00" => "1970-01-01 05:30:00"
+/// "1970-01-01 05:30:00.001" => "1970-01-01 05:30:00.001"
+fn remove_trailing_zeroes(array: ArrayRef) -> ArrayRef {
+ let string_array = as_generic_string_array::<i32>(&array).unwrap();
+ let result = string_array
+ .iter()
+ .map(|s| s.map(trim_end))
+ .collect::<GenericStringArray<i32>>();
+ Arc::new(result) as ArrayRef
+}
+
+fn trim_end(s: &str) -> &str {
+ if s.rfind('.').is_some() {
+ s.trim_end_matches('0')
+ } else {
+ s
+ }
+}
+
#[cfg(test)]
mod tests {
use arrow::datatypes::TimestampMicrosecondType;
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 57da56f9..93c7f249 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -16,6 +16,7 @@
// under the License.
mod abs;
+pub mod cast;
mod error;
mod if_expr;
diff --git a/native/utils/Cargo.toml b/native/utils/Cargo.toml
index 05ddd348..f9ae4743 100644
--- a/native/utils/Cargo.toml
+++ b/native/utils/Cargo.toml
@@ -27,6 +27,11 @@ license = { workspace = true }
edition = { workspace = true }
[dependencies]
+arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+chrono = { workspace = true }
+chrono-tz = { workspace = true }
datafusion-physical-plan = { workspace = true }
[lib]
diff --git a/native/utils/src/lib.rs b/native/utils/src/lib.rs
index 54ff55b4..4600abfa 100644
--- a/native/utils/src/lib.rs
+++ b/native/utils/src/lib.rs
@@ -15,9 +15,23 @@
// specific language governing permissions and limitations
// under the License.
+use arrow_array::{
+ cast::as_primitive_array,
+ types::{Int32Type, TimestampMicrosecondType},
+};
+use arrow_schema::{ArrowError, DataType};
use std::any::Any;
use std::sync::Arc;
+use arrow::{
+ array::{as_dictionary_array, Array, ArrayRef, PrimitiveArray},
+ temporal_conversions::as_datetime,
+};
+use chrono::{DateTime, Offset, TimeZone};
+use timezone::Tz;
+
+pub mod timezone;
+
use datafusion_physical_plan::PhysicalExpr;
/// A utility function from DataFusion. It is not exposed by DataFusion.
@@ -34,3 +48,151 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
any
}
}
+
+/// Preprocesses input arrays to add timezone information from Spark to Arrow array datatype or
+/// to apply timezone offset.
+//
+// We consider the following cases:
+//
+// | --------------------- | ------------ | ----------------- |
-------------------------------- |
+// | Conversion | Input array | Timezone | Output array
|
+// | --------------------- | ------------ | ----------------- |
-------------------------------- |
+// | Timestamp -> | Array in UTC | Timezone of input | A timestamp
with the timezone |
+// | Utf8 or Date32 | | | offset
applied and timezone |
+// | | | | removed
|
+// | --------------------- | ------------ | ----------------- |
-------------------------------- |
+// | Timestamp -> | Array in UTC | Timezone of input | Same as input
array |
+// | Timestamp w/Timezone| | |
|
+// | --------------------- | ------------ | ----------------- |
-------------------------------- |
+// | Timestamp_ntz -> | Array in | Timezone of input | Same as input
array |
+// | Utf8 or Date32 | timezone | |
|
+// | | session local| |
|
+// | | timezone | |
|
+// | --------------------- | ------------ | ----------------- |
-------------------------------- |
+// | Timestamp_ntz -> | Array in | Timezone of input | Array in UTC
and timezone |
+// | Timestamp w/Timezone | session local| | specified in
input |
+// | | timezone | |
|
+// | --------------------- | ------------ | ----------------- |
-------------------------------- |
+// | Timestamp(_ntz) -> |
|
+// | Any other type | Not Supported
|
+// | --------------------- | ------------ | ----------------- |
-------------------------------- |
+//
+pub fn array_with_timezone(
+ array: ArrayRef,
+ timezone: String,
+ to_type: Option<&DataType>,
+) -> Result<ArrayRef, ArrowError> {
+ match array.data_type() {
+ DataType::Timestamp(_, None) => {
+ assert!(!timezone.is_empty());
+ match to_type {
+ Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
+ Some(DataType::Timestamp(_, Some(_))) => {
+ timestamp_ntz_to_timestamp(array, timezone.as_str(),
Some(timezone.as_str()))
+ }
+ _ => {
+ // Not supported
+ panic!(
+ "Cannot convert from {:?} to {:?}",
+ array.data_type(),
+ to_type.unwrap()
+ )
+ }
+ }
+ }
+ DataType::Timestamp(_, Some(_)) => {
+ assert!(!timezone.is_empty());
+ let array = as_primitive_array::<TimestampMicrosecondType>(&array);
+ let array_with_timezone =
array.clone().with_timezone(timezone.clone());
+ let array = Arc::new(array_with_timezone) as ArrayRef;
+ match to_type {
+ Some(DataType::Utf8) | Some(DataType::Date32) => {
+ pre_timestamp_cast(array, timezone)
+ }
+ _ => Ok(array),
+ }
+ }
+ DataType::Dictionary(_, value_type)
+ if matches!(value_type.as_ref(), &DataType::Timestamp(_, _)) =>
+ {
+ let dict = as_dictionary_array::<Int32Type>(&array);
+ let array =
as_primitive_array::<TimestampMicrosecondType>(dict.values());
+ let array_with_timezone =
+ array_with_timezone(Arc::new(array.clone()) as ArrayRef,
timezone, to_type)?;
+ let dict = dict.with_values(array_with_timezone);
+ Ok(Arc::new(dict))
+ }
+ _ => Ok(array),
+ }
+}
+
+fn datetime_cast_err(value: i64) -> ArrowError {
+ ArrowError::CastError(format!(
+ "Cannot convert TimestampMicrosecondType {value} to datetime. Comet only supports dates between Jan 1, 262145 BCE and Dec 31, 262143 CE",
+ ))
+}
+
+/// Takes in a Timestamp(Microsecond, None) array and a timezone id, and returns
+/// a Timestamp(Microsecond, Some<_>) array.
+/// The understanding is that the input array has time in the timezone specified in the second
+/// argument.
+/// Parameters:
+/// array - input array of timestamp without timezone
+/// tz - timezone of the values in the input array
+/// to_timezone - timezone to change the input values to
+fn timestamp_ntz_to_timestamp(
+ array: ArrayRef,
+ tz: &str,
+ to_timezone: Option<&str>,
+) -> Result<ArrayRef, ArrowError> {
+ assert!(!tz.is_empty());
+ match array.data_type() {
+ DataType::Timestamp(_, None) => {
+ let array = as_primitive_array::<TimestampMicrosecondType>(&array);
+ let tz: Tz = tz.parse()?;
+ let array: PrimitiveArray<TimestampMicrosecondType> =
array.try_unary(|value| {
+ as_datetime::<TimestampMicrosecondType>(value)
+ .ok_or_else(|| datetime_cast_err(value))
+ .map(|local_datetime| {
+ let datetime: DateTime<Tz> =
+ tz.from_local_datetime(&local_datetime).unwrap();
+ datetime.timestamp_micros()
+ })
+ })?;
+ let array_with_tz = if let Some(to_tz) = to_timezone {
+ array.with_timezone(to_tz)
+ } else {
+ array
+ };
+ Ok(Arc::new(array_with_tz))
+ }
+ _ => Ok(array),
+ }
+}
+
+/// This takes for special pre-casting cases of Spark. E.g., Timestamp to String.
+fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef, ArrowError> {
+ assert!(!timezone.is_empty());
+ match array.data_type() {
+ DataType::Timestamp(_, _) => {
+ // Spark doesn't output timezone while casting timestamp to string, but arrow's cast
+ // kernel does if timezone exists. So we need to apply offset of timezone to array
+ // timestamp value and remove timezone from array datatype.
+ let array = as_primitive_array::<TimestampMicrosecondType>(&array);
+
+ let tz: Tz = timezone.parse()?;
+ let array: PrimitiveArray<TimestampMicrosecondType> =
array.try_unary(|value| {
+ as_datetime::<TimestampMicrosecondType>(value)
+ .ok_or_else(|| datetime_cast_err(value))
+ .map(|datetime| {
+ let offset =
tz.offset_from_utc_datetime(&datetime).fix();
+ let datetime = datetime + offset;
+ datetime.and_utc().timestamp_micros()
+ })
+ })?;
+
+ Ok(Arc::new(array))
+ }
+ _ => Ok(array),
+ }
+}
diff --git a/native/core/src/execution/timezone.rs b/native/utils/src/timezone.rs
similarity index 100%
rename from native/core/src/execution/timezone.rs
rename to native/utils/src/timezone.rs
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]