This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 129b77c  ARROW-4908: [Rust] [DataFusion] Add support for date/time parquet types encoded as INT32/INT64
129b77c is described below

commit 129b77c26737caf1231efb622efeb1653f75d4c7
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 18 12:32:02 2019 +0100

    ARROW-4908: [Rust] [DataFusion] Add support for date/time parquet types encoded as INT32/INT64
    
    This PR adds support for the date/time parquet types that can be encoded in INT32/INT64 physical types.
    
    Author: Andy Grove <[email protected]>
    
    Closes #3940 from andygrove/ARROW-4908 and squashes the following commits:
    
    4e99e429 <Andy Grove> formatting
    503151a6 <Andy Grove> Update parquet data source for all date/time types
    54cce4bd <Andy Grove> Add support for date/time parquet types encoded as INT32/INT64
---
 rust/datafusion/src/datasource/parquet.rs | 88 ++++++++++++++++++++++++++++---
 rust/parquet/src/reader/schema.rs         |  8 +--
 2 files changed, 84 insertions(+), 12 deletions(-)

diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index d017810..c30c38d 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -253,6 +253,7 @@ impl ParquetFile {
             Some(reader) => {
                 let mut batch: Vec<Arc<Array>> = 
Vec::with_capacity(reader.num_columns());
                 for i in 0..self.column_readers.len() {
+                    let dt = self.schema().field(i).data_type().clone();
                     let is_nullable = self.schema().field(i).is_nullable();
                     let array: Arc<Array> = match self.column_readers[i] {
                         ColumnReader::BoolColumnReader(ref mut r) => {
@@ -262,20 +263,69 @@ impl ParquetFile {
                                 is_nullable,
                             )?
                         }
-                        ColumnReader::Int32ColumnReader(ref mut r) => {
-                            ArrowReader::<Int32Type>::read(
+                        ColumnReader::Int32ColumnReader(ref mut r) => match dt 
{
+                            DataType::Date32(DateUnit::Day) => {
+                                ArrowReader::<Date32Type>::read(
+                                    r,
+                                    self.batch_size,
+                                    is_nullable,
+                                )?
+                            }
+                            DataType::Time32(TimeUnit::Millisecond) => {
+                                ArrowReader::<Time32MillisecondType>::read(
+                                    r,
+                                    self.batch_size,
+                                    is_nullable,
+                                )?
+                            }
+                            _ => ArrowReader::<Int32Type>::read(
                                 r,
                                 self.batch_size,
                                 is_nullable,
-                            )?
-                        }
-                        ColumnReader::Int64ColumnReader(ref mut r) => {
-                            ArrowReader::<Int64Type>::read(
+                            )?,
+                        },
+                        ColumnReader::Int64ColumnReader(ref mut r) => match dt 
{
+                            DataType::Time64(TimeUnit::Microsecond) => {
+                                ArrowReader::<Time64MicrosecondType>::read(
+                                    r,
+                                    self.batch_size,
+                                    is_nullable,
+                                )?
+                            }
+                            DataType::Time64(TimeUnit::Nanosecond) => {
+                                ArrowReader::<Time64NanosecondType>::read(
+                                    r,
+                                    self.batch_size,
+                                    is_nullable,
+                                )?
+                            }
+                            DataType::Timestamp(TimeUnit::Millisecond) => {
+                                ArrowReader::<TimestampMillisecondType>::read(
+                                    r,
+                                    self.batch_size,
+                                    is_nullable,
+                                )?
+                            }
+                            DataType::Timestamp(TimeUnit::Microsecond) => {
+                                ArrowReader::<TimestampMicrosecondType>::read(
+                                    r,
+                                    self.batch_size,
+                                    is_nullable,
+                                )?
+                            }
+                            DataType::Timestamp(TimeUnit::Nanosecond) => {
+                                ArrowReader::<TimestampNanosecondType>::read( // same unit as dt
+                                    r,
+                                    self.batch_size,
+                                    is_nullable,
+                                )?
+                            }
+                            _ => ArrowReader::<Int64Type>::read(
                                 r,
                                 self.batch_size,
                                 is_nullable,
-                            )?
-                        }
+                            )?,
+                        },
                         ColumnReader::Int96ColumnReader(ref mut r) => {
                             let mut read_buffer: Vec<Int96> =
                                 vec![Int96::new(); self.batch_size];
@@ -429,6 +479,28 @@ mod tests {
     fn read_alltypes_plain_parquet() {
         let table = load_table("alltypes_plain.parquet");
 
+        let x: Vec<String> = table
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect();
+        let y = x.join("\n");
+        assert_eq!(
+            "id: Int32\n\
+             bool_col: Boolean\n\
+             tinyint_col: Int32\n\
+             smallint_col: Int32\n\
+             int_col: Int32\n\
+             bigint_col: Int64\n\
+             float_col: Float32\n\
+             double_col: Float64\n\
+             date_string_col: Utf8\n\
+             string_col: Utf8\n\
+             timestamp_col: Timestamp(Nanosecond)",
+            y
+        );
+
         let projection = None;
         let scan = table.scan(&projection, 1024).unwrap();
         let mut it = scan[0].lock().unwrap();
diff --git a/rust/parquet/src/reader/schema.rs b/rust/parquet/src/reader/schema.rs
index 5af07be..3e66c03 100644
--- a/rust/parquet/src/reader/schema.rs
+++ b/rust/parquet/src/reader/schema.rs
@@ -198,7 +198,7 @@ impl ParquetTypeConverter {
             LogicalType::INT_8 => Ok(DataType::Int8),
             LogicalType::INT_16 => Ok(DataType::Int16),
             LogicalType::INT_32 => Ok(DataType::Int32),
-            LogicalType::DATE => Ok(DataType::Date32(DateUnit::Millisecond)),
+            LogicalType::DATE => Ok(DataType::Date32(DateUnit::Day)),
             LogicalType::TIME_MILLIS => 
Ok(DataType::Time32(TimeUnit::Millisecond)),
             other => Err(ArrowError(format!(
                 "Unable to convert parquet INT32 logical type {}",
@@ -213,12 +213,12 @@ impl ParquetTypeConverter {
             LogicalType::INT_64 => Ok(DataType::Int64),
             LogicalType::UINT_64 => Ok(DataType::UInt64),
             LogicalType::TIME_MICROS => 
Ok(DataType::Time64(TimeUnit::Microsecond)),
-            LogicalType::TIMESTAMP_MICROS => {
-                Ok(DataType::Timestamp(TimeUnit::Microsecond))
-            }
             LogicalType::TIMESTAMP_MILLIS => {
                 Ok(DataType::Timestamp(TimeUnit::Millisecond))
             }
+            LogicalType::TIMESTAMP_MICROS => {
+                Ok(DataType::Timestamp(TimeUnit::Microsecond))
+            }
             other => Err(ArrowError(format!(
                 "Unable to convert parquet INT64 logical type {}",
                 other

Reply via email to