This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c956a9f9 Support writing UTC adjusted time arrays to parquet (#6278)
8c956a9f9 is described below

commit 8c956a9f9ab26c14072740cce64c2b99cb039b13
Author: aykut-bozkurt <[email protected]>
AuthorDate: Fri Aug 23 17:29:18 2024 +0300

    Support writing UTC adjusted time arrays to parquet (#6278)
    
    * check if time is adjusted to utc from metadata
    
    * add test
    
    * add roundtrip test
    
    * cargo fmt
    
    * Fix regression
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/arrow/arrow_reader/mod.rs | 69 ++++++++++++++++++++++++++++++++++-
 parquet/src/arrow/schema/mod.rs       | 10 ++++-
 2 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index a0302fa86..253625117 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -932,7 +932,7 @@ pub(crate) fn evaluate_predicate(
 #[cfg(test)]
 mod tests {
     use std::cmp::min;
-    use std::collections::VecDeque;
+    use std::collections::{HashMap, VecDeque};
     use std::fmt::Formatter;
     use std::fs::File;
     use std::io::Seek;
@@ -949,11 +949,14 @@ mod tests {
     use arrow_array::cast::AsArray;
     use arrow_array::types::{
         Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
+        Time32MillisecondType, Time64MicrosecondType,
     };
     use arrow_array::*;
     use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
     use arrow_data::ArrayDataBuilder;
-    use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef};
+    use arrow_schema::{
+        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
+    };
     use arrow_select::concat::concat_batches;
 
     use crate::arrow::arrow_reader::{
@@ -1223,6 +1226,68 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_time_utc_roundtrip() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "time_millis",
+                ArrowDataType::Time32(TimeUnit::Millisecond),
+                true,
+            )
+            .with_metadata(HashMap::from_iter(vec![(
+                "adjusted_to_utc".to_string(),
+                "".to_string(),
+            )])),
+            Field::new(
+                "time_micros",
+                ArrowDataType::Time64(TimeUnit::Microsecond),
+                true,
+            )
+            .with_metadata(HashMap::from_iter(vec![(
+                "adjusted_to_utc".to_string(),
+                "".to_string(),
+            )])),
+        ]));
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
+
+        let original = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Time32MillisecondArray::from(vec![
+                    Some(-1),
+                    Some(0),
+                    Some(86_399_000),
+                    Some(86_400_000),
+                    Some(86_401_000),
+                    None,
+                ])),
+                Arc::new(Time64MicrosecondArray::from(vec![
+                    Some(-1),
+                    Some(0),
+                    Some(86_399 * 1_000_000),
+                    Some(86_400 * 1_000_000),
+                    Some(86_401 * 1_000_000),
+                    None,
+                ])),
+            ],
+        )?;
+
+        writer.write(&original)?;
+        writer.close()?;
+
+        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
+        let ret = reader.next().unwrap()?;
+        assert_eq!(ret, original);
+
+        // Ensure can be downcast to the correct type
+        ret.column(0).as_primitive::<Time32MillisecondType>();
+        ret.column(1).as_primitive::<Time64MicrosecondType>();
+
+        Ok(())
+    }
+
     struct RandFixedLenGen {}
 
     impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 8c583eeba..a3528b6c8 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -427,7 +427,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         }
         DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Time {
-                is_adjusted_to_u_t_c: false,
+                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                 unit: match unit {
                     TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                     u => unreachable!("Invalid unit for Time32: {:?}", u),
@@ -438,7 +438,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             .build(),
         DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
             .with_logical_type(Some(LogicalType::Time {
-                is_adjusted_to_u_t_c: false,
+                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                 unit: match unit {
                     TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                     TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
@@ -1430,7 +1430,9 @@ mod tests {
             }
             OPTIONAL INT32   date       (DATE);
             OPTIONAL INT32   time_milli (TIME(MILLIS,false));
+            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
             OPTIONAL INT64   time_micro (TIME_MICROS);
+            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
             OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
             REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
             REQUIRED INT64   ts_seconds;
@@ -1481,7 +1483,11 @@ mod tests {
             ),
             Field::new("date", DataType::Date32, true),
             Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
+            Field::new("time_milli_utc", DataType::Time32(TimeUnit::Millisecond), true)
+                .with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
             Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
+            Field::new("time_micro_utc", DataType::Time64(TimeUnit::Microsecond), true)
+                .with_metadata(HashMap::from_iter(vec![("adjusted_to_utc".to_string(), "".to_string())])),
             Field::new(
                 "ts_milli",
                 DataType::Timestamp(TimeUnit::Millisecond, None),

Reply via email to