This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8c956a9f9 Support writing UTC adjusted time arrays to parquet (#6278)
8c956a9f9 is described below
commit 8c956a9f9ab26c14072740cce64c2b99cb039b13
Author: aykut-bozkurt <[email protected]>
AuthorDate: Fri Aug 23 17:29:18 2024 +0300
Support writing UTC adjusted time arrays to parquet (#6278)
* check if time is adjusted to utc from metadata
* add test
* add roundtrip test
* cargo fmt
* Fix regression
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/arrow/arrow_reader/mod.rs | 69 ++++++++++++++++++++++++++++++++++-
parquet/src/arrow/schema/mod.rs | 10 ++++-
2 files changed, 75 insertions(+), 4 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index a0302fa86..253625117 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -932,7 +932,7 @@ pub(crate) fn evaluate_predicate(
#[cfg(test)]
mod tests {
use std::cmp::min;
- use std::collections::VecDeque;
+ use std::collections::{HashMap, VecDeque};
use std::fmt::Formatter;
use std::fs::File;
use std::io::Seek;
@@ -949,11 +949,14 @@ mod tests {
use arrow_array::cast::AsArray;
use arrow_array::types::{
Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
+ Time32MillisecondType, Time64MicrosecondType,
};
use arrow_array::*;
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
use arrow_data::ArrayDataBuilder;
- use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef};
+ use arrow_schema::{
+ ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
+ };
use arrow_select::concat::concat_batches;
use crate::arrow::arrow_reader::{
@@ -1223,6 +1226,68 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_time_utc_roundtrip() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "time_millis",
+ ArrowDataType::Time32(TimeUnit::Millisecond),
+ true,
+ )
+ .with_metadata(HashMap::from_iter(vec![(
+ "adjusted_to_utc".to_string(),
+ "".to_string(),
+ )])),
+ Field::new(
+ "time_micros",
+ ArrowDataType::Time64(TimeUnit::Microsecond),
+ true,
+ )
+ .with_metadata(HashMap::from_iter(vec![(
+ "adjusted_to_utc".to_string(),
+ "".to_string(),
+ )])),
+ ]));
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
+
+ let original = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Time32MillisecondArray::from(vec![
+ Some(-1),
+ Some(0),
+ Some(86_399_000),
+ Some(86_400_000),
+ Some(86_401_000),
+ None,
+ ])),
+ Arc::new(Time64MicrosecondArray::from(vec![
+ Some(-1),
+ Some(0),
+ Some(86_399 * 1_000_000),
+ Some(86_400 * 1_000_000),
+ Some(86_401 * 1_000_000),
+ None,
+ ])),
+ ],
+ )?;
+
+ writer.write(&original)?;
+ writer.close()?;
+
+ let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
+ let ret = reader.next().unwrap()?;
+ assert_eq!(ret, original);
+
+ // Ensure can be downcast to the correct type
+ ret.column(0).as_primitive::<Time32MillisecondType>();
+ ret.column(1).as_primitive::<Time64MicrosecondType>();
+
+ Ok(())
+ }
+
struct RandFixedLenGen {}
impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 8c583eeba..a3528b6c8 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -427,7 +427,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
}
DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Time {
- is_adjusted_to_u_t_c: false,
+ is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
unit: match unit {
TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
u => unreachable!("Invalid unit for Time32: {:?}", u),
@@ -438,7 +438,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.build(),
DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_logical_type(Some(LogicalType::Time {
- is_adjusted_to_u_t_c: false,
+ is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
unit: match unit {
TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
@@ -1430,7 +1430,9 @@ mod tests {
}
OPTIONAL INT32 date (DATE);
OPTIONAL INT32 time_milli (TIME(MILLIS,false));
+ OPTIONAL INT32 time_milli_utc (TIME(MILLIS,true));
OPTIONAL INT64 time_micro (TIME_MICROS);
+ OPTIONAL INT64 time_micro_utc (TIME(MICROS, true));
OPTIONAL INT64 ts_milli (TIMESTAMP_MILLIS);
REQUIRED INT64 ts_micro (TIMESTAMP(MICROS,false));
REQUIRED INT64 ts_seconds;
@@ -1481,7 +1483,11 @@ mod tests {
),
Field::new("date", DataType::Date32, true),
Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
+ Field::new("time_milli_utc", DataType::Time32(TimeUnit::Millisecond), true)
+ .with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
+ Field::new("time_micro_utc", DataType::Time64(TimeUnit::Microsecond), true)
+ .with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
Field::new(
"ts_milli",
DataType::Timestamp(TimeUnit::Millisecond, None),