This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6e5344ae36 Extract parquet statistics from timestamps with timezones
(#10766)
6e5344ae36 is described below
commit 6e5344ae367001dbf70fa2882c2e89eca4a2bbd8
Author: Xin Li <[email protected]>
AuthorDate: Mon Jun 3 23:25:16 2024 +0800
Extract parquet statistics from timestamps with timezones (#10766)
* Fix incorrect statistics read for timestamp columns in parquet
---
.../datasource/physical_plan/parquet/statistics.rs | 285 +++++++++++++--
datafusion/core/tests/parquet/arrow_statistics.rs | 383 +++++++++++++++++----
datafusion/core/tests/parquet/mod.rs | 46 ++-
3 files changed, 615 insertions(+), 99 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index ae8395aef6..1c20fa7caa 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -19,7 +19,7 @@
// TODO: potentially move this to arrow-rs:
https://github.com/apache/arrow-rs/issues/4328
-use arrow::{array::ArrayRef, datatypes::DataType};
+use arrow::{array::ArrayRef, datatypes::DataType, datatypes::TimeUnit};
use arrow_array::{new_empty_array, new_null_array, UInt64Array};
use arrow_schema::{Field, FieldRef, Schema};
use datafusion_common::{
@@ -112,6 +112,26 @@ macro_rules! get_statistic {
Some(DataType::UInt64) => {
Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
}
+ Some(DataType::Timestamp(unit, timezone)) => {
+ Some(match unit {
+ TimeUnit::Second => ScalarValue::TimestampSecond(
+ Some(*s.$func()),
+ timezone.clone(),
+ ),
+ TimeUnit::Millisecond =>
ScalarValue::TimestampMillisecond(
+ Some(*s.$func()),
+ timezone.clone(),
+ ),
+ TimeUnit::Microsecond =>
ScalarValue::TimestampMicrosecond(
+ Some(*s.$func()),
+ timezone.clone(),
+ ),
+ TimeUnit::Nanosecond =>
ScalarValue::TimestampNanosecond(
+ Some(*s.$func()),
+ timezone.clone(),
+ ),
+ })
+ }
_ => Some(ScalarValue::Int64(Some(*s.$func()))),
}
}
@@ -395,7 +415,8 @@ mod test {
use arrow_array::{
new_null_array, Array, BinaryArray, BooleanArray, Date32Array,
Date64Array,
Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array,
- Int8Array, RecordBatch, StringArray, StructArray,
TimestampNanosecondArray,
+ Int8Array, RecordBatch, StringArray, StructArray,
TimestampMicrosecondArray,
+ TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
@@ -536,28 +557,209 @@ mod test {
}
#[test]
- #[should_panic(
- expected = "Inconsistent types in ScalarValue::iter_to_array. Expected
Int64, got TimestampNanosecond(NULL, None)"
- )]
- // Due to https://github.com/apache/datafusion/issues/8295
fn roundtrip_timestamp() {
Test {
- input: timestamp_array([
- // row group 1
- Some(1),
- None,
- Some(3),
- // row group 2
- Some(9),
- Some(5),
+ input: timestamp_seconds_array(
+ [
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ],
None,
- // row group 3
+ ),
+ expected_min: timestamp_seconds_array([Some(1), Some(5), None],
None),
+ expected_max: timestamp_seconds_array([Some(3), Some(9), None],
None),
+ }
+ .run();
+
+ Test {
+ input: timestamp_milliseconds_array(
+ [
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ],
None,
+ ),
+ expected_min: timestamp_milliseconds_array([Some(1), Some(5),
None], None),
+ expected_max: timestamp_milliseconds_array([Some(3), Some(9),
None], None),
+ }
+ .run();
+
+ Test {
+ input: timestamp_microseconds_array(
+ [
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ],
None,
+ ),
+ expected_min: timestamp_microseconds_array([Some(1), Some(5),
None], None),
+ expected_max: timestamp_microseconds_array([Some(3), Some(9),
None], None),
+ }
+ .run();
+
+ Test {
+ input: timestamp_nanoseconds_array(
+ [
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ],
None,
- ]),
- expected_min: timestamp_array([Some(1), Some(5), None]),
- expected_max: timestamp_array([Some(3), Some(9), None]),
+ ),
+ expected_min: timestamp_nanoseconds_array([Some(1), Some(5),
None], None),
+ expected_max: timestamp_nanoseconds_array([Some(3), Some(9),
None], None),
+ }
+ .run()
+ }
+
+ #[test]
+ fn roundtrip_timestamp_timezoned() {
+ Test {
+ input: timestamp_seconds_array(
+ [
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ],
+ Some("UTC"),
+ ),
+ expected_min: timestamp_seconds_array([Some(1), Some(5), None],
Some("UTC")),
+ expected_max: timestamp_seconds_array([Some(3), Some(9), None],
Some("UTC")),
+ }
+ .run();
+
+ Test {
+ input: timestamp_milliseconds_array(
+ [
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ],
+ Some("UTC"),
+ ),
+ expected_min: timestamp_milliseconds_array(
+ [Some(1), Some(5), None],
+ Some("UTC"),
+ ),
+ expected_max: timestamp_milliseconds_array(
+ [Some(3), Some(9), None],
+ Some("UTC"),
+ ),
+ }
+ .run();
+
+ Test {
+ input: timestamp_microseconds_array(
+ [
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ],
+ Some("UTC"),
+ ),
+ expected_min: timestamp_microseconds_array(
+ [Some(1), Some(5), None],
+ Some("UTC"),
+ ),
+ expected_max: timestamp_microseconds_array(
+ [Some(3), Some(9), None],
+ Some("UTC"),
+ ),
+ }
+ .run();
+
+ Test {
+ input: timestamp_nanoseconds_array(
+ [
+ // row group 1
+ Some(1),
+ None,
+ Some(3),
+ // row group 2
+ Some(9),
+ Some(5),
+ None,
+ // row group 3
+ None,
+ None,
+ None,
+ ],
+ Some("UTC"),
+ ),
+ expected_min: timestamp_nanoseconds_array(
+ [Some(1), Some(5), None],
+ Some("UTC"),
+ ),
+ expected_max: timestamp_nanoseconds_array(
+ [Some(3), Some(9), None],
+ Some("UTC"),
+ ),
}
.run()
}
@@ -914,8 +1116,8 @@ mod test {
// File has no min/max for timestamp_col
.with_column(ExpectedColumn {
name: "timestamp_col",
- expected_min: timestamp_array([None]),
- expected_max: timestamp_array([None]),
+ expected_min: timestamp_nanoseconds_array([None], None),
+ expected_max: timestamp_nanoseconds_array([None], None),
})
.with_column(ExpectedColumn {
name: "year",
@@ -1135,9 +1337,48 @@ mod test {
Arc::new(array)
}
- fn timestamp_array(input: impl IntoIterator<Item = Option<i64>>) ->
ArrayRef {
+ fn timestamp_seconds_array(
+ input: impl IntoIterator<Item = Option<i64>>,
+ timzezone: Option<&str>,
+ ) -> ArrayRef {
+ let array: TimestampSecondArray = input.into_iter().collect();
+ match timzezone {
+ Some(tz) => Arc::new(array.with_timezone(tz)),
+ None => Arc::new(array),
+ }
+ }
+
+ fn timestamp_milliseconds_array(
+ input: impl IntoIterator<Item = Option<i64>>,
+ timzezone: Option<&str>,
+ ) -> ArrayRef {
+ let array: TimestampMillisecondArray = input.into_iter().collect();
+ match timzezone {
+ Some(tz) => Arc::new(array.with_timezone(tz)),
+ None => Arc::new(array),
+ }
+ }
+
+ fn timestamp_microseconds_array(
+ input: impl IntoIterator<Item = Option<i64>>,
+ timzezone: Option<&str>,
+ ) -> ArrayRef {
+ let array: TimestampMicrosecondArray = input.into_iter().collect();
+ match timzezone {
+ Some(tz) => Arc::new(array.with_timezone(tz)),
+ None => Arc::new(array),
+ }
+ }
+
+ fn timestamp_nanoseconds_array(
+ input: impl IntoIterator<Item = Option<i64>>,
+ timzezone: Option<&str>,
+ ) -> ArrayRef {
let array: TimestampNanosecondArray = input.into_iter().collect();
- Arc::new(array)
+ match timzezone {
+ Some(tz) => Arc::new(array.with_timezone(tz)),
+ None => Arc::new(array),
+ }
}
fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) ->
ArrayRef {
diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs
b/datafusion/core/tests/parquet/arrow_statistics.rs
index eebf3447cb..2836cd2893 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,12 +22,16 @@ use std::fs::File;
use std::sync::Arc;
use arrow::compute::kernels::cast_utils::Parser;
-use arrow::datatypes::{Date32Type, Date64Type};
+use arrow::datatypes::{
+ Date32Type, Date64Type, TimestampMicrosecondType, TimestampMillisecondType,
+ TimestampNanosecondType, TimestampSecondType,
+};
use arrow_array::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Date64Array,
Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array,
Int16Array,
- Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt16Array,
- UInt32Array, UInt64Array, UInt8Array,
+ Int32Array, Int64Array, Int8Array, RecordBatch, StringArray,
+ TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray,
+ TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::{
@@ -456,36 +460,40 @@ async fn test_int_8() {
// timestamp
#[tokio::test]
async fn test_timestamp() {
- // This creates a parquet files of 5 columns named "nanos", "micros",
"millis", "seconds", "names"
+ // This creates a parquet file of 9 columns named "nanos",
"nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned",
"seconds", "seconds_timezoned", "names"
// "nanos" --> TimestampNanosecondArray
+ // "nanos_timezoned" --> TimestampNanosecondArray
// "micros" --> TimestampMicrosecondArray
+ // "micros_timezoned" --> TimestampMicrosecondArray
// "millis" --> TimestampMillisecondArray
+ // "millis_timezoned" --> TimestampMillisecondArray
// "seconds" --> TimestampSecondArray
+ // "seconds_timezoned" --> TimestampSecondArray
// "names" --> StringArray
//
// The file is created by 4 record batches, each has 5 rows.
// Since the row group size is set to 5, those 4 batches will go into 4
row groups
- // This creates a parquet files of 4 columns named "i8", "i16", "i32",
"i64"
+ // This creates a parquet file of 8 columns named "nanos",
"nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned",
"seconds", "seconds_timezoned"
let reader = TestReader {
scenario: Scenario::Timestamps,
row_per_group: 5,
};
+ let tz = "Pacific/Efate";
+
Test {
reader: reader.build().await,
- // mins are [1577840461000000000, 1577840471000000000,
1577841061000000000, 1578704461000000000,]
- expected_min: Arc::new(Int64Array::from(vec![
- 1577840461000000000,
- 1577840471000000000,
- 1577841061000000000,
- 1578704461000000000,
+ expected_min: Arc::new(TimestampNanosecondArray::from(vec![
+ TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-01T01:01:11"),
+ TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+ TimestampNanosecondType::parse("2020-01-11T01:01:01"),
])),
- // maxes are [1577926861000000000, 1577926871000000000,
1577927461000000000, 1578790861000000000,]
- expected_max: Arc::new(Int64Array::from(vec![
- 1577926861000000000,
- 1577926871000000000,
- 1577927461000000000,
- 1578790861000000000,
+ expected_max: Arc::new(TimestampNanosecondArray::from(vec![
+ TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-02T01:01:11"),
+ TimestampNanosecondType::parse("2020-01-02T01:11:01"),
+ TimestampNanosecondType::parse("2020-01-12T01:01:01"),
])),
// nulls are [1, 1, 1, 1]
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
@@ -495,21 +503,48 @@ async fn test_timestamp() {
}
.run();
- // micros
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(
+ TimestampNanosecondArray::from(vec![
+ TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-01T01:01:11"),
+ TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+ TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ expected_max: Arc::new(
+ TimestampNanosecondArray::from(vec![
+ TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-02T01:01:11"),
+ TimestampNanosecondType::parse("2020-01-02T01:11:01"),
+ TimestampNanosecondType::parse("2020-01-12T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ // nulls are [1, 1, 1, 1]
+ expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+ // row counts are [5, 5, 5, 5]
+ expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "nanos_timezoned",
+ }
+ .run();
+ // micros
Test {
reader: reader.build().await,
- expected_min: Arc::new(Int64Array::from(vec![
- 1577840461000000,
- 1577840471000000,
- 1577841061000000,
- 1578704461000000,
+ expected_min: Arc::new(TimestampMicrosecondArray::from(vec![
+ TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-01T01:01:11"),
+ TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+ TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
])),
- expected_max: Arc::new(Int64Array::from(vec![
- 1577926861000000,
- 1577926871000000,
- 1577927461000000,
- 1578790861000000,
+ expected_max: Arc::new(TimestampMicrosecondArray::from(vec![
+ TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-02T01:01:11"),
+ TimestampMicrosecondType::parse("2020-01-02T01:11:01"),
+ TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
])),
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
@@ -517,20 +552,48 @@ async fn test_timestamp() {
}
.run();
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(
+ TimestampMicrosecondArray::from(vec![
+ TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-01T01:01:11"),
+ TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+ TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ expected_max: Arc::new(
+ TimestampMicrosecondArray::from(vec![
+ TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-02T01:01:11"),
+ TimestampMicrosecondType::parse("2020-01-02T01:11:01"),
+ TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ // nulls are [1, 1, 1, 1]
+ expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+ // row counts are [5, 5, 5, 5]
+ expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "micros_timezoned",
+ }
+ .run();
+
// millis
Test {
reader: reader.build().await,
- expected_min: Arc::new(Int64Array::from(vec![
- 1577840461000,
- 1577840471000,
- 1577841061000,
- 1578704461000,
+ expected_min: Arc::new(TimestampMillisecondArray::from(vec![
+ TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-01T01:01:11"),
+ TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+ TimestampMillisecondType::parse("2020-01-11T01:01:01"),
])),
- expected_max: Arc::new(Int64Array::from(vec![
- 1577926861000,
- 1577926871000,
- 1577927461000,
- 1578790861000,
+ expected_max: Arc::new(TimestampMillisecondArray::from(vec![
+ TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-02T01:01:11"),
+ TimestampMillisecondType::parse("2020-01-02T01:11:01"),
+ TimestampMillisecondType::parse("2020-01-12T01:01:01"),
])),
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
@@ -538,30 +601,96 @@ async fn test_timestamp() {
}
.run();
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(
+ TimestampMillisecondArray::from(vec![
+ TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-01T01:01:11"),
+ TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+ TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ expected_max: Arc::new(
+ TimestampMillisecondArray::from(vec![
+ TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-02T01:01:11"),
+ TimestampMillisecondType::parse("2020-01-02T01:11:01"),
+ TimestampMillisecondType::parse("2020-01-12T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ // nulls are [1, 1, 1, 1]
+ expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+ // row counts are [5, 5, 5, 5]
+ expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "millis_timezoned",
+ }
+ .run();
+
// seconds
Test {
reader: reader.build().await,
- expected_min: Arc::new(Int64Array::from(vec![
- 1577840461, 1577840471, 1577841061, 1578704461,
+ expected_min: Arc::new(TimestampSecondArray::from(vec![
+ TimestampSecondType::parse("2020-01-01T01:01:01"),
+ TimestampSecondType::parse("2020-01-01T01:01:11"),
+ TimestampSecondType::parse("2020-01-01T01:11:01"),
+ TimestampSecondType::parse("2020-01-11T01:01:01"),
])),
- expected_max: Arc::new(Int64Array::from(vec![
- 1577926861, 1577926871, 1577927461, 1578790861,
+ expected_max: Arc::new(TimestampSecondArray::from(vec![
+ TimestampSecondType::parse("2020-01-02T01:01:01"),
+ TimestampSecondType::parse("2020-01-02T01:01:11"),
+ TimestampSecondType::parse("2020-01-02T01:11:01"),
+ TimestampSecondType::parse("2020-01-12T01:01:01"),
])),
expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
column_name: "seconds",
}
.run();
+
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(
+ TimestampSecondArray::from(vec![
+ TimestampSecondType::parse("2020-01-01T01:01:01"),
+ TimestampSecondType::parse("2020-01-01T01:01:11"),
+ TimestampSecondType::parse("2020-01-01T01:11:01"),
+ TimestampSecondType::parse("2020-01-11T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ expected_max: Arc::new(
+ TimestampSecondArray::from(vec![
+ TimestampSecondType::parse("2020-01-02T01:01:01"),
+ TimestampSecondType::parse("2020-01-02T01:01:11"),
+ TimestampSecondType::parse("2020-01-02T01:11:01"),
+ TimestampSecondType::parse("2020-01-12T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ // nulls are [1, 1, 1, 1]
+ expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]),
+ // row counts are [5, 5, 5, 5]
+ expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
+ column_name: "seconds_timezoned",
+ }
+ .run();
}
// timestamp with different row group sizes
#[tokio::test]
async fn test_timestamp_diff_rg_sizes() {
- // This creates a parquet files of 5 columns named "nanos", "micros",
"millis", "seconds", "names"
+ // This creates a parquet file of 9 columns named "nanos",
"nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned",
"seconds", "seconds_timezoned", "names"
// "nanos" --> TimestampNanosecondArray
+ // "nanos_timezoned" --> TimestampNanosecondArray
// "micros" --> TimestampMicrosecondArray
+ // "micros_timezoned" --> TimestampMicrosecondArray
// "millis" --> TimestampMillisecondArray
+ // "millis_timezoned" --> TimestampMillisecondArray
// "seconds" --> TimestampSecondArray
+ // "seconds_timezoned" --> TimestampSecondArray
// "names" --> StringArray
//
// The file is created by 4 record batches (each has a null row), each has
5 rows but then will be split into 3 row groups with size 8, 8, 4
@@ -570,19 +699,19 @@ async fn test_timestamp_diff_rg_sizes() {
row_per_group: 8, // note that the row group size is 8
};
+ let tz = "Pacific/Efate";
+
Test {
reader: reader.build().await,
- // mins are [1577840461000000000, 1577841061000000000,
1578704521000000000]
- expected_min: Arc::new(Int64Array::from(vec![
- 1577840461000000000,
- 1577841061000000000,
- 1578704521000000000,
+ expected_min: Arc::new(TimestampNanosecondArray::from(vec![
+ TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+ TimestampNanosecondType::parse("2020-01-11T01:02:01"),
])),
- // maxes are [1577926861000000000, 1578704461000000000,
157879086100000000]
- expected_max: Arc::new(Int64Array::from(vec![
- 1577926861000000000,
- 1578704461000000000,
- 1578790861000000000,
+ expected_max: Arc::new(TimestampNanosecondArray::from(vec![
+ TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-12T01:01:01"),
])),
// nulls are [1, 2, 1]
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
@@ -592,18 +721,44 @@ async fn test_timestamp_diff_rg_sizes() {
}
.run();
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(
+ TimestampNanosecondArray::from(vec![
+ TimestampNanosecondType::parse("2020-01-01T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-01T01:11:01"),
+ TimestampNanosecondType::parse("2020-01-11T01:02:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ expected_max: Arc::new(
+ TimestampNanosecondArray::from(vec![
+ TimestampNanosecondType::parse("2020-01-02T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-11T01:01:01"),
+ TimestampNanosecondType::parse("2020-01-12T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ // nulls are [1, 2, 1]
+ expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+ // row counts are [8, 8, 4]
+ expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+ column_name: "nanos_timezoned",
+ }
+ .run();
+
// micros
Test {
reader: reader.build().await,
- expected_min: Arc::new(Int64Array::from(vec![
- 1577840461000000,
- 1577841061000000,
- 1578704521000000,
+ expected_min: Arc::new(TimestampMicrosecondArray::from(vec![
+ TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+ TimestampMicrosecondType::parse("2020-01-11T01:02:01"),
])),
- expected_max: Arc::new(Int64Array::from(vec![
- 1577926861000000,
- 1578704461000000,
- 1578790861000000,
+ expected_max: Arc::new(TimestampMicrosecondArray::from(vec![
+ TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
])),
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
@@ -611,18 +766,44 @@ async fn test_timestamp_diff_rg_sizes() {
}
.run();
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(
+ TimestampMicrosecondArray::from(vec![
+ TimestampMicrosecondType::parse("2020-01-01T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-01T01:11:01"),
+ TimestampMicrosecondType::parse("2020-01-11T01:02:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ expected_max: Arc::new(
+ TimestampMicrosecondArray::from(vec![
+ TimestampMicrosecondType::parse("2020-01-02T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-11T01:01:01"),
+ TimestampMicrosecondType::parse("2020-01-12T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ // nulls are [1, 2, 1]
+ expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+ // row counts are [8, 8, 4]
+ expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+ column_name: "micros_timezoned",
+ }
+ .run();
+
// millis
Test {
reader: reader.build().await,
- expected_min: Arc::new(Int64Array::from(vec![
- 1577840461000,
- 1577841061000,
- 1578704521000,
+ expected_min: Arc::new(TimestampMillisecondArray::from(vec![
+ TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+ TimestampMillisecondType::parse("2020-01-11T01:02:01"),
])),
- expected_max: Arc::new(Int64Array::from(vec![
- 1577926861000,
- 1578704461000,
- 1578790861000,
+ expected_max: Arc::new(TimestampMillisecondArray::from(vec![
+ TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-12T01:01:01"),
])),
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
@@ -630,20 +811,76 @@ async fn test_timestamp_diff_rg_sizes() {
}
.run();
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(
+ TimestampMillisecondArray::from(vec![
+ TimestampMillisecondType::parse("2020-01-01T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-01T01:11:01"),
+ TimestampMillisecondType::parse("2020-01-11T01:02:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ expected_max: Arc::new(
+ TimestampMillisecondArray::from(vec![
+ TimestampMillisecondType::parse("2020-01-02T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-11T01:01:01"),
+ TimestampMillisecondType::parse("2020-01-12T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ // nulls are [1, 2, 1]
+ expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+ // row counts are [8, 8, 4]
+ expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+ column_name: "millis_timezoned",
+ }
+ .run();
+
// seconds
Test {
reader: reader.build().await,
- expected_min: Arc::new(Int64Array::from(vec![
- 1577840461, 1577841061, 1578704521,
+ expected_min: Arc::new(TimestampSecondArray::from(vec![
+ TimestampSecondType::parse("2020-01-01T01:01:01"),
+ TimestampSecondType::parse("2020-01-01T01:11:01"),
+ TimestampSecondType::parse("2020-01-11T01:02:01"),
])),
- expected_max: Arc::new(Int64Array::from(vec![
- 1577926861, 1578704461, 1578790861,
+ expected_max: Arc::new(TimestampSecondArray::from(vec![
+ TimestampSecondType::parse("2020-01-02T01:01:01"),
+ TimestampSecondType::parse("2020-01-11T01:01:01"),
+ TimestampSecondType::parse("2020-01-12T01:01:01"),
])),
expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
column_name: "seconds",
}
.run();
+
+ Test {
+ reader: reader.build().await,
+ expected_min: Arc::new(
+ TimestampSecondArray::from(vec![
+ TimestampSecondType::parse("2020-01-01T01:01:01"),
+ TimestampSecondType::parse("2020-01-01T01:11:01"),
+ TimestampSecondType::parse("2020-01-11T01:02:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ expected_max: Arc::new(
+ TimestampSecondArray::from(vec![
+ TimestampSecondType::parse("2020-01-02T01:01:01"),
+ TimestampSecondType::parse("2020-01-11T01:01:01"),
+ TimestampSecondType::parse("2020-01-12T01:01:01"),
+ ])
+ .with_timezone(tz),
+ ),
+ // nulls are [1, 2, 1]
+ expected_null_counts: UInt64Array::from(vec![1, 2, 1]),
+ // row counts are [8, 8, 4]
+ expected_row_counts: UInt64Array::from(vec![8, 8, 4]),
+ column_name: "seconds_timezoned",
+ }
+ .run();
}
// date with different row group sizes
diff --git a/datafusion/core/tests/parquet/mod.rs
b/datafusion/core/tests/parquet/mod.rs
index 94ae9ff601..41a0a86aa8 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -332,9 +332,13 @@ fn make_boolean_batch(v: Vec<Option<bool>>) -> RecordBatch
{
///
/// Columns are named:
/// "nanos" --> TimestampNanosecondArray
+/// "nanos_timezoned" --> TimestampNanosecondArray with timezone
/// "micros" --> TimestampMicrosecondArray
+/// "micros_timezoned" --> TimestampMicrosecondArray with timezone
/// "millis" --> TimestampMillisecondArray
+/// "millis_timezoned" --> TimestampMillisecondArray with timezone
/// "seconds" --> TimestampSecondArray
+/// "seconds_timezoned" --> TimestampSecondArray with timezone
/// "names" --> StringArray
fn make_timestamp_batch(offset: Duration) -> RecordBatch {
let ts_strings = vec![
@@ -345,6 +349,8 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
Some("2020-01-02T01:01:01.0000000000001"),
];
+ let tz_string = "Pacific/Efate";
+
let offset_nanos = offset.num_nanoseconds().expect("non overflow nanos");
let ts_nanos = ts_strings
@@ -382,19 +388,47 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
.map(|(i, _)| format!("Row {i} + {offset}"))
.collect::<Vec<_>>();
- let arr_nanos = TimestampNanosecondArray::from(ts_nanos);
- let arr_micros = TimestampMicrosecondArray::from(ts_micros);
- let arr_millis = TimestampMillisecondArray::from(ts_millis);
- let arr_seconds = TimestampSecondArray::from(ts_seconds);
+ let arr_nanos = TimestampNanosecondArray::from(ts_nanos.clone());
+ let arr_nanos_timezoned =
+ TimestampNanosecondArray::from(ts_nanos).with_timezone(tz_string);
+ let arr_micros = TimestampMicrosecondArray::from(ts_micros.clone());
+ let arr_micros_timezoned =
+ TimestampMicrosecondArray::from(ts_micros).with_timezone(tz_string);
+ let arr_millis = TimestampMillisecondArray::from(ts_millis.clone());
+ let arr_millis_timezoned =
+ TimestampMillisecondArray::from(ts_millis).with_timezone(tz_string);
+ let arr_seconds = TimestampSecondArray::from(ts_seconds.clone());
+ let arr_seconds_timezoned =
+ TimestampSecondArray::from(ts_seconds).with_timezone(tz_string);
let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let arr_names = StringArray::from(names);
let schema = Schema::new(vec![
Field::new("nanos", arr_nanos.data_type().clone(), true),
+ Field::new(
+ "nanos_timezoned",
+ arr_nanos_timezoned.data_type().clone(),
+ true,
+ ),
Field::new("micros", arr_micros.data_type().clone(), true),
+ Field::new(
+ "micros_timezoned",
+ arr_micros_timezoned.data_type().clone(),
+ true,
+ ),
Field::new("millis", arr_millis.data_type().clone(), true),
+ Field::new(
+ "millis_timezoned",
+ arr_millis_timezoned.data_type().clone(),
+ true,
+ ),
Field::new("seconds", arr_seconds.data_type().clone(), true),
+ Field::new(
+ "seconds_timezoned",
+ arr_seconds_timezoned.data_type().clone(),
+ true,
+ ),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);
@@ -403,9 +437,13 @@ fn make_timestamp_batch(offset: Duration) -> RecordBatch {
schema,
vec![
Arc::new(arr_nanos),
+ Arc::new(arr_nanos_timezoned),
Arc::new(arr_micros),
+ Arc::new(arr_micros_timezoned),
Arc::new(arr_millis),
+ Arc::new(arr_millis_timezoned),
Arc::new(arr_seconds),
+ Arc::new(arr_seconds_timezoned),
Arc::new(arr_names),
],
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]