This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 129b77c ARROW-4908: [Rust] [DataFusion] Add support for date/time
parquet types encoded as INT32/INT64
129b77c is described below
commit 129b77c26737caf1231efb622efeb1653f75d4c7
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 18 12:32:02 2019 +0100
ARROW-4908: [Rust] [DataFusion] Add support for date/time parquet types
encoded as INT32/INT64
This PR adds support for the date/time parquet types that can be encoded in
INT32/INT64 physical types.
Author: Andy Grove <[email protected]>
Closes #3940 from andygrove/ARROW-4908 and squashes the following commits:
4e99e429 <Andy Grove> formatting
503151a6 <Andy Grove> Update parquet data source for all date/time types
54cce4bd <Andy Grove> Add support for date/time parquet types encoded as
INT32/INT64
---
rust/datafusion/src/datasource/parquet.rs | 88 ++++++++++++++++++++++++++++---
rust/parquet/src/reader/schema.rs | 8 +--
2 files changed, 84 insertions(+), 12 deletions(-)
diff --git a/rust/datafusion/src/datasource/parquet.rs
b/rust/datafusion/src/datasource/parquet.rs
index d017810..c30c38d 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -253,6 +253,7 @@ impl ParquetFile {
Some(reader) => {
let mut batch: Vec<Arc<Array>> =
Vec::with_capacity(reader.num_columns());
for i in 0..self.column_readers.len() {
+ let dt = self.schema().field(i).data_type().clone();
let is_nullable = self.schema().field(i).is_nullable();
let array: Arc<Array> = match self.column_readers[i] {
ColumnReader::BoolColumnReader(ref mut r) => {
@@ -262,20 +263,69 @@ impl ParquetFile {
is_nullable,
)?
}
- ColumnReader::Int32ColumnReader(ref mut r) => {
- ArrowReader::<Int32Type>::read(
+ ColumnReader::Int32ColumnReader(ref mut r) => match dt
{
+ DataType::Date32(DateUnit::Day) => {
+ ArrowReader::<Date32Type>::read(
+ r,
+ self.batch_size,
+ is_nullable,
+ )?
+ }
+ DataType::Time32(TimeUnit::Millisecond) => {
+ ArrowReader::<Time32MillisecondType>::read(
+ r,
+ self.batch_size,
+ is_nullable,
+ )?
+ }
+ _ => ArrowReader::<Int32Type>::read(
r,
self.batch_size,
is_nullable,
- )?
- }
- ColumnReader::Int64ColumnReader(ref mut r) => {
- ArrowReader::<Int64Type>::read(
+ )?,
+ },
+ ColumnReader::Int64ColumnReader(ref mut r) => match dt
{
+ DataType::Time64(TimeUnit::Microsecond) => {
+ ArrowReader::<Time64MicrosecondType>::read(
+ r,
+ self.batch_size,
+ is_nullable,
+ )?
+ }
+ DataType::Time64(TimeUnit::Nanosecond) => {
+ ArrowReader::<Time64NanosecondType>::read(
+ r,
+ self.batch_size,
+ is_nullable,
+ )?
+ }
+ DataType::Timestamp(TimeUnit::Millisecond) => {
+ ArrowReader::<TimestampMillisecondType>::read(
+ r,
+ self.batch_size,
+ is_nullable,
+ )?
+ }
+ DataType::Timestamp(TimeUnit::Microsecond) => {
+ ArrowReader::<TimestampMicrosecondType>::read(
+ r,
+ self.batch_size,
+ is_nullable,
+ )?
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond) => {
+ ArrowReader::<TimestampNanosecondType>::read(
+ r,
+ self.batch_size,
+ is_nullable,
+ )?
+ }
+ _ => ArrowReader::<Int64Type>::read(
r,
self.batch_size,
is_nullable,
- )?
- }
+ )?,
+ },
ColumnReader::Int96ColumnReader(ref mut r) => {
let mut read_buffer: Vec<Int96> =
vec![Int96::new(); self.batch_size];
@@ -429,6 +479,28 @@ mod tests {
fn read_alltypes_plain_parquet() {
let table = load_table("alltypes_plain.parquet");
+ let x: Vec<String> = table
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+ .collect();
+ let y = x.join("\n");
+ assert_eq!(
+ "id: Int32\n\
+ bool_col: Boolean\n\
+ tinyint_col: Int32\n\
+ smallint_col: Int32\n\
+ int_col: Int32\n\
+ bigint_col: Int64\n\
+ float_col: Float32\n\
+ double_col: Float64\n\
+ date_string_col: Utf8\n\
+ string_col: Utf8\n\
+ timestamp_col: Timestamp(Nanosecond)",
+ y
+ );
+
let projection = None;
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan[0].lock().unwrap();
diff --git a/rust/parquet/src/reader/schema.rs
b/rust/parquet/src/reader/schema.rs
index 5af07be..3e66c03 100644
--- a/rust/parquet/src/reader/schema.rs
+++ b/rust/parquet/src/reader/schema.rs
@@ -198,7 +198,7 @@ impl ParquetTypeConverter {
LogicalType::INT_8 => Ok(DataType::Int8),
LogicalType::INT_16 => Ok(DataType::Int16),
LogicalType::INT_32 => Ok(DataType::Int32),
- LogicalType::DATE => Ok(DataType::Date32(DateUnit::Millisecond)),
+ LogicalType::DATE => Ok(DataType::Date32(DateUnit::Day)),
LogicalType::TIME_MILLIS =>
Ok(DataType::Time32(TimeUnit::Millisecond)),
other => Err(ArrowError(format!(
"Unable to convert parquet INT32 logical type {}",
@@ -213,12 +213,12 @@ impl ParquetTypeConverter {
LogicalType::INT_64 => Ok(DataType::Int64),
LogicalType::UINT_64 => Ok(DataType::UInt64),
LogicalType::TIME_MICROS =>
Ok(DataType::Time64(TimeUnit::Microsecond)),
- LogicalType::TIMESTAMP_MICROS => {
- Ok(DataType::Timestamp(TimeUnit::Microsecond))
- }
LogicalType::TIMESTAMP_MILLIS => {
Ok(DataType::Timestamp(TimeUnit::Millisecond))
}
+ LogicalType::TIMESTAMP_MICROS => {
+ Ok(DataType::Timestamp(TimeUnit::Microsecond))
+ }
other => Err(ArrowError(format!(
"Unable to convert parquet INT64 logical type {}",
other