This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 21e33a3d1 Propagate timezone to created arrays (#5481)
21e33a3d1 is described below

commit 21e33a3d1047aa97be60b2efb4516efdd1d2b6bb
Author: Max Burke <[email protected]>
AuthorDate: Mon Mar 6 05:58:13 2023 -0800

    Propagate timezone to created arrays (#5481)
    
    * Propagate timezone to created arrays
    
    * Add test for timezone propagation
    
    This was manifesting as an error when using window functions on
    Timestamp types: Arrow failed to concatenate arrays of different
    data types (one array with a timezone, one without).
---
 datafusion/common/src/scalar.rs                    |  32 +++++++++++++-------
 .../tests/parquet/data/timestamp_with_tz.parquet   | Bin 0 -> 87689 bytes
 datafusion/core/tests/sql/parquet.rs               |  33 +++++++++++++++++++++
 3 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index c31b37b63..212328121 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1296,7 +1296,7 @@ impl ScalarValue {
         }
 
         macro_rules! build_array_primitive_tz {
-            ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{
+            ($ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{
                 {
                     let array = scalars.map(|sv| {
                         if let ScalarValue::$SCALAR_TY(v, _) = sv {
@@ -1310,7 +1310,7 @@ impl ScalarValue {
                         }
                     })
                     .collect::<Result<$ARRAY_TY>>()?;
-                    Arc::new(array)
+                    Arc::new(array.with_timezone_opt($TZ.clone()))
                 }
             }};
         }
@@ -1444,17 +1444,29 @@ impl ScalarValue {
             DataType::Time64(TimeUnit::Nanosecond) => {
                 build_array_primitive!(Time64NanosecondArray, Time64Nanosecond)
             }
-            DataType::Timestamp(TimeUnit::Second, _) => {
-                build_array_primitive_tz!(TimestampSecondArray, TimestampSecond)
+            DataType::Timestamp(TimeUnit::Second, tz) => {
+                build_array_primitive_tz!(TimestampSecondArray, TimestampSecond, tz)
             }
-            DataType::Timestamp(TimeUnit::Millisecond, _) => {
-                build_array_primitive_tz!(TimestampMillisecondArray, TimestampMillisecond)
+            DataType::Timestamp(TimeUnit::Millisecond, tz) => {
+                build_array_primitive_tz!(
+                    TimestampMillisecondArray,
+                    TimestampMillisecond,
+                    tz
+                )
             }
-            DataType::Timestamp(TimeUnit::Microsecond, _) => {
-                build_array_primitive_tz!(TimestampMicrosecondArray, TimestampMicrosecond)
+            DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+                build_array_primitive_tz!(
+                    TimestampMicrosecondArray,
+                    TimestampMicrosecond,
+                    tz
+                )
             }
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                build_array_primitive_tz!(TimestampNanosecondArray, TimestampNanosecond)
+            DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+                build_array_primitive_tz!(
+                    TimestampNanosecondArray,
+                    TimestampNanosecond,
+                    tz
+                )
             }
             DataType::Interval(IntervalUnit::DayTime) => {
                 build_array_primitive!(IntervalDayTimeArray, IntervalDayTime)
diff --git a/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet b/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet
new file mode 100644
index 000000000..075f846a6
Binary files /dev/null and b/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet differ
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 31cd0da21..b18528992 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -165,6 +165,39 @@ async fn fixed_size_binary_columns() {
     }
 }
 
+#[tokio::test]
+async fn window_fn_timestamp_tz() {
+    let ctx = SessionContext::new();
+    ctx.register_parquet(
+        "t0",
+        "tests/parquet/data/timestamp_with_tz.parquet",
+        ParquetReadOptions::default(),
+    )
+    .await
+    .unwrap();
+
+    let sql = "SELECT count, LAG(timestamp, 1) OVER (ORDER BY timestamp) FROM t0";
+    let dataframe = ctx.sql(sql).await.unwrap();
+    let results = dataframe.collect().await.unwrap();
+
+    let mut num_rows = 0;
+    for batch in results {
+        num_rows += batch.num_rows();
+        assert_eq!(2, batch.num_columns());
+
+        let ty = batch.column(0).data_type().clone();
+        assert_eq!(DataType::Int64, ty);
+
+        let ty = batch.column(1).data_type().clone();
+        assert_eq!(
+            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_owned())),
+            ty
+        );
+    }
+
+    assert_eq!(131072, num_rows);
+}
+
 #[tokio::test]
 async fn parquet_single_nan_schema() {
     let ctx = SessionContext::new();

Reply via email to