This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 6bb014f8 fix: Return error in pre_timestamp_cast instead of panic (#543)
6bb014f8 is described below
commit 6bb014f89b8c45a044ed9fcf2fa244a8e5feb15d
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Tue Jun 18 19:04:20 2024 +0200
fix: Return error in pre_timestamp_cast instead of panic (#543)
* fix: Forward errors from pre_timestamp_cast
* Update core/src/execution/datafusion/expressions/utils.rs
Improve comment.
Co-authored-by: Parth Chandra <[email protected]>
* Add spec for invalid timezone
---------
Co-authored-by: Parth Chandra <[email protected]>
---
core/src/execution/datafusion/expressions/cast.rs | 31 ++++++++-
.../execution/datafusion/expressions/temporal.rs | 10 +--
core/src/execution/datafusion/expressions/utils.rs | 77 ++++++++++++----------
3 files changed, 78 insertions(+), 40 deletions(-)
diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs
index 4dae62dc..7a71d201 100644
--- a/core/src/execution/datafusion/expressions/cast.rs
+++ b/core/src/execution/datafusion/expressions/cast.rs
@@ -498,7 +498,7 @@ impl Cast {
fn cast_array(&self, array: ArrayRef) -> DataFusionResult<ArrayRef> {
let to_type = &self.data_type;
- let array = array_with_timezone(array, self.timezone.clone(),
Some(to_type));
+ let array = array_with_timezone(array, self.timezone.clone(),
Some(to_type))?;
let from_type = array.data_type().clone();
// unpack dictionary string arrays first
@@ -1641,6 +1641,8 @@ mod tests {
use arrow_array::StringArray;
use arrow_schema::TimeUnit;
+ use datafusion_physical_expr::expressions::Column;
+
use super::*;
#[test]
@@ -1897,4 +1899,31 @@ mod tests {
assert!(cast_string_to_i8("0.2", EvalMode::Ansi).is_err());
assert!(cast_string_to_i8(".", EvalMode::Ansi).is_err());
}
+
+ #[test]
+ fn test_cast_unsupported_timestamp_to_date() {
+ // Since datafusion uses chrono::Datetime internally not all dates
representable by TimestampMicrosecondType are supported
+ let timestamps: PrimitiveArray<TimestampMicrosecondType> =
vec![i64::MAX].into();
+ let cast = Cast::new(
+ Arc::new(Column::new("a", 0)),
+ DataType::Date32,
+ EvalMode::Legacy,
+ "UTC".to_owned(),
+ );
+ let result =
cast.cast_array(Arc::new(timestamps.with_timezone("Europe/Copenhagen")));
+ assert!(result.is_err())
+ }
+
+ #[test]
+ fn test_cast_invalid_timezone() {
+ let timestamps: PrimitiveArray<TimestampMicrosecondType> =
vec![i64::MAX].into();
+ let cast = Cast::new(
+ Arc::new(Column::new("a", 0)),
+ DataType::Date32,
+ EvalMode::Legacy,
+ "Not a valid timezone".to_owned(),
+ );
+ let result =
cast.cast_array(Arc::new(timestamps.with_timezone("Europe/Copenhagen")));
+ assert!(result.is_err())
+ }
}
diff --git a/core/src/execution/datafusion/expressions/temporal.rs b/core/src/execution/datafusion/expressions/temporal.rs
index 22b4aee8..69fbb791 100644
--- a/core/src/execution/datafusion/expressions/temporal.rs
+++ b/core/src/execution/datafusion/expressions/temporal.rs
@@ -100,7 +100,7 @@ impl PhysicalExpr for HourExec {
Microsecond,
Some(self.timezone.clone().into()),
)),
- );
+ )?;
let result = date_part(&array, DatePart::Hour)?;
Ok(ColumnarValue::Array(result))
@@ -194,7 +194,7 @@ impl PhysicalExpr for MinuteExec {
Microsecond,
Some(self.timezone.clone().into()),
)),
- );
+ )?;
let result = date_part(&array, DatePart::Minute)?;
Ok(ColumnarValue::Array(result))
@@ -288,7 +288,7 @@ impl PhysicalExpr for SecondExec {
Microsecond,
Some(self.timezone.clone().into()),
)),
- );
+ )?;
let result = date_part(&array, DatePart::Second)?;
Ok(ColumnarValue::Array(result))
@@ -489,7 +489,7 @@ impl PhysicalExpr for TimestampTruncExec {
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
- );
+ )?;
let result = timestamp_trunc_dyn(&ts, format)?;
Ok(ColumnarValue::Array(result))
}
@@ -498,7 +498,7 @@ impl PhysicalExpr for TimestampTruncExec {
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
- );
+ )?;
let result = timestamp_trunc_array_fmt_dyn(&ts, &formats)?;
Ok(ColumnarValue::Array(result))
}
diff --git a/core/src/execution/datafusion/expressions/utils.rs b/core/src/execution/datafusion/expressions/utils.rs
index fe27f796..ee8646a7 100644
--- a/core/src/execution/datafusion/expressions/utils.rs
+++ b/core/src/execution/datafusion/expressions/utils.rs
@@ -81,12 +81,12 @@ pub fn array_with_timezone(
array: ArrayRef,
timezone: String,
to_type: Option<&DataType>,
-) -> ArrayRef {
+) -> Result<ArrayRef, ArrowError> {
match array.data_type() {
DataType::Timestamp(_, None) => {
assert!(!timezone.is_empty());
match to_type {
- Some(DataType::Utf8) | Some(DataType::Date32) => array,
+ Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
Some(DataType::Timestamp(_, Some(_))) => {
timestamp_ntz_to_timestamp(array, timezone.as_str(),
Some(timezone.as_str()))
}
@@ -109,7 +109,7 @@ pub fn array_with_timezone(
Some(DataType::Utf8) | Some(DataType::Date32) => {
pre_timestamp_cast(array, timezone)
}
- _ => array,
+ _ => Ok(array),
}
}
DataType::Dictionary(_, value_type)
@@ -118,14 +118,20 @@ pub fn array_with_timezone(
let dict = as_dictionary_array::<Int32Type>(&array);
let array =
as_primitive_array::<TimestampMicrosecondType>(dict.values());
let array_with_timezone =
- array_with_timezone(Arc::new(array.clone()) as ArrayRef,
timezone, to_type);
+ array_with_timezone(Arc::new(array.clone()) as ArrayRef,
timezone, to_type)?;
let dict = dict.with_values(array_with_timezone);
- Arc::new(dict) as ArrayRef
+ Ok(Arc::new(dict))
}
- _ => array,
+ _ => Ok(array),
}
}
+fn datetime_cast_err(value: i64) -> ArrowError {
+ ArrowError::CastError(format!(
+ "Cannot convert TimestampMicrosecondType {value} to datetime. Comet
only supports dates between Jan 1, 262145 BCE and Dec 31, 262143 CE",
+ ))
+}
+
/// Takes in a Timestamp(Microsecond, None) array and a timezone id, and
returns
/// a Timestamp(Microsecond, Some<_>) array.
/// The understanding is that the input array has time in the timezone
specified in the second
@@ -134,36 +140,40 @@ pub fn array_with_timezone(
/// array - input array of timestamp without timezone
/// tz - timezone of the values in the input array
/// to_timezone - timezone to change the input values to
-fn timestamp_ntz_to_timestamp(array: ArrayRef, tz: &str, to_timezone:
Option<&str>) -> ArrayRef {
+fn timestamp_ntz_to_timestamp(
+ array: ArrayRef,
+ tz: &str,
+ to_timezone: Option<&str>,
+) -> Result<ArrayRef, ArrowError> {
assert!(!tz.is_empty());
match array.data_type() {
DataType::Timestamp(_, None) => {
let array = as_primitive_array::<TimestampMicrosecondType>(&array);
- let tz: Tz = tz.parse().unwrap();
- let values = array.iter().map(|v| {
- v.map(|value| {
- let local_datetime =
as_datetime::<TimestampMicrosecondType>(value).unwrap();
- let datetime: DateTime<Tz> =
tz.from_local_datetime(&local_datetime).unwrap();
- datetime.timestamp_micros()
- })
- });
- let mut array: PrimitiveArray<TimestampMicrosecondType> =
- unsafe { PrimitiveArray::from_trusted_len_iter(values) };
- array = if let Some(to_tz) = to_timezone {
+ let tz: Tz = tz.parse()?;
+ let array: PrimitiveArray<TimestampMicrosecondType> =
array.try_unary(|value| {
+ as_datetime::<TimestampMicrosecondType>(value)
+ .ok_or_else(|| datetime_cast_err(value))
+ .map(|local_datetime| {
+ let datetime: DateTime<Tz> =
+ tz.from_local_datetime(&local_datetime).unwrap();
+ datetime.timestamp_micros()
+ })
+ })?;
+ let array_with_tz = if let Some(to_tz) = to_timezone {
array.with_timezone(to_tz)
} else {
array
};
- Arc::new(array) as ArrayRef
+ Ok(Arc::new(array_with_tz))
}
- _ => array,
+ _ => Ok(array),
}
}
const MICROS_PER_SECOND: i64 = 1000000;
/// This takes for special pre-casting cases of Spark. E.g., Timestamp to
String.
-fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> ArrayRef {
+fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef,
ArrowError> {
assert!(!timezone.is_empty());
match array.data_type() {
DataType::Timestamp(_, _) => {
@@ -172,21 +182,20 @@ fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> ArrayRef {
// timestamp value and remove timezone from array datatype.
let array = as_primitive_array::<TimestampMicrosecondType>(&array);
- let tz: Tz = timezone.parse().unwrap();
- let values = array.iter().map(|v| {
- v.map(|value| {
- let datetime =
as_datetime::<TimestampMicrosecondType>(value).unwrap();
- let offset = tz.offset_from_utc_datetime(&datetime).fix();
- let datetime = datetime + offset;
- datetime.and_utc().timestamp_micros()
- })
- });
+ let tz: Tz = timezone.parse()?;
+ let array: PrimitiveArray<TimestampMicrosecondType> =
array.try_unary(|value| {
+ as_datetime::<TimestampMicrosecondType>(value)
+ .ok_or_else(|| datetime_cast_err(value))
+ .map(|datetime| {
+ let offset =
tz.offset_from_utc_datetime(&datetime).fix();
+ let datetime = datetime + offset;
+ datetime.and_utc().timestamp_micros()
+ })
+ })?;
- let array: PrimitiveArray<TimestampMicrosecondType> =
- unsafe { PrimitiveArray::from_trusted_len_iter(values) };
- Arc::new(array) as ArrayRef
+ Ok(Arc::new(array))
}
- _ => array,
+ _ => Ok(array),
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]