This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 21e33a3d1 Propagate timezone to created arrays (#5481)
21e33a3d1 is described below
commit 21e33a3d1047aa97be60b2efb4516efdd1d2b6bb
Author: Max Burke <[email protected]>
AuthorDate: Mon Mar 6 05:58:13 2023 -0800
Propagate timezone to created arrays (#5481)
* Propagate timezone to created arrays
* Add test for timezone propagation
This was manifesting as an error in the use of window functions on
Timestamp types with Arrow failing to concatenate arrays of different
data types (one array with a timezone, one without).
---
datafusion/common/src/scalar.rs | 32 +++++++++++++-------
.../tests/parquet/data/timestamp_with_tz.parquet | Bin 0 -> 87689 bytes
datafusion/core/tests/sql/parquet.rs | 33 +++++++++++++++++++++
3 files changed, 55 insertions(+), 10 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index c31b37b63..212328121 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -1296,7 +1296,7 @@ impl ScalarValue {
}
macro_rules! build_array_primitive_tz {
- ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{
+ ($ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{
{
let array = scalars.map(|sv| {
if let ScalarValue::$SCALAR_TY(v, _) = sv {
@@ -1310,7 +1310,7 @@ impl ScalarValue {
}
})
.collect::<Result<$ARRAY_TY>>()?;
- Arc::new(array)
+ Arc::new(array.with_timezone_opt($TZ.clone()))
}
}};
}
@@ -1444,17 +1444,29 @@ impl ScalarValue {
DataType::Time64(TimeUnit::Nanosecond) => {
build_array_primitive!(Time64NanosecondArray, Time64Nanosecond)
}
- DataType::Timestamp(TimeUnit::Second, _) => {
-            build_array_primitive_tz!(TimestampSecondArray, TimestampSecond)
+ DataType::Timestamp(TimeUnit::Second, tz) => {
+            build_array_primitive_tz!(TimestampSecondArray, TimestampSecond, tz)
}
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
-            build_array_primitive_tz!(TimestampMillisecondArray, TimestampMillisecond)
+ DataType::Timestamp(TimeUnit::Millisecond, tz) => {
+ build_array_primitive_tz!(
+ TimestampMillisecondArray,
+ TimestampMillisecond,
+ tz
+ )
}
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
-            build_array_primitive_tz!(TimestampMicrosecondArray, TimestampMicrosecond)
+ DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+ build_array_primitive_tz!(
+ TimestampMicrosecondArray,
+ TimestampMicrosecond,
+ tz
+ )
}
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-            build_array_primitive_tz!(TimestampNanosecondArray, TimestampNanosecond)
+ DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+ build_array_primitive_tz!(
+ TimestampNanosecondArray,
+ TimestampNanosecond,
+ tz
+ )
}
DataType::Interval(IntervalUnit::DayTime) => {
build_array_primitive!(IntervalDayTimeArray, IntervalDayTime)
diff --git a/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet b/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet
new file mode 100644
index 000000000..075f846a6
Binary files /dev/null and b/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet differ
diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs
index 31cd0da21..b18528992 100644
--- a/datafusion/core/tests/sql/parquet.rs
+++ b/datafusion/core/tests/sql/parquet.rs
@@ -165,6 +165,39 @@ async fn fixed_size_binary_columns() {
}
}
+#[tokio::test]
+async fn window_fn_timestamp_tz() {
+ let ctx = SessionContext::new();
+ ctx.register_parquet(
+ "t0",
+ "tests/parquet/data/timestamp_with_tz.parquet",
+ ParquetReadOptions::default(),
+ )
+ .await
+ .unwrap();
+
+    let sql = "SELECT count, LAG(timestamp, 1) OVER (ORDER BY timestamp) FROM t0";
+ let dataframe = ctx.sql(sql).await.unwrap();
+ let results = dataframe.collect().await.unwrap();
+
+ let mut num_rows = 0;
+ for batch in results {
+ num_rows += batch.num_rows();
+ assert_eq!(2, batch.num_columns());
+
+ let ty = batch.column(0).data_type().clone();
+ assert_eq!(DataType::Int64, ty);
+
+ let ty = batch.column(1).data_type().clone();
+ assert_eq!(
+ DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_owned())),
+ ty
+ );
+ }
+
+ assert_eq!(131072, num_rows);
+}
+
#[tokio::test]
async fn parquet_single_nan_schema() {
let ctx = SessionContext::new();