This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e3df5b7  ARROW-4466: [Rust] [DataFusion] Add support for Parquet data 
source
e3df5b7 is described below

commit e3df5b7321db942526e8feb726c9bd9f5bd315b4
Author: Andy Grove <[email protected]>
AuthorDate: Fri Mar 15 15:44:56 2019 -0600

    ARROW-4466: [Rust] [DataFusion] Add support for Parquet data source
    
    I'm sure I'll need some guidance on this one from @sunchao or 
@liurenjie1024 but I am keen to get parquet support added for primitive types 
so that I can actually use DataFusion and Arrow in production at some point.
    
    Author: Andy Grove <[email protected]>
    Author: Neville Dipale <[email protected]>
    Author: Andy Grove <[email protected]>
    
    Closes #3851 from andygrove/ARROW-4466 and squashes the following commits:
    
    3158529 <Andy Grove> add test for reading small batches
    549c829 <Andy Grove> Remove hard-coded batch size, fix nits
    8d2df06 <Andy Grove> move schema projection function from arrow into 
datafusion
    204db83 <Andy Grove> fix timestamp nano issue
    73aa934 <Andy Grove> Remove println from test
    25d34ac <Andy Grove> Make INT32/64/96 handling consistent with C++ 
implementation
    9b1308f <Andy Grove> clean up handling of INT96 and DATE/TIME/TIMESTAMP 
types in schema converter
    1ec815b <Andy Grove> Clean up imports
    023dc25 <Andy Grove> Merge pull request #2 from nevi-me/ARROW-4466
    02b2ed3 <Neville Dipale> fix int96 conversion to read timestamps correctly
    2aeea24 <Andy Grove> remove println from tests
    9d3047a <Andy Grove> code cleanup
    639e13e <Andy Grove> null handling for int96
    1503855 <Andy Grove> handle nulls for binary data
    80cf303 <Andy Grove> add date support
    5a3368c <Andy Grove> Remove unnecessary slice, fix null handling
    306d07a <Neville Dipale> fmt
    3c711a5 <Neville Dipale> immediately allocate vec
    e6cbbaa <Neville Dipale> replace read_column! macro with generic
    607a29f <Andy Grove> return result if there are null values
    e8aa784 <Andy Grove> revert temp debug change to error messages
    6457c36 <Andy Grove> use parquet::reader::schema::parquet_to_arrow_schema
    c56510e <Andy Grove> projection takes slice instead of vec
    7e1a98f <Andy Grove> remove println and unwrap
    dddb7d7 <Andy Grove> update to use partition-aware changes from master
    157512e <Andy Grove> Remove invalid TODO comment
    debb2fb <Andy Grove> code cleanup
    6c3b7e2 <Andy Grove> add support for all primitive parquet types
    b4981ed <Andy Grove> implement more parquet column types and tests
    5ce3086 <Andy Grove> revert to columnar reads
    c3f71d7 <Andy Grove> add integration test
    aea9f8a <Andy Grove> convert to use row iter
    f46e6f7 <Andy Grove> save
    eaddafb <Andy Grove> save
    322fc87 <Andy Grove> add test for reading strings from parquet
    3a412b1 <Andy Grove> first parquet test passes
    ff3e5b7 <Andy Grove> test
    10710a2 <Andy Grove> Parquet datasource
---
 rust/datafusion/src/datasource/mod.rs     |   1 +
 rust/datafusion/src/datasource/parquet.rs | 610 ++++++++++++++++++++++++++++++
 rust/datafusion/src/execution/error.rs    |   8 +
 rust/datafusion/tests/sql.rs              |  26 +-
 rust/parquet/src/reader/schema.rs         |  34 +-
 5 files changed, 667 insertions(+), 12 deletions(-)

diff --git a/rust/datafusion/src/datasource/mod.rs 
b/rust/datafusion/src/datasource/mod.rs
index 1a22a23..5688fb5 100644
--- a/rust/datafusion/src/datasource/mod.rs
+++ b/rust/datafusion/src/datasource/mod.rs
@@ -18,6 +18,7 @@
 pub mod csv;
 pub mod datasource;
 pub mod memory;
+pub mod parquet;
 
 pub use self::csv::{CsvBatchIterator, CsvFile};
 pub use self::datasource::{RecordBatchIterator, ScanResult, Table};
diff --git a/rust/datafusion/src/datasource/parquet.rs 
b/rust/datafusion/src/datasource/parquet.rs
new file mode 100644
index 0000000..3fb4a3c
--- /dev/null
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -0,0 +1,610 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet Data source
+
+use std::fs::File;
+use std::string::String;
+use std::sync::{Arc, Mutex};
+
+use arrow::array::{Array, PrimitiveArray};
+use arrow::builder::{BinaryBuilder, PrimitiveBuilder, 
TimestampNanosecondBuilder};
+use arrow::datatypes::*;
+use arrow::record_batch::RecordBatch;
+
+use parquet::column::reader::*;
+use parquet::data_type::{ByteArray, Int96};
+use parquet::file::reader::*;
+use parquet::reader::schema::parquet_to_arrow_schema;
+
+use crate::datasource::{RecordBatchIterator, ScanResult, Table};
+use crate::execution::error::{ExecutionError, Result};
+
/// DataFusion table provider backed by a single Parquet file on disk.
pub struct ParquetTable {
    // path to the underlying Parquet file
    filename: String,
    // full schema of the file, read once in `try_new`
    schema: Arc<Schema>,
}
+
impl ParquetTable {
    /// Attempt to open the named Parquet file and read its schema.
    ///
    /// The file is opened here only for schema discovery; `scan` re-opens it
    /// for each read.
    pub fn try_new(filename: &str) -> Result<Self> {
        let file = File::open(filename)?;
        // no projection (None) so the projected schema equals the full file
        // schema; batch_size is irrelevant for schema discovery
        let parquet_file = ParquetFile::open(file, None, 0)?;
        let schema = parquet_file.projection_schema.clone();
        Ok(Self {
            filename: filename.to_string(),
            schema,
        })
    }
}
+
impl Table for ParquetTable {
    /// Full schema of this table.
    fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Scan the file, returning a single partition.
    ///
    /// `projection` is an optional list of column indices into the file
    /// schema (`None` selects all columns); each produced batch holds at
    /// most `batch_size` rows.
    fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        batch_size: usize,
    ) -> Result<Vec<ScanResult>> {
        let file = File::open(self.filename.clone())?;
        let parquet_file = ParquetFile::open(file, projection.clone(), batch_size)?;
        // the Parquet scan currently yields exactly one partition
        Ok(vec![Arc::new(Mutex::new(parquet_file))])
    }
}
+
/// Iterator state for reading record batches out of a Parquet file.
pub struct ParquetFile {
    reader: SerializedFileReader<File>,
    /// Projection expressed as column indices into underlying parquet reader
    projection: Vec<usize>,
    /// The schema of the projection
    projection_schema: Arc<Schema>,
    // maximum number of rows per produced RecordBatch
    batch_size: usize,
    // index of the next row group to load
    row_group_index: usize,
    // currently loaded row group, if any
    current_row_group: Option<Box<RowGroupReader>>,
    // one column reader per projected column of the current row group
    column_readers: Vec<ColumnReader>,
}
+
/// Read a batch of values from a BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY column
/// reader `$R` into an Arrow `BinaryArray`, using definition levels to
/// preserve nulls.
///
/// `$INDEX` is currently unused; it is kept so existing call sites
/// (`read_binary_column!(self, r, i)`) remain valid.
macro_rules! read_binary_column {
    ($SELF:ident, $R:ident, $INDEX:expr) => {{
        let mut read_buffer: Vec<ByteArray> =
            vec![ByteArray::default(); $SELF.batch_size];
        // def_levels[i] > 0 means a value is present at logical position i
        let mut def_levels: Vec<i16> = vec![0; $SELF.batch_size];
        let (_, levels_read) = $R.read_batch(
            $SELF.batch_size,
            Some(&mut def_levels),
            None,
            &mut read_buffer,
        )?;
        let mut builder = BinaryBuilder::new(levels_read);
        // values are packed densely in read_buffer; value_index tracks the
        // next defined value
        let mut value_index = 0;
        for i in 0..levels_read {
            if def_levels[i] > 0 {
                // propagate invalid UTF-8 as an ExecutionError instead of
                // panicking (the previous code called `.unwrap()` here)
                let s = String::from_utf8(read_buffer[value_index].data().to_vec())
                    .map_err(|e| {
                        ExecutionError::General(format!(
                            "invalid UTF-8 in BYTE_ARRAY column: {}",
                            e
                        ))
                    })?;
                builder.append_string(&s)?;
                value_index += 1;
            } else {
                builder.append_null()?;
            }
        }
        Arc::new(builder.finish())
    }};
}
+
/// Read a batch of values from a Parquet column reader into an Arrow
/// primitive array of type `T`.
trait ArrowReader<T>
where
    T: ArrowPrimitiveType,
{
    /// Read up to `batch_size` values; `is_nullable` indicates whether
    /// definition levels must be consulted to detect nulls.
    fn read(
        &mut self,
        batch_size: usize,
        is_nullable: bool,
    ) -> Result<Arc<PrimitiveArray<T>>>;
}
+
+impl<A, P> ArrowReader<A> for ColumnReaderImpl<P>
+where
+    A: ArrowPrimitiveType,
+    P: parquet::data_type::DataType,
+    P::T: std::convert::From<A::Native>,
+    A::Native: std::convert::From<P::T>,
+{
+    fn read(
+        &mut self,
+        batch_size: usize,
+        is_nullable: bool,
+    ) -> Result<Arc<PrimitiveArray<A>>> {
+        // create read buffer
+        let mut read_buffer: Vec<P::T> = vec![A::default_value().into(); 
batch_size];
+
+        if is_nullable {
+            let mut def_levels: Vec<i16> = vec![0; batch_size];
+
+            let (values_read, levels_read) = self.read_batch(
+                batch_size,
+                Some(&mut def_levels),
+                None,
+                &mut read_buffer,
+            )?;
+            let mut builder = PrimitiveBuilder::<A>::new(levels_read);
+            let converted_buffer: Vec<A::Native> =
+                read_buffer.into_iter().map(|v| v.into()).collect();
+            if values_read == levels_read {
+                builder.append_slice(&converted_buffer[0..values_read])?;
+            } else {
+                let mut value_index = 0;
+                for i in 0..def_levels.len() {
+                    if def_levels[i] != 0 {
+                        
builder.append_value(converted_buffer[value_index].into())?;
+                        value_index += 1;
+                    } else {
+                        builder.append_null()?;
+                    }
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        } else {
+            let (values_read, _) =
+                self.read_batch(batch_size, None, None, &mut read_buffer)?;
+
+            let mut builder = PrimitiveBuilder::<A>::new(values_read);
+            let converted_buffer: Vec<A::Native> =
+                read_buffer.into_iter().map(|v| v.into()).collect();
+            builder.append_slice(&converted_buffer[0..values_read])?;
+            Ok(Arc::new(builder.finish()))
+        }
+    }
+}
+
+impl ParquetFile {
+    pub fn open(
+        file: File,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let reader = SerializedFileReader::new(file)?;
+
+        let metadata = reader.metadata();
+        let schema =
+            
parquet_to_arrow_schema(metadata.file_metadata().schema_descr_ptr())?;
+
+        // even if we aren't referencing structs or lists in our projection, 
column reader
+        // indexes will be off until we have support for nested schemas
+        for i in 0..schema.fields().len() {
+            match schema.field(i).data_type() {
+                DataType::List(_) => {
+                    return Err(ExecutionError::NotImplemented(
+                        "Parquet datasource does not support LIST".to_string(),
+                    ));
+                }
+                DataType::Struct(_) => {
+                    return Err(ExecutionError::NotImplemented(
+                        "Parquet datasource does not support 
STRUCT".to_string(),
+                    ));
+                }
+                _ => {}
+            }
+        }
+
+        let projection = match projection {
+            Some(p) => p,
+            None => {
+                let mut p = Vec::with_capacity(schema.fields().len());
+                for i in 0..schema.fields().len() {
+                    p.push(i);
+                }
+                p
+            }
+        };
+
+        let projected_schema = schema_projection(&schema, &projection)?;
+
+        Ok(ParquetFile {
+            reader: reader,
+            row_group_index: 0,
+            projection_schema: projected_schema,
+            projection,
+            batch_size,
+            current_row_group: None,
+            column_readers: vec![],
+        })
+    }
+
+    fn load_next_row_group(&mut self) -> Result<()> {
+        if self.row_group_index < self.reader.num_row_groups() {
+            let reader = self.reader.get_row_group(self.row_group_index)?;
+
+            self.column_readers.clear();
+            self.column_readers = Vec::with_capacity(self.projection.len());
+
+            for i in 0..self.projection.len() {
+                self.column_readers
+                    .push(reader.get_column_reader(self.projection[i])?);
+            }
+
+            self.current_row_group = Some(reader);
+            self.row_group_index += 1;
+
+            Ok(())
+        } else {
+            Err(ExecutionError::General(
+                "Attempt to read past final row group".to_string(),
+            ))
+        }
+    }
+
+    fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
+        match &self.current_row_group {
+            Some(reader) => {
+                let mut batch: Vec<Arc<Array>> = 
Vec::with_capacity(reader.num_columns());
+                for i in 0..self.column_readers.len() {
+                    let is_nullable = self.schema().field(i).is_nullable();
+                    let array: Arc<Array> = match self.column_readers[i] {
+                        ColumnReader::BoolColumnReader(ref mut r) => {
+                            ArrowReader::<BooleanType>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::Int32ColumnReader(ref mut r) => {
+                            ArrowReader::<Int32Type>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::Int64ColumnReader(ref mut r) => {
+                            ArrowReader::<Int64Type>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::Int96ColumnReader(ref mut r) => {
+                            let mut read_buffer: Vec<Int96> =
+                                vec![Int96::new(); self.batch_size];
+
+                            let mut def_levels: Vec<i16> = vec![0; 
self.batch_size];
+                            let (_, levels_read) = r.read_batch(
+                                self.batch_size,
+                                Some(&mut def_levels),
+                                None,
+                                &mut read_buffer,
+                            )?;
+
+                            let mut builder =
+                                TimestampNanosecondBuilder::new(levels_read);
+                            let mut value_index = 0;
+                            for i in 0..levels_read {
+                                if def_levels[i] > 0 {
+                                    
builder.append_value(convert_int96_timestamp(
+                                        read_buffer[value_index].data(),
+                                    ))?;
+                                    value_index += 1;
+                                } else {
+                                    builder.append_null()?;
+                                }
+                            }
+                            Arc::new(builder.finish())
+                        }
+                        ColumnReader::FloatColumnReader(ref mut r) => {
+                            ArrowReader::<Float32Type>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::DoubleColumnReader(ref mut r) => {
+                            ArrowReader::<Float64Type>::read(
+                                r,
+                                self.batch_size,
+                                is_nullable,
+                            )?
+                        }
+                        ColumnReader::FixedLenByteArrayColumnReader(ref mut r) 
=> {
+                            read_binary_column!(self, r, i)
+                        }
+                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
+                            read_binary_column!(self, r, i)
+                        }
+                    };
+
+                    batch.push(array);
+                }
+
+                if batch.len() == 0 || batch[0].data().len() == 0 {
+                    Ok(None)
+                } else {
+                    Ok(Some(RecordBatch::try_new(
+                        self.projection_schema.clone(),
+                        batch,
+                    )?))
+                }
+            }
+            _ => Ok(None),
+        }
+    }
+}
+
+/// Create a new schema by applying a projection to this schema's fields
+fn schema_projection(schema: &Schema, projection: &[usize]) -> 
Result<Arc<Schema>> {
+    let mut fields: Vec<Field> = Vec::with_capacity(projection.len());
+    for i in projection {
+        if *i < schema.fields().len() {
+            fields.push(schema.field(*i).clone());
+        } else {
+            return Err(ExecutionError::InvalidColumn(format!(
+                "Invalid column index {} in projection",
+                i
+            )));
+        }
+    }
+    Ok(Arc::new(Schema::new(fields)))
+}
+
/// Convert a Parquet INT96 value to an Arrow timestamp in nanoseconds since
/// the Unix epoch. The three `u32` words are, in order: the low and high
/// halves of the nanoseconds-within-day count, then the Julian day number.
fn convert_int96_timestamp(v: &[u32]) -> i64 {
    // Julian day number of 1970-01-01
    const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
    const SECONDS_PER_DAY: i64 = 86_400;
    const MILLIS_PER_SECOND: i64 = 1_000;

    let julian_day = i64::from(v[2]);
    let nanos_in_day = (i64::from(v[1]) << 32) + i64::from(v[0]);
    let epoch_seconds = (julian_day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
    epoch_seconds * MILLIS_PER_SECOND * 1_000_000 + nanos_in_day
}
+
+impl RecordBatchIterator for ParquetFile {
+    fn schema(&self) -> &Arc<Schema> {
+        &self.projection_schema
+    }
+
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        // advance the row group reader if necessary
+        if self.current_row_group.is_none() {
+            self.load_next_row_group()?;
+            self.load_batch()
+        } else {
+            match self.load_batch() {
+                Ok(Some(b)) => Ok(Some(b)),
+                Ok(None) => {
+                    if self.row_group_index < self.reader.num_row_groups() {
+                        self.load_next_row_group()?;
+                        self.load_batch()
+                    } else {
+                        Ok(None)
+                    }
+                }
+                Err(e) => Err(e),
+            }
+        }
+    }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
        TimestampNanosecondArray,
    };
    use std::env;

    // All tests read the `alltypes_plain.parquet` fixture from the directory
    // named by the PARQUET_TEST_DATA environment variable (see load_table).

    // Reading with a batch size smaller than the file should yield multiple
    // full batches.
    #[test]
    fn read_small_batches() {
        let table = load_table("alltypes_plain.parquet");

        let projection = None;
        let scan = table.scan(&projection, 2).unwrap();
        let mut it = scan[0].lock().unwrap();

        let mut count = 0;
        while let Some(batch) = it.next().unwrap() {
            assert_eq!(11, batch.num_columns());
            assert_eq!(2, batch.num_rows());
            count += 1;
        }

        // we should have seen 4 batches of 2 rows
        assert_eq!(4, count);
    }

    // With a large batch size the whole file fits in one batch.
    #[test]
    fn read_alltypes_plain_parquet() {
        let table = load_table("alltypes_plain.parquet");

        let projection = None;
        let scan = table.scan(&projection, 1024).unwrap();
        let mut it = scan[0].lock().unwrap();
        let batch = it.next().unwrap().unwrap();

        assert_eq!(11, batch.num_columns());
        assert_eq!(8, batch.num_rows());
    }

    // Project a single BOOLEAN column and check its values.
    #[test]
    fn read_bool_alltypes_plain_parquet() {
        let table = load_table("alltypes_plain.parquet");

        let projection = Some(vec![1]);
        let scan = table.scan(&projection, 1024).unwrap();
        let mut it = scan[0].lock().unwrap();
        let batch = it.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        let mut values: Vec<bool> = vec![];
        for i in 0..batch.num_rows() {
            values.push(array.value(i));
        }

        assert_eq!(
            "[true, false, true, false, true, false, true, false]",
            format!("{:?}", values)
        );
    }

    // Project a single INT32 column and check its values.
    #[test]
    fn read_i32_alltypes_plain_parquet() {
        let table = load_table("alltypes_plain.parquet");

        let projection = Some(vec![0]);
        let scan = table.scan(&projection, 1024).unwrap();
        let mut it = scan[0].lock().unwrap();
        let batch = it.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        let mut values: Vec<i32> = vec![];
        for i in 0..batch.num_rows() {
            values.push(array.value(i));
        }

        assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));
    }

    // Project a single INT96 column and check it converts to nanosecond
    // timestamps.
    #[test]
    fn read_i96_alltypes_plain_parquet() {
        let table = load_table("alltypes_plain.parquet");

        let projection = Some(vec![10]);
        let scan = table.scan(&projection, 1024).unwrap();
        let mut it = scan[0].lock().unwrap();
        let batch = it.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        let mut values: Vec<i64> = vec![];
        for i in 0..batch.num_rows() {
            values.push(array.value(i));
        }

        assert_eq!("[1235865600000000000, 1235865660000000000, 1238544000000000000, 1238544060000000000, 1233446400000000000, 1233446460000000000, 1230768000000000000, 1230768060000000000]", format!("{:?}", values));
    }

    // Project a single FLOAT column and check its values.
    #[test]
    fn read_f32_alltypes_plain_parquet() {
        let table = load_table("alltypes_plain.parquet");

        let projection = Some(vec![6]);
        let scan = table.scan(&projection, 1024).unwrap();
        let mut it = scan[0].lock().unwrap();
        let batch = it.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float32Array>()
            .unwrap();
        let mut values: Vec<f32> = vec![];
        for i in 0..batch.num_rows() {
            values.push(array.value(i));
        }

        assert_eq!(
            "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
            format!("{:?}", values)
        );
    }

    // Project a single DOUBLE column and check its values.
    #[test]
    fn read_f64_alltypes_plain_parquet() {
        let table = load_table("alltypes_plain.parquet");

        let projection = Some(vec![7]);
        let scan = table.scan(&projection, 1024).unwrap();
        let mut it = scan[0].lock().unwrap();
        let batch = it.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let mut values: Vec<f64> = vec![];
        for i in 0..batch.num_rows() {
            values.push(array.value(i));
        }

        assert_eq!(
            "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
            format!("{:?}", values)
        );
    }

    // Project a single BYTE_ARRAY (utf8) column and check its values.
    #[test]
    fn read_utf8_alltypes_plain_parquet() {
        let table = load_table("alltypes_plain.parquet");

        let projection = Some(vec![9]);
        let scan = table.scan(&projection, 1024).unwrap();
        let mut it = scan[0].lock().unwrap();
        let batch = it.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(8, batch.num_rows());

        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut values: Vec<String> = vec![];
        for i in 0..batch.num_rows() {
            let str: String = String::from_utf8(array.value(i).to_vec()).unwrap();
            values.push(str);
        }

        assert_eq!(
            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
            format!("{:?}", values)
        );
    }

    // Open a named fixture from the PARQUET_TEST_DATA directory.
    fn load_table(name: &str) -> Box<Table> {
        let testdata = env::var("PARQUET_TEST_DATA").unwrap();
        let filename = format!("{}/{}", testdata, name);
        let table = ParquetTable::try_new(&filename).unwrap();
        Box::new(table)
    }
}
diff --git a/rust/datafusion/src/execution/error.rs 
b/rust/datafusion/src/execution/error.rs
index 5b8d04d..92ce6d9 100644
--- a/rust/datafusion/src/execution/error.rs
+++ b/rust/datafusion/src/execution/error.rs
@@ -21,6 +21,7 @@ use std::io::Error;
 use std::result;
 
 use arrow::error::ArrowError;
+use parquet::errors::ParquetError;
 
 use sqlparser::sqlparser::ParserError;
 
@@ -35,6 +36,7 @@ pub enum ExecutionError {
     NotImplemented(String),
     InternalError(String),
     ArrowError(ArrowError),
+    ParquetError(ParquetError),
     ExecutionError(String),
 }
 
@@ -62,6 +64,12 @@ impl From<ArrowError> for ExecutionError {
     }
 }
 
/// Allow `?` on parquet crate operations inside DataFusion execution code by
/// wrapping `ParquetError` in `ExecutionError`.
impl From<ParquetError> for ExecutionError {
    fn from(e: ParquetError) -> Self {
        ExecutionError::ParquetError(e)
    }
}
+
 impl From<ParserError> for ExecutionError {
     fn from(e: ParserError) -> Self {
         ExecutionError::ParserError(e)
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 954b2ee..9c24a50 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::cell::RefCell;
+use std::env;
 use std::rc::Rc;
 use std::sync::Arc;
 
@@ -25,12 +26,27 @@ extern crate datafusion;
 use arrow::array::*;
 use arrow::datatypes::{DataType, Field, Schema};
 
+use datafusion::datasource::parquet::ParquetTable;
+use datafusion::datasource::Table;
 use datafusion::execution::context::ExecutionContext;
 use datafusion::execution::relation::Relation;
 
 const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
 
// End-to-end SQL query against a registered Parquet table, selecting columns
// of two different types (i32 and utf8).
#[test]
fn parquet_query() {
    let mut ctx = ExecutionContext::new();
    ctx.register_table(
        "alltypes_plain",
        load_parquet_table("alltypes_plain.parquet"),
    );
    let sql = "SELECT id, string_col FROM alltypes_plain";
    let actual = execute(&mut ctx, sql);
    // tab-delimited rows in the file's physical order
    let expected = "4\t\"0\"\n5\t\"1\"\n6\t\"0\"\n7\t\"1\"\n2\t\"0\"\n3\t\"1\"\n0\t\"0\"\n1\t\"1\"\n".to_string();
    assert_eq!(expected, actual);
}
+
+#[test]
 fn csv_query_with_predicate() {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx);
@@ -163,9 +179,17 @@ fn register_csv(
     ctx.register_csv(name, filename, &schema, true);
 }
 
/// Open a Parquet test fixture from the directory named by the
/// PARQUET_TEST_DATA environment variable (panics if unset or unreadable).
fn load_parquet_table(name: &str) -> Rc<Table> {
    let testdata = env::var("PARQUET_TEST_DATA").unwrap();
    let filename = format!("{}/{}", testdata, name);
    let table = ParquetTable::try_new(&filename).unwrap();
    Rc::new(table)
}
+
/// Execute query and return result set as tab delimited string
fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
    // build the logical plan explicitly, then execute it with the default
    // batch size
    let plan = ctx.create_logical_plan(&sql).unwrap();
    let results = ctx.execute(&plan, DEFAULT_BATCH_SIZE).unwrap();
    result_str(&results)
}
 
diff --git a/rust/parquet/src/reader/schema.rs 
b/rust/parquet/src/reader/schema.rs
index 34276a2..5af07be 100644
--- a/rust/parquet/src/reader/schema.rs
+++ b/rust/parquet/src/reader/schema.rs
@@ -28,7 +28,8 @@ use crate::basic::{LogicalType, Repetition, Type as 
PhysicalType};
 use crate::errors::{ParquetError::ArrowError, Result};
 use crate::schema::types::{SchemaDescPtr, Type, TypePtr};
 
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::datatypes::TimeUnit;
+use arrow::datatypes::{DataType, DateUnit, Field, Schema};
 
 /// Convert parquet schema to arrow schema.
 pub fn parquet_to_arrow_schema(parquet_schema: SchemaDescPtr) -> 
Result<Schema> {
@@ -175,19 +176,20 @@ impl ParquetTypeConverter {
     fn to_primitive_type_inner(&self) -> Result<DataType> {
         match self.schema.get_physical_type() {
             PhysicalType::BOOLEAN => Ok(DataType::Boolean),
-            PhysicalType::INT32 => self.to_int32(),
-            PhysicalType::INT64 => self.to_int64(),
+            PhysicalType::INT32 => self.from_int32(),
+            PhysicalType::INT64 => self.from_int64(),
+            PhysicalType::INT96 => 
Ok(DataType::Timestamp(TimeUnit::Nanosecond)),
             PhysicalType::FLOAT => Ok(DataType::Float32),
             PhysicalType::DOUBLE => Ok(DataType::Float64),
-            PhysicalType::BYTE_ARRAY => self.to_byte_array(),
+            PhysicalType::BYTE_ARRAY => self.from_byte_array(),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet type {}",
+                "Unable to convert parquet physical type {}",
                 other
             ))),
         }
     }
 
-    fn to_int32(&self) -> Result<DataType> {
+    fn from_int32(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
             LogicalType::NONE => Ok(DataType::Int32),
             LogicalType::UINT_8 => Ok(DataType::UInt8),
@@ -196,30 +198,40 @@ impl ParquetTypeConverter {
             LogicalType::INT_8 => Ok(DataType::Int8),
             LogicalType::INT_16 => Ok(DataType::Int16),
             LogicalType::INT_32 => Ok(DataType::Int32),
+            LogicalType::DATE => Ok(DataType::Date32(DateUnit::Millisecond)),
+            LogicalType::TIME_MILLIS => 
Ok(DataType::Time32(TimeUnit::Millisecond)),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet INT32 logical type {}",
                 other
             ))),
         }
     }
 
-    fn to_int64(&self) -> Result<DataType> {
+    fn from_int64(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
             LogicalType::NONE => Ok(DataType::Int64),
             LogicalType::INT_64 => Ok(DataType::Int64),
             LogicalType::UINT_64 => Ok(DataType::UInt64),
+            LogicalType::TIME_MICROS => 
Ok(DataType::Time64(TimeUnit::Microsecond)),
+            LogicalType::TIMESTAMP_MICROS => {
+                Ok(DataType::Timestamp(TimeUnit::Microsecond))
+            }
+            LogicalType::TIMESTAMP_MILLIS => {
+                Ok(DataType::Timestamp(TimeUnit::Millisecond))
+            }
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet INT64 logical type {}",
                 other
             ))),
         }
     }
 
-    fn to_byte_array(&self) -> Result<DataType> {
+    fn from_byte_array(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
+            LogicalType::NONE => Ok(DataType::Utf8),
             LogicalType::UTF8 => Ok(DataType::Utf8),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet BYTE_ARRAY logical type {}",
                 other
             ))),
         }

Reply via email to