This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bb014f8 fix:  Return error in pre_timestamp_cast instead of panic  (#543)
6bb014f8 is described below

commit 6bb014f89b8c45a044ed9fcf2fa244a8e5feb15d
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Tue Jun 18 19:04:20 2024 +0200

    fix:  Return error in pre_timestamp_cast instead of panic  (#543)
    
    * fix: Forward errors from pre_timestamp_cast
    
    * Update core/src/execution/datafusion/expressions/utils.rs
    
    Improve comment.
    
    Co-authored-by: Parth Chandra <[email protected]>
    
    * Add spec for invalid timezone
    
    ---------
    
    Co-authored-by: Parth Chandra <[email protected]>
---
 core/src/execution/datafusion/expressions/cast.rs  | 31 ++++++++-
 .../execution/datafusion/expressions/temporal.rs   | 10 +--
 core/src/execution/datafusion/expressions/utils.rs | 77 ++++++++++++----------
 3 files changed, 78 insertions(+), 40 deletions(-)

diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs
index 4dae62dc..7a71d201 100644
--- a/core/src/execution/datafusion/expressions/cast.rs
+++ b/core/src/execution/datafusion/expressions/cast.rs
@@ -498,7 +498,7 @@ impl Cast {
 
     fn cast_array(&self, array: ArrayRef) -> DataFusionResult<ArrayRef> {
         let to_type = &self.data_type;
-        let array = array_with_timezone(array, self.timezone.clone(), Some(to_type));
+        let array = array_with_timezone(array, self.timezone.clone(), Some(to_type))?;
         let from_type = array.data_type().clone();
 
         // unpack dictionary string arrays first
@@ -1641,6 +1641,8 @@ mod tests {
     use arrow_array::StringArray;
     use arrow_schema::TimeUnit;
 
+    use datafusion_physical_expr::expressions::Column;
+
     use super::*;
 
     #[test]
@@ -1897,4 +1899,31 @@ mod tests {
         assert!(cast_string_to_i8("0.2", EvalMode::Ansi).is_err());
         assert!(cast_string_to_i8(".", EvalMode::Ansi).is_err());
     }
+
+    #[test]
+    fn test_cast_unsupported_timestamp_to_date() {
+        // Since datafusion uses chrono::Datetime internally not all dates representable by TimestampMicrosecondType are supported
+        let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
+        let cast = Cast::new(
+            Arc::new(Column::new("a", 0)),
+            DataType::Date32,
+            EvalMode::Legacy,
+            "UTC".to_owned(),
+        );
+        let result = cast.cast_array(Arc::new(timestamps.with_timezone("Europe/Copenhagen")));
+        assert!(result.is_err())
+    }
+
+    #[test]
+    fn test_cast_invalid_timezone() {
+        let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
+        let cast = Cast::new(
+            Arc::new(Column::new("a", 0)),
+            DataType::Date32,
+            EvalMode::Legacy,
+            "Not a valid timezone".to_owned(),
+        );
+        let result = cast.cast_array(Arc::new(timestamps.with_timezone("Europe/Copenhagen")));
+        assert!(result.is_err())
+    }
 }
diff --git a/core/src/execution/datafusion/expressions/temporal.rs b/core/src/execution/datafusion/expressions/temporal.rs
index 22b4aee8..69fbb791 100644
--- a/core/src/execution/datafusion/expressions/temporal.rs
+++ b/core/src/execution/datafusion/expressions/temporal.rs
@@ -100,7 +100,7 @@ impl PhysicalExpr for HourExec {
                         Microsecond,
                         Some(self.timezone.clone().into()),
                     )),
-                );
+                )?;
                 let result = date_part(&array, DatePart::Hour)?;
 
                 Ok(ColumnarValue::Array(result))
@@ -194,7 +194,7 @@ impl PhysicalExpr for MinuteExec {
                         Microsecond,
                         Some(self.timezone.clone().into()),
                     )),
-                );
+                )?;
                 let result = date_part(&array, DatePart::Minute)?;
 
                 Ok(ColumnarValue::Array(result))
@@ -288,7 +288,7 @@ impl PhysicalExpr for SecondExec {
                         Microsecond,
                         Some(self.timezone.clone().into()),
                     )),
-                );
+                )?;
                 let result = date_part(&array, DatePart::Second)?;
 
                 Ok(ColumnarValue::Array(result))
@@ -489,7 +489,7 @@ impl PhysicalExpr for TimestampTruncExec {
                     ts,
                     tz.clone(),
                     Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
-                );
+                )?;
                 let result = timestamp_trunc_dyn(&ts, format)?;
                 Ok(ColumnarValue::Array(result))
             }
@@ -498,7 +498,7 @@ impl PhysicalExpr for TimestampTruncExec {
                     ts,
                     tz.clone(),
                     Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
-                );
+                )?;
                 let result = timestamp_trunc_array_fmt_dyn(&ts, &formats)?;
                 Ok(ColumnarValue::Array(result))
             }
diff --git a/core/src/execution/datafusion/expressions/utils.rs b/core/src/execution/datafusion/expressions/utils.rs
index fe27f796..ee8646a7 100644
--- a/core/src/execution/datafusion/expressions/utils.rs
+++ b/core/src/execution/datafusion/expressions/utils.rs
@@ -81,12 +81,12 @@ pub fn array_with_timezone(
     array: ArrayRef,
     timezone: String,
     to_type: Option<&DataType>,
-) -> ArrayRef {
+) -> Result<ArrayRef, ArrowError> {
     match array.data_type() {
         DataType::Timestamp(_, None) => {
             assert!(!timezone.is_empty());
             match to_type {
-                Some(DataType::Utf8) | Some(DataType::Date32) => array,
+                Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
                 Some(DataType::Timestamp(_, Some(_))) => {
                     timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
                 }
@@ -109,7 +109,7 @@ pub fn array_with_timezone(
                 Some(DataType::Utf8) | Some(DataType::Date32) => {
                     pre_timestamp_cast(array, timezone)
                 }
-                _ => array,
+                _ => Ok(array),
             }
         }
         DataType::Dictionary(_, value_type)
@@ -118,14 +118,20 @@ pub fn array_with_timezone(
             let dict = as_dictionary_array::<Int32Type>(&array);
             let array = as_primitive_array::<TimestampMicrosecondType>(dict.values());
             let array_with_timezone =
-                array_with_timezone(Arc::new(array.clone()) as ArrayRef, timezone, to_type);
+                array_with_timezone(Arc::new(array.clone()) as ArrayRef, timezone, to_type)?;
             let dict = dict.with_values(array_with_timezone);
-            Arc::new(dict) as ArrayRef
+            Ok(Arc::new(dict))
         }
-        _ => array,
+        _ => Ok(array),
     }
 }
 
+fn datetime_cast_err(value: i64) -> ArrowError {
+    ArrowError::CastError(format!(
+        "Cannot convert TimestampMicrosecondType {value} to datetime. Comet only supports dates between Jan 1, 262145 BCE and Dec 31, 262143 CE",
+    ))
+}
+
 /// Takes in a Timestamp(Microsecond, None) array and a timezone id, and returns
 /// a Timestamp(Microsecond, Some<_>) array.
 /// The understanding is that the input array has time in the timezone specified in the second
@@ -134,36 +140,40 @@ pub fn array_with_timezone(
 ///     array - input array of timestamp without timezone
 ///     tz - timezone of the values in the input array
 ///     to_timezone - timezone to change the input values to
-fn timestamp_ntz_to_timestamp(array: ArrayRef, tz: &str, to_timezone: Option<&str>) -> ArrayRef {
+fn timestamp_ntz_to_timestamp(
+    array: ArrayRef,
+    tz: &str,
+    to_timezone: Option<&str>,
+) -> Result<ArrayRef, ArrowError> {
     assert!(!tz.is_empty());
     match array.data_type() {
         DataType::Timestamp(_, None) => {
             let array = as_primitive_array::<TimestampMicrosecondType>(&array);
-            let tz: Tz = tz.parse().unwrap();
-            let values = array.iter().map(|v| {
-                v.map(|value| {
-                    let local_datetime = as_datetime::<TimestampMicrosecondType>(value).unwrap();
-                    let datetime: DateTime<Tz> = tz.from_local_datetime(&local_datetime).unwrap();
-                    datetime.timestamp_micros()
-                })
-            });
-            let mut array: PrimitiveArray<TimestampMicrosecondType> =
-                unsafe { PrimitiveArray::from_trusted_len_iter(values) };
-            array = if let Some(to_tz) = to_timezone {
+            let tz: Tz = tz.parse()?;
+            let array: PrimitiveArray<TimestampMicrosecondType> = array.try_unary(|value| {
+                as_datetime::<TimestampMicrosecondType>(value)
+                    .ok_or_else(|| datetime_cast_err(value))
+                    .map(|local_datetime| {
+                        let datetime: DateTime<Tz> =
+                            tz.from_local_datetime(&local_datetime).unwrap();
+                        datetime.timestamp_micros()
+                    })
+            })?;
+            let array_with_tz = if let Some(to_tz) = to_timezone {
                 array.with_timezone(to_tz)
             } else {
                 array
             };
-            Arc::new(array) as ArrayRef
+            Ok(Arc::new(array_with_tz))
         }
-        _ => array,
+        _ => Ok(array),
     }
 }
 
 const MICROS_PER_SECOND: i64 = 1000000;
 
 /// This takes for special pre-casting cases of Spark. E.g., Timestamp to String.
-fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> ArrayRef {
+fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef, ArrowError> {
     assert!(!timezone.is_empty());
     match array.data_type() {
         DataType::Timestamp(_, _) => {
@@ -172,21 +182,20 @@ fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> ArrayRef {
             // timestamp value and remove timezone from array datatype.
             let array = as_primitive_array::<TimestampMicrosecondType>(&array);
 
-            let tz: Tz = timezone.parse().unwrap();
-            let values = array.iter().map(|v| {
-                v.map(|value| {
-                    let datetime = as_datetime::<TimestampMicrosecondType>(value).unwrap();
-                    let offset = tz.offset_from_utc_datetime(&datetime).fix();
-                    let datetime = datetime + offset;
-                    datetime.and_utc().timestamp_micros()
-                })
-            });
+            let tz: Tz = timezone.parse()?;
+            let array: PrimitiveArray<TimestampMicrosecondType> = array.try_unary(|value| {
+                as_datetime::<TimestampMicrosecondType>(value)
+                    .ok_or_else(|| datetime_cast_err(value))
+                    .map(|datetime| {
+                        let offset = tz.offset_from_utc_datetime(&datetime).fix();
+                        let datetime = datetime + offset;
+                        datetime.and_utc().timestamp_micros()
+                    })
+            })?;
 
-            let array: PrimitiveArray<TimestampMicrosecondType> =
-                unsafe { PrimitiveArray::from_trusted_len_iter(values) };
-            Arc::new(array) as ArrayRef
+            Ok(Arc::new(array))
         }
-        _ => array,
+        _ => Ok(array),
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to