This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e5344ae36 Extract parquet statistics from timestamps with timezones (#10766)
6e5344ae36 is described below

commit 6e5344ae367001dbf70fa2882c2e89eca4a2bbd8
Author: Xin Li <[email protected]>
AuthorDate: Mon Jun 3 23:25:16 2024 +0800

    Extract parquet statistics from timestamps with timezones (#10766)
    
    * Fix incorrect statistics read for timestamp columns in parquet
---
 .../datasource/physical_plan/parquet/statistics.rs | 285 +++++++++++++--
 datafusion/core/tests/parquet/arrow_statistics.rs  | 383 +++++++++++++++++----
 datafusion/core/tests/parquet/mod.rs               |  46 ++-
 3 files changed, 615 insertions(+), 99 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index ae8395aef6..1c20fa7caa 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -19,7 +19,7 @@
 
 // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328
 
-use arrow::{array::ArrayRef, datatypes::DataType};
+use arrow::{array::ArrayRef, datatypes::DataType, datatypes::TimeUnit};
 use arrow_array::{new_empty_array, new_null_array, UInt64Array};
 use arrow_schema::{Field, FieldRef, Schema};
 use datafusion_common::{
@@ -112,6 +112,26 @@ macro_rules! get_statistic {
                     Some(DataType::UInt64) => {
                         Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
                     }
+                    Some(DataType::Timestamp(unit, timezone)) => {
+                        Some(match unit {
+                            TimeUnit::Second => ScalarValue::TimestampSecond(
+                                Some(*s.$func()),
+                                timezone.clone(),
+                            ),
+                            TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(
+                                Some(*s.$func()),
+                                timezone.clone(),
+                            ),
+                            TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(
+                                Some(*s.$func()),
+                                timezone.clone(),
+                            ),
+                            TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(
+                                Some(*s.$func()),
+                                timezone.clone(),
+                            ),
+                        })
+                    }
                     _ => Some(ScalarValue::Int64(Some(*s.$func()))),
                 }
             }
@@ -395,7 +415,8 @@ mod test {
     use arrow_array::{
         new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
         Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray,
+        Int8Array, RecordBatch, StringArray, StructArray, TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
     };
     use arrow_schema::{Field, SchemaRef};
     use bytes::Bytes;
@@ -536,28 +557,209 @@ mod test {
     }
 
     #[test]
-    #[should_panic(
-        expected = "Inconsistent types in ScalarValue::iter_to_array. Expected Int64, got TimestampNanosecond(NULL, None)"
-    )]
-    // Due to https://github.com/apache/datafusion/issues/8295
     fn roundtrip_timestamp() {
         Test {
-            input: timestamp_array([
-                // row group 1
-                Some(1),
-                None,
-                Some(3),
-                // row group 2
-                Some(9),
-                Some(5),
+            input: timestamp_seconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
                 None,
-                // row group 3
+            ),
+            expected_min: timestamp_seconds_array([Some(1), Some(5), None], None),
+            expected_max: timestamp_seconds_array([Some(3), Some(9), None], None),
+        }
+        .run();
+
+        Test {
+            input: timestamp_milliseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
                 None,
+            ),
+            expected_min: timestamp_milliseconds_array([Some(1), Some(5), None], None),
+            expected_max: timestamp_milliseconds_array([Some(3), Some(9), None], None),
+        }
+        .run();
+
+        Test {
+            input: timestamp_microseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
                 None,
+            ),
+            expected_min: timestamp_microseconds_array([Some(1), Some(5), None], None),
+            expected_max: timestamp_microseconds_array([Some(3), Some(9), None], None),
+        }
+        .run();
+
+        Test {
+            input: timestamp_nanoseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
                 None,
-            ]),
-            expected_min: timestamp_array([Some(1), Some(5), None]),
-            expected_max: timestamp_array([Some(3), Some(9), None]),
+            ),
+            expected_min: timestamp_nanoseconds_array([Some(1), Some(5), None], None),
+            expected_max: timestamp_nanoseconds_array([Some(3), Some(9), None], None),
+        }
+        .run()
+    }
+
+    #[test]
+    fn roundtrip_timestamp_timezoned() {
+        Test {
+            input: timestamp_seconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
+                Some("UTC"),
+            ),
+            expected_min: timestamp_seconds_array([Some(1), Some(5), None], Some("UTC")),
+            expected_max: timestamp_seconds_array([Some(3), Some(9), None], Some("UTC")),
+        }
+        .run();
+
+        Test {
+            input: timestamp_milliseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
+                Some("UTC"),
+            ),
+            expected_min: timestamp_milliseconds_array(
+                [Some(1), Some(5), None],
+                Some("UTC"),
+            ),
+            expected_max: timestamp_milliseconds_array(
+                [Some(3), Some(9), None],
+                Some("UTC"),
+            ),
+        }
+        .run();
+
+        Test {
+            input: timestamp_microseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
+                Some("UTC"),
+            ),
+            expected_min: timestamp_microseconds_array(
+                [Some(1), Some(5), None],
+                Some("UTC"),
+            ),
+            expected_max: timestamp_microseconds_array(
+                [Some(3), Some(9), None],
+                Some("UTC"),
+            ),
+        }
+        .run();
+
+        Test {
+            input: timestamp_nanoseconds_array(
+                [
+                    // row group 1
+                    Some(1),
+                    None,
+                    Some(3),
+                    // row group 2
+                    Some(9),
+                    Some(5),
+                    None,
+                    // row group 3
+                    None,
+                    None,
+                    None,
+                ],
+                Some("UTC"),
+            ),
+            expected_min: timestamp_nanoseconds_array(
+                [Some(1), Some(5), None],
+                Some("UTC"),
+            ),
+            expected_max: timestamp_nanoseconds_array(
+                [Some(3), Some(9), None],
+                Some("UTC"),
+            ),
         }
         .run()
     }
@@ -914,8 +1116,8 @@ mod test {
             // File has no min/max for timestamp_col
             .with_column(ExpectedColumn {
                 name: "timestamp_col",
-                expected_min: timestamp_array([None]),
-                expected_max: timestamp_array([None]),
+                expected_min: timestamp_nanoseconds_array([None], None),
+                expected_max: timestamp_nanoseconds_array([None], None),
             })
             .with_column(ExpectedColumn {
                 name: "year",
@@ -1135,9 +1337,48 @@ mod test {
         Arc::new(array)
     }
 
-    fn timestamp_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
+    fn timestamp_seconds_array(
+        input: impl IntoIterator<Item = Option<i64>>,
+        timzezone: Option<&str>,
+    ) -> ArrayRef {
+        let array: TimestampSecondArray = input.into_iter().collect();
+        match timzezone {
+            Some(tz) => Arc::new(array.with_timezone(tz)),
+            None => Arc::new(array),
+        }
+    }
+
+    fn timestamp_milliseconds_array(
+        input: impl IntoIterator<Item = Option<i64>>,
+        timzezone: Option<&str>,
+    ) -> ArrayRef {
+        let array: TimestampMillisecondArray = input.into_iter().collect();
+        match timzezone {
+            Some(tz) => Arc::new(array.with_timezone(tz)),
+            None => Arc::new(array),
+        }
+    }
+
+    fn timestamp_microseconds_array(
+        input: impl IntoIterator<Item = Option<i64>>,
+        timzezone: Option<&str>,
+    ) -> ArrayRef {
+        let array: TimestampMicrosecondArray = input.into_iter().collect();
+        match timzezone {
+            Some(tz) => Arc::new(array.with_timezone(tz)),
+            None => Arc::new(array),
+        }
+    }
+
+    fn timestamp_nanoseconds_array(
+        input: impl IntoIterator<Item = Option<i64>>,
+        timzezone: Option<&str>,
+    ) -> ArrayRef {
         let array: TimestampNanosecondArray = input.into_iter().collect();
-        Arc::new(array)
+        match timzezone {
+            Some(tz) => Arc::new(array.with_timezone(tz)),
+            None => Arc::new(array),
+        }
     }
 
     fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef {
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index eebf3447cb..2836cd2893 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,12 +22,16 @@ use std::fs::File;
 use std::sync::Arc;
 
 use arrow::compute::kernels::cast_utils::Parser;
-use arrow::datatypes::{Date32Type, Date64Type};
+use arrow::datatypes::{
+    Date32Type, Date64Type, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType,
+};
 use arrow_array::{
     make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
     Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array,
-    Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array,
-    UInt32Array, UInt64Array, UInt8Array,
+    Int32Array, Int64Array, Int8Array, RecordBatch, StringArray,
+    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -456,36 +460,40 @@ async fn test_int_8() {
 // timestamp
 #[tokio::test]
 async fn test_timestamp() {
-    // This creates a parquet files of 5 columns named "nanos", "micros", "millis", "seconds", "names"
+    // This creates a parquet files of 9 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned", "names"
     // "nanos" --> TimestampNanosecondArray
+    // "nanos_timezoned" --> TimestampNanosecondArray
     // "micros" --> TimestampMicrosecondArray
+    // "micros_timezoned" --> TimestampMicrosecondArray
     // "millis" --> TimestampMillisecondArray
+    // "millis_timezoned" --> TimestampMillisecondArray
     // "seconds" --> TimestampSecondArray
+    // "seconds_timezoned" --> TimestampSecondArray
     // "names" --> StringArray
     //
     // The file is created by 4 record batches, each has 5 rowws.
     // Since the row group isze is set to 5, those 4 batches will go into 4 row groups
-    // This creates a parquet files of 4 columns named "i8", "i16", "i32", "i64"
+    // This creates a parquet files of 4 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned"
     let reader = TestReader {
         scenario: Scenario::Timestamps,
         row_per_group: 5,
     };
 
+    let tz = "Pacific/Efate";
+
     Test {
         reader: reader.build().await,
-        // mins are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,]
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000000000,
-            1577840471000000000,
-            1577841061000000000,
-            1578704461000000000,
+        expected_min: Arc::new(TimestampNanosecondArray::from(vec![
+            TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-01T01:01:11"),
+            TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+            TimestampNanosecondType::parse("2020-01-11T01:01:01"),
         ])),
-        // maxes are [1577926861000000000, 1577926871000000000, 1577927461000000000, 1578790861000000000,]
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000000000,
-            1577926871000000000,
-            1577927461000000000,
-            1578790861000000000,
+        expected_max: Arc::new(TimestampNanosecondArray::from(vec![
+            TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-02T01:01:11"),
+            TimestampNanosecondType::parse("2020-01-02T01:11:01"),
+            TimestampNanosecondType::parse("2020-01-12T01:01:01"),
         ])),
         // nulls are [1, 1, 1, 1]
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
@@ -495,21 +503,48 @@ async fn test_timestamp() {
     }
     .run();
 
-    // micros
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampNanosecondArray::from(vec![
+                TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-01T01:01:11"),
+                TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+                TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampNanosecondArray::from(vec![
+                TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-02T01:01:11"),
+                TimestampNanosecondType::parse("2020-01-02T01:11:01"),
+                TimestampNanosecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "nanos_timezoned",
+    }
+    .run();
 
+    // micros
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000000,
-            1577840471000000,
-            1577841061000000,
-            1578704461000000,
+        expected_min: Arc::new(TimestampMicrosecondArray::from(vec![
+            TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-01T01:01:11"),
+            TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+            TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000000,
-            1577926871000000,
-            1577927461000000,
-            1578790861000000,
+        expected_max: Arc::new(TimestampMicrosecondArray::from(vec![
+            TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-02T01:01:11"),
+            TimestampMicrosecondType::parse("2020-01-02T01:11:01"),
+            TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
@@ -517,20 +552,48 @@ async fn test_timestamp() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampMicrosecondArray::from(vec![
+                TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-01T01:01:11"),
+                TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+                TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampMicrosecondArray::from(vec![
+                TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-02T01:01:11"),
+                TimestampMicrosecondType::parse("2020-01-02T01:11:01"),
+                TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "micros_timezoned",
+    }
+    .run();
+
     // millis
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000,
-            1577840471000,
-            1577841061000,
-            1578704461000,
+        expected_min: Arc::new(TimestampMillisecondArray::from(vec![
+            TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-01T01:01:11"),
+            TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+            TimestampMillisecondType::parse("2020-01-11T01:01:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000,
-            1577926871000,
-            1577927461000,
-            1578790861000,
+        expected_max: Arc::new(TimestampMillisecondArray::from(vec![
+            TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-02T01:01:11"),
+            TimestampMillisecondType::parse("2020-01-02T01:11:01"),
+            TimestampMillisecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
@@ -538,30 +601,96 @@ async fn test_timestamp() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampMillisecondArray::from(vec![
+                TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-01T01:01:11"),
+                TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+                TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampMillisecondArray::from(vec![
+                TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-02T01:01:11"),
+                TimestampMillisecondType::parse("2020-01-02T01:11:01"),
+                TimestampMillisecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "millis_timezoned",
+    }
+    .run();
+
     // seconds
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461, 1577840471, 1577841061, 1578704461,
+        expected_min: Arc::new(TimestampSecondArray::from(vec![
+            TimestampSecondType::parse("2020-01-01T01:01:01"),
+            TimestampSecondType::parse("2020-01-01T01:01:11"),
+            TimestampSecondType::parse("2020-01-01T01:11:01"),
+            TimestampSecondType::parse("2020-01-11T01:01:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861, 1577926871, 1577927461, 1578790861,
+        expected_max: Arc::new(TimestampSecondArray::from(vec![
+            TimestampSecondType::parse("2020-01-02T01:01:01"),
+            TimestampSecondType::parse("2020-01-02T01:01:11"),
+            TimestampSecondType::parse("2020-01-02T01:11:01"),
+            TimestampSecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
         column_name: "seconds",
     }
     .run();
+
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampSecondArray::from(vec![
+                TimestampSecondType::parse("2020-01-01T01:01:01"),
+                TimestampSecondType::parse("2020-01-01T01:01:11"),
+                TimestampSecondType::parse("2020-01-01T01:11:01"),
+                TimestampSecondType::parse("2020-01-11T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampSecondArray::from(vec![
+                TimestampSecondType::parse("2020-01-02T01:01:01"),
+                TimestampSecondType::parse("2020-01-02T01:01:11"),
+                TimestampSecondType::parse("2020-01-02T01:11:01"),
+                TimestampSecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 1, 1, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+        // row counts are [5, 5, 5, 5]
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+        column_name: "seconds_timezoned",
+    }
+    .run();
 }
 
 // timestamp with different row group sizes
 #[tokio::test]
 async fn test_timestamp_diff_rg_sizes() {
-    // This creates a parquet files of 5 columns named "nanos", "micros", "millis", "seconds", "names"
+    // This creates a parquet files of 9 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned", "names"
     // "nanos" --> TimestampNanosecondArray
+    // "nanos_timezoned" --> TimestampNanosecondArray
     // "micros" --> TimestampMicrosecondArray
+    // "micros_timezoned" --> TimestampMicrosecondArray
     // "millis" --> TimestampMillisecondArray
+    // "millis_timezoned" --> TimestampMillisecondArray
     // "seconds" --> TimestampSecondArray
+    // "seconds_timezoned" --> TimestampSecondArray
     // "names" --> StringArray
     //
     // The file is created by 4 record batches (each has a null row), each has 5 rows but then will be split into 3 row groups with size 8, 8, 4
@@ -570,19 +699,19 @@ async fn test_timestamp_diff_rg_sizes() {
         row_per_group: 8, // note that the row group size is 8
     };
 
+    let tz = "Pacific/Efate";
+
     Test {
         reader: reader.build().await,
-        // mins are [1577840461000000000, 1577841061000000000, 1578704521000000000]
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000000000,
-            1577841061000000000,
-            1578704521000000000,
+        expected_min: Arc::new(TimestampNanosecondArray::from(vec![
+            TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+            TimestampNanosecondType::parse("2020-01-11T01:02:01"),
         ])),
-        // maxes are [1577926861000000000, 1578704461000000000, 157879086100000000]
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000000000,
-            1578704461000000000,
-            1578790861000000000,
+        expected_max: Arc::new(TimestampNanosecondArray::from(vec![
+            TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+            TimestampNanosecondType::parse("2020-01-12T01:01:01"),
         ])),
         // nulls are [1, 2, 1]
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
@@ -592,18 +721,44 @@ async fn test_timestamp_diff_rg_sizes() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampNanosecondArray::from(vec![
+                TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+                TimestampNanosecondType::parse("2020-01-11T01:02:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampNanosecondArray::from(vec![
+                TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+                TimestampNanosecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 2, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+        // row counts are [8, 8, 4]
+        expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "nanos_timezoned",
+    }
+    .run();
+
     // micros
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000000,
-            1577841061000000,
-            1578704521000000,
+        expected_min: Arc::new(TimestampMicrosecondArray::from(vec![
+            TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+            TimestampMicrosecondType::parse("2020-01-11T01:02:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000000,
-            1578704461000000,
-            1578790861000000,
+        expected_max: Arc::new(TimestampMicrosecondArray::from(vec![
+            TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+            TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
@@ -611,18 +766,44 @@ async fn test_timestamp_diff_rg_sizes() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampMicrosecondArray::from(vec![
+                TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+                TimestampMicrosecondType::parse("2020-01-11T01:02:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampMicrosecondArray::from(vec![
+                TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+                TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 2, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+        // row counts are [8, 8, 4]
+        expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "micros_timezoned",
+    }
+    .run();
+
     // millis
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461000,
-            1577841061000,
-            1578704521000,
+        expected_min: Arc::new(TimestampMillisecondArray::from(vec![
+            TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+            TimestampMillisecondType::parse("2020-01-11T01:02:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861000,
-            1578704461000,
-            1578790861000,
+        expected_max: Arc::new(TimestampMillisecondArray::from(vec![
+            TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+            TimestampMillisecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
@@ -630,20 +811,76 @@ async fn test_timestamp_diff_rg_sizes() {
     }
     .run();
 
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampMillisecondArray::from(vec![
+                TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+                TimestampMillisecondType::parse("2020-01-11T01:02:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampMillisecondArray::from(vec![
+                TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+                TimestampMillisecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 2, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+        // row counts are [8, 8, 4]
+        expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "millis_timezoned",
+    }
+    .run();
+
     // seconds
     Test {
         reader: reader.build().await,
-        expected_min: Arc::new(Int64Array::from(vec![
-            1577840461, 1577841061, 1578704521,
+        expected_min: Arc::new(TimestampSecondArray::from(vec![
+            TimestampSecondType::parse("2020-01-01T01:01:01"),
+            TimestampSecondType::parse("2020-01-01T01:11:01"),
+            TimestampSecondType::parse("2020-01-11T01:02:01"),
         ])),
-        expected_max: Arc::new(Int64Array::from(vec![
-            1577926861, 1578704461, 1578790861,
+        expected_max: Arc::new(TimestampSecondArray::from(vec![
+            TimestampSecondType::parse("2020-01-02T01:01:01"),
+            TimestampSecondType::parse("2020-01-11T01:01:01"),
+            TimestampSecondType::parse("2020-01-12T01:01:01"),
         ])),
         expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
         expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
         column_name: "seconds",
     }
     .run();
+
+    Test {
+        reader: reader.build().await,
+        expected_min: Arc::new(
+            TimestampSecondArray::from(vec![
+                TimestampSecondType::parse("2020-01-01T01:01:01"),
+                TimestampSecondType::parse("2020-01-01T01:11:01"),
+                TimestampSecondType::parse("2020-01-11T01:02:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        expected_max: Arc::new(
+            TimestampSecondArray::from(vec![
+                TimestampSecondType::parse("2020-01-02T01:01:01"),
+                TimestampSecondType::parse("2020-01-11T01:01:01"),
+                TimestampSecondType::parse("2020-01-12T01:01:01"),
+            ])
+            .with_timezone(tz),
+        ),
+        // nulls are [1, 2, 1]
+        expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+        // row counts are [8, 8, 4]
+        expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+        column_name: "seconds_timezoned",
+    }
+    .run();
 }
 
 // date with different row group sizes
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 94ae9ff601..41a0a86aa8 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -332,9 +332,13 @@ fn make_boolean_batch(v: Vec<Option<bool>>) -> RecordBatch {
 ///
 /// Columns are named:
 /// "nanos" --> TimestampNanosecondArray
+/// "nanos_timezoned" --> TimestampNanosecondArray with timezone
 /// "micros" --> TimestampMicrosecondArray
+/// "micros_timezoned" --> TimestampMicrosecondArray with timezone
 /// "millis" --> TimestampMillisecondArray
+/// "millis_timezoned" --> TimestampMillisecondArray with timezone
 /// "seconds" --> TimestampSecondArray
+/// "seconds_timezoned" --> TimestampSecondArray with timezone
 /// "names" --> StringArray
 fn make_timestamp_batch(offset: Duration) -> RecordBatch {
     let ts_strings = vec![
@@ -345,6 +349,8 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
         Some("2020-01-02T01:01:01.0000000000001"),
     ];
 
+    let tz_string = "Pacific/Efate";
+
     let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
 
     let ts_nanos = ts_strings
@@ -382,19 +388,47 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
         .map(|(i, _)| format!("Row {i} + {offset}"))
         .collect::<Vec<_>>();
 
-    let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
-    let arr_micros = TimestampMicrosecondArray::from(ts_micros);
-    let arr_millis = TimestampMillisecondArray::from(ts_millis);
-    let arr_seconds = TimestampSecondArray::from(ts_seconds);
+    let arr_nanos = TimestampNanosecondArray::from(ts_nanos.clone());
+    let arr_nanos_timezoned =
+        TimestampNanosecondArray::from(ts_nanos).with_timezone(tz_string);
+    let arr_micros = TimestampMicrosecondArray::from(ts_micros.clone());
+    let arr_micros_timezoned =
+        TimestampMicrosecondArray::from(ts_micros).with_timezone(tz_string);
+    let arr_millis = TimestampMillisecondArray::from(ts_millis.clone());
+    let arr_millis_timezoned =
+        TimestampMillisecondArray::from(ts_millis).with_timezone(tz_string);
+    let arr_seconds = TimestampSecondArray::from(ts_seconds.clone());
+    let arr_seconds_timezoned =
+        TimestampSecondArray::from(ts_seconds).with_timezone(tz_string);
 
     let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
     let arr_names = StringArray::from(names);
 
     let schema = Schema::new(vec![
         Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new(
+            "nanos_timezoned",
+            arr_nanos_timezoned.data_type().clone(),
+            true,
+        ),
         Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new(
+            "micros_timezoned",
+            arr_micros_timezoned.data_type().clone(),
+            true,
+        ),
         Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new(
+            "millis_timezoned",
+            arr_millis_timezoned.data_type().clone(),
+            true,
+        ),
         Field::new("seconds", arr_seconds.data_type().clone(), true),
+        Field::new(
+            "seconds_timezoned",
+            arr_seconds_timezoned.data_type().clone(),
+            true,
+        ),
         Field::new("name", arr_names.data_type().clone(), true),
     ]);
     let schema = Arc::new(schema);
@@ -403,9 +437,13 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
         schema,
         vec![
             Arc::new(arr_nanos),
+            Arc::new(arr_nanos_timezoned),
             Arc::new(arr_micros),
+            Arc::new(arr_micros_timezoned),
             Arc::new(arr_millis),
+            Arc::new(arr_millis_timezoned),
             Arc::new(arr_seconds),
+            Arc::new(arr_seconds_timezoned),
             Arc::new(arr_names),
         ],
     )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to