This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e3df5b7 ARROW-4466: [Rust] [DataFusion] Add support for Parquet data source
e3df5b7 is described below
commit e3df5b7321db942526e8feb726c9bd9f5bd315b4
Author: Andy Grove <[email protected]>
AuthorDate: Fri Mar 15 15:44:56 2019 -0600
ARROW-4466: [Rust] [DataFusion] Add support for Parquet data source
I'm sure I'll need some guidance on this one from @sunchao or
@liurenjie1024, but I am keen to get Parquet support added for primitive types
so that I can actually use DataFusion and Arrow in production at some point.
Author: Andy Grove <[email protected]>
Author: Neville Dipale <[email protected]>
Author: Andy Grove <[email protected]>
Closes #3851 from andygrove/ARROW-4466 and squashes the following commits:
3158529 <Andy Grove> add test for reading small batches
549c829 <Andy Grove> Remove hard-coded batch size, fix nits
8d2df06 <Andy Grove> move schema projection function from arrow into datafusion
204db83 <Andy Grove> fix timestamp nano issue
73aa934 <Andy Grove> Remove println from test
25d34ac <Andy Grove> Make INT32/64/96 handling consistent with C++ implementation
9b1308f <Andy Grove> clean up handling of INT96 and DATE/TIME/TIMESTAMP types in schema converter
1ec815b <Andy Grove> Clean up imports
023dc25 <Andy Grove> Merge pull request #2 from nevi-me/ARROW-4466
02b2ed3 <Neville Dipale> fix int96 conversion to read timestamps correctly
2aeea24 <Andy Grove> remove println from tests
9d3047a <Andy Grove> code cleanup
639e13e <Andy Grove> null handling for int96
1503855 <Andy Grove> handle nulls for binary data
80cf303 <Andy Grove> add date support
5a3368c <Andy Grove> Remove unnecessary slice, fix null handling
306d07a <Neville Dipale> fmt
3c711a5 <Neville Dipale> immediately allocate vec
e6cbbaa <Neville Dipale> replace read_column! macro with generic
607a29f <Andy Grove> return result if there are null values
e8aa784 <Andy Grove> revert temp debug change to error messages
6457c36 <Andy Grove> use parquet::reader::schema::parquet_to_arrow_schema
c56510e <Andy Grove> projection takes slice instead of vec
7e1a98f <Andy Grove> remove println and unwrap
dddb7d7 <Andy Grove> update to use partition-aware changes from master
157512e <Andy Grove> Remove invalid TODO comment
debb2fb <Andy Grove> code cleanup
6c3b7e2 <Andy Grove> add support for all primitive parquet types
b4981ed <Andy Grove> implement more parquet column types and tests
5ce3086 <Andy Grove> revert to columnar reads
c3f71d7 <Andy Grove> add integration test
aea9f8a <Andy Grove> convert to use row iter
f46e6f7 <Andy Grove> save
eaddafb <Andy Grove> save
322fc87 <Andy Grove> add test for reading strings from parquet
3a412b1 <Andy Grove> first parquet test passes
ff3e5b7 <Andy Grove> test
10710a2 <Andy Grove> Parquet datasource
---
rust/datafusion/src/datasource/mod.rs | 1 +
rust/datafusion/src/datasource/parquet.rs | 610 ++++++++++++++++++++++++++++++
rust/datafusion/src/execution/error.rs | 8 +
rust/datafusion/tests/sql.rs | 26 +-
rust/parquet/src/reader/schema.rs | 34 +-
5 files changed, 667 insertions(+), 12 deletions(-)
diff --git a/rust/datafusion/src/datasource/mod.rs
b/rust/datafusion/src/datasource/mod.rs
index 1a22a23..5688fb5 100644
--- a/rust/datafusion/src/datasource/mod.rs
+++ b/rust/datafusion/src/datasource/mod.rs
@@ -18,6 +18,7 @@
pub mod csv;
pub mod datasource;
pub mod memory;
+pub mod parquet;
pub use self::csv::{CsvBatchIterator, CsvFile};
pub use self::datasource::{RecordBatchIterator, ScanResult, Table};
diff --git a/rust/datafusion/src/datasource/parquet.rs
b/rust/datafusion/src/datasource/parquet.rs
new file mode 100644
index 0000000..3fb4a3c
--- /dev/null
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -0,0 +1,610 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet Data source
+
+use std::fs::File;
+use std::string::String;
+use std::sync::{Arc, Mutex};
+
+use arrow::array::{Array, PrimitiveArray};
+use arrow::builder::{BinaryBuilder, PrimitiveBuilder,
TimestampNanosecondBuilder};
+use arrow::datatypes::*;
+use arrow::record_batch::RecordBatch;
+
+use parquet::column::reader::*;
+use parquet::data_type::{ByteArray, Int96};
+use parquet::file::reader::*;
+use parquet::reader::schema::parquet_to_arrow_schema;
+
+use crate::datasource::{RecordBatchIterator, ScanResult, Table};
+use crate::execution::error::{ExecutionError, Result};
+
+/// A DataFusion [`Table`] backed by a single Parquet file on disk.
+pub struct ParquetTable {
+    /// Path of the Parquet file; it is re-opened for every scan.
+    filename: String,
+    /// Arrow schema derived from the file's Parquet schema (all columns).
+    schema: Arc<Schema>,
+}
+
+impl ParquetTable {
+    /// Opens `filename` to read its metadata and derive the Arrow schema.
+    ///
+    /// Fails if the file cannot be opened, is not valid Parquet, or contains a
+    /// LIST or STRUCT column (rejected by `ParquetFile::open`).
+    pub fn try_new(filename: &str) -> Result<Self> {
+        let schema = {
+            let file = File::open(filename)?;
+            // batch size is irrelevant here: only the schema is kept
+            ParquetFile::open(file, None, 0)?.projection_schema
+        };
+        Ok(Self {
+            schema,
+            filename: filename.to_string(),
+        })
+    }
+}
+
+impl Table for ParquetTable {
+    /// Schema of the whole file (before any projection).
+    fn schema(&self) -> &Arc<Schema> {
+        &self.schema
+    }
+
+    /// Opens a fresh reader over the file and returns it as a single partition.
+    ///
+    /// `projection` selects columns by index (all columns when `None`) and
+    /// `batch_size` is the number of rows requested from each column reader per
+    /// batch.
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Vec<ScanResult>> {
+        // `File::open` accepts any `AsRef<Path>`; no need to clone the filename.
+        let file = File::open(&self.filename)?;
+        let parquet_file = ParquetFile::open(file, projection.clone(), batch_size)?;
+        Ok(vec![Arc::new(Mutex::new(parquet_file))])
+    }
+}
+
+/// Reads a Parquet file as a sequence of Arrow record batches, one row group
+/// at a time.
+pub struct ParquetFile {
+    /// Underlying Parquet file reader.
+    reader: SerializedFileReader<File>,
+    /// The schema of the projection
+    projection_schema: Arc<Schema>,
+    /// Projection expressed as column indices into underlying parquet reader
+    projection: Vec<usize>,
+    /// Number of values requested from each column reader per batch.
+    batch_size: usize,
+    /// Row group currently being read; `None` until the first one is loaded.
+    current_row_group: Option<Box<RowGroupReader>>,
+    /// Index of the next row group to load.
+    row_group_index: usize,
+    /// One reader per projected column, for the current row group.
+    column_readers: Vec<ColumnReader>,
+}
+
+/// Reads one batch from a BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY column reader `$R`
+/// into a `BinaryArray`, using `$SELF.batch_size` as the batch size.
+///
+/// Definition levels of 0 become nulls. Values that are not valid UTF-8 yield
+/// an `ExecutionError::General` instead of panicking. `$INDEX` is accepted for
+/// call-site compatibility but is not used.
+macro_rules! read_binary_column {
+    ($SELF:ident, $R:ident, $INDEX:expr) => {{
+        let mut read_buffer: Vec<ByteArray> =
+            vec![ByteArray::default(); $SELF.batch_size];
+        let mut def_levels: Vec<i16> = vec![0; $SELF.batch_size];
+        let (values_read, levels_read) = $R.read_batch(
+            $SELF.batch_size,
+            Some(&mut def_levels),
+            None,
+            &mut read_buffer,
+        )?;
+        // If the reader produced no definition levels (e.g. the column has
+        // none), every value that was read is present.
+        let num_rows = if levels_read == 0 {
+            values_read
+        } else {
+            levels_read
+        };
+        let mut builder = BinaryBuilder::new(num_rows);
+        let mut value_index = 0;
+        for i in 0..num_rows {
+            if levels_read == 0 || def_levels[i] > 0 {
+                // Borrow the bytes directly instead of copying them into a Vec.
+                let value = std::str::from_utf8(read_buffer[value_index].data())
+                    .map_err(|e| {
+                        ExecutionError::General(format!(
+                            "Invalid UTF-8 in parquet binary column: {}",
+                            e
+                        ))
+                    })?;
+                builder.append_string(value)?;
+                value_index += 1;
+            } else {
+                builder.append_null()?;
+            }
+        }
+        Arc::new(builder.finish())
+    }};
+}
+
+/// Reads one batch of a Parquet primitive column into an Arrow primitive array.
+trait ArrowReader<A: ArrowPrimitiveType> {
+    /// Reads up to `batch_size` values. When `is_nullable` is true,
+    /// implementations are expected to consult definition levels so that
+    /// missing values become null slots.
+    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<A>>>;
+}
+
+impl<A, P> ArrowReader<A> for ColumnReaderImpl<P>
+where
+    A: ArrowPrimitiveType,
+    P: parquet::data_type::DataType,
+    P::T: std::convert::From<A::Native>,
+    A::Native: std::convert::From<P::T>,
+{
+    /// Reads up to `batch_size` values from this column reader.
+    ///
+    /// For nullable columns the definition levels are read alongside the
+    /// values, and each level of 0 becomes a null slot in the output.
+    fn read(
+        &mut self,
+        batch_size: usize,
+        is_nullable: bool,
+    ) -> Result<Arc<PrimitiveArray<A>>> {
+        // Scratch buffer for the raw parquet values. The default value is only
+        // a placeholder; slots beyond `values_read` are never used.
+        let mut read_buffer: Vec<P::T> = vec![A::default_value().into(); batch_size];
+
+        if is_nullable {
+            let mut def_levels: Vec<i16> = vec![0; batch_size];
+            let (values_read, levels_read) = self.read_batch(
+                batch_size,
+                Some(&mut def_levels),
+                None,
+                &mut read_buffer,
+            )?;
+            // Convert only the values that were actually read.
+            let values: Vec<A::Native> = read_buffer
+                .into_iter()
+                .take(values_read)
+                .map(|v| v.into())
+                .collect();
+            let mut builder = PrimitiveBuilder::<A>::new(levels_read.max(values_read));
+            if levels_read == 0 || values_read == levels_read {
+                // No nulls in this batch (or no definition levels were
+                // produced), so every value read is present.
+                builder.append_slice(&values)?;
+            } else {
+                // Only the first `levels_read` definition levels belong to this
+                // batch. Iterating the whole buffer would append spurious
+                // trailing nulls to a short final batch.
+                let mut value_index = 0;
+                for &level in &def_levels[..levels_read] {
+                    if level != 0 {
+                        builder.append_value(values[value_index])?;
+                        value_index += 1;
+                    } else {
+                        builder.append_null()?;
+                    }
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        } else {
+            let (values_read, _) =
+                self.read_batch(batch_size, None, None, &mut read_buffer)?;
+            let values: Vec<A::Native> = read_buffer
+                .into_iter()
+                .take(values_read)
+                .map(|v| v.into())
+                .collect();
+            let mut builder = PrimitiveBuilder::<A>::new(values_read);
+            builder.append_slice(&values)?;
+            Ok(Arc::new(builder.finish()))
+        }
+    }
+}
+
+impl ParquetFile {
+    /// Opens `file` and prepares to read the columns listed in `projection`
+    /// (all columns when `None`), requesting `batch_size` values per column
+    /// per batch.
+    ///
+    /// Fails if the file is not valid Parquet, if any column in the file has a
+    /// LIST or STRUCT type, or if `projection` contains an out-of-range index.
+    pub fn open(
+        file: File,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+    ) -> Result<Self> {
+        let reader = SerializedFileReader::new(file)?;
+
+        let metadata = reader.metadata();
+        let schema =
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr_ptr())?;
+
+        // Even columns outside the projection must be flat: nested types would
+        // throw off the column reader indexes until nested schemas are supported.
+        for field in schema.fields() {
+            match field.data_type() {
+                DataType::List(_) => {
+                    return Err(ExecutionError::NotImplemented(
+                        "Parquet datasource does not support LIST".to_string(),
+                    ));
+                }
+                DataType::Struct(_) => {
+                    return Err(ExecutionError::NotImplemented(
+                        "Parquet datasource does not support STRUCT".to_string(),
+                    ));
+                }
+                _ => {}
+            }
+        }
+
+        let projection =
+            projection.unwrap_or_else(|| (0..schema.fields().len()).collect());
+        let projection_schema = schema_projection(&schema, &projection)?;
+
+        Ok(ParquetFile {
+            reader,
+            projection,
+            projection_schema,
+            batch_size,
+            row_group_index: 0,
+            current_row_group: None,
+            column_readers: vec![],
+        })
+    }
+
+    /// Makes the next row group current and creates one column reader per
+    /// projected column. Fails if every row group has already been loaded.
+    fn load_next_row_group(&mut self) -> Result<()> {
+        if self.row_group_index >= self.reader.num_row_groups() {
+            return Err(ExecutionError::General(
+                "Attempt to read past final row group".to_string(),
+            ));
+        }
+
+        let reader = self.reader.get_row_group(self.row_group_index)?;
+        self.column_readers = self
+            .projection
+            .iter()
+            .map(|&column| reader.get_column_reader(column))
+            .collect::<std::result::Result<Vec<_>, _>>()?;
+
+        self.current_row_group = Some(reader);
+        self.row_group_index += 1;
+        Ok(())
+    }
+
+    /// Reads the next batch from the current row group.
+    ///
+    /// Returns `Ok(None)` when no row group is loaded, when the projection is
+    /// empty, or when the first projected column yields no rows.
+    fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
+        if self.current_row_group.is_none() {
+            return Ok(None);
+        }
+
+        let mut batch: Vec<Arc<Array>> = Vec::with_capacity(self.column_readers.len());
+        for (i, column_reader) in self.column_readers.iter_mut().enumerate() {
+            let is_nullable = self.projection_schema.field(i).is_nullable();
+            let array: Arc<Array> = match column_reader {
+                ColumnReader::BoolColumnReader(r) => {
+                    ArrowReader::<BooleanType>::read(r, self.batch_size, is_nullable)?
+                }
+                ColumnReader::Int32ColumnReader(r) => {
+                    ArrowReader::<Int32Type>::read(r, self.batch_size, is_nullable)?
+                }
+                ColumnReader::Int64ColumnReader(r) => {
+                    ArrowReader::<Int64Type>::read(r, self.batch_size, is_nullable)?
+                }
+                ColumnReader::Int96ColumnReader(r) => {
+                    Self::read_int96_column(r, self.batch_size)?
+                }
+                ColumnReader::FloatColumnReader(r) => {
+                    ArrowReader::<Float32Type>::read(r, self.batch_size, is_nullable)?
+                }
+                ColumnReader::DoubleColumnReader(r) => {
+                    ArrowReader::<Float64Type>::read(r, self.batch_size, is_nullable)?
+                }
+                ColumnReader::FixedLenByteArrayColumnReader(r) => {
+                    read_binary_column!(self, r, i)
+                }
+                ColumnReader::ByteArrayColumnReader(r) => {
+                    read_binary_column!(self, r, i)
+                }
+            };
+            batch.push(array);
+        }
+
+        if batch.is_empty() || batch[0].data().len() == 0 {
+            Ok(None)
+        } else {
+            Ok(Some(RecordBatch::try_new(
+                self.projection_schema.clone(),
+                batch,
+            )?))
+        }
+    }
+
+    /// Reads up to `batch_size` INT96 values from `reader` as nanosecond
+    /// timestamps. Definition levels of 0 become nulls.
+    fn read_int96_column(
+        reader: &mut ColumnReaderImpl<parquet::data_type::Int96Type>,
+        batch_size: usize,
+    ) -> Result<Arc<Array>> {
+        let mut read_buffer: Vec<Int96> = vec![Int96::new(); batch_size];
+        let mut def_levels: Vec<i16> = vec![0; batch_size];
+        let (values_read, levels_read) =
+            reader.read_batch(batch_size, Some(&mut def_levels), None, &mut read_buffer)?;
+
+        // If the reader produced no definition levels, every value read is present.
+        let num_rows = if levels_read == 0 {
+            values_read
+        } else {
+            levels_read
+        };
+        let mut builder = TimestampNanosecondBuilder::new(num_rows);
+        let mut value_index = 0;
+        for i in 0..num_rows {
+            if levels_read == 0 || def_levels[i] > 0 {
+                builder.append_value(convert_int96_timestamp(read_buffer[value_index].data()))?;
+                value_index += 1;
+            } else {
+                builder.append_null()?;
+            }
+        }
+        let array: Arc<Array> = Arc::new(builder.finish());
+        Ok(array)
+    }
+}
+
+/// Builds the schema of a projection: the fields of `schema` at the positions
+/// listed in `projection`, in that order.
+///
+/// Returns `ExecutionError::InvalidColumn` for the first index that is outside
+/// the schema.
+fn schema_projection(schema: &Schema, projection: &[usize]) -> Result<Arc<Schema>> {
+    let projected_fields = projection
+        .iter()
+        .map(|index| {
+            schema.fields().get(*index).cloned().ok_or_else(|| {
+                ExecutionError::InvalidColumn(format!(
+                    "Invalid column index {} in projection",
+                    index
+                ))
+            })
+        })
+        .collect::<Result<Vec<Field>>>()?;
+    Ok(Arc::new(Schema::new(projected_fields)))
+}
+
+/// Converts a Parquet INT96 value into an Arrow timestamp in nanoseconds since
+/// the Unix epoch.
+///
+/// `v[2]` is the Julian day number. `v[1]` and `v[0]` are the high and low 32
+/// bits of the nanoseconds within that day.
+fn convert_int96_timestamp(v: &[u32]) -> i64 {
+    // Julian day number of 1970-01-01.
+    const UNIX_EPOCH_JULIAN_DAY: i64 = 2_440_588;
+    const NANOS_PER_DAY: i64 = 86_400 * 1_000_000_000;
+
+    let days_since_epoch = i64::from(v[2]) - UNIX_EPOCH_JULIAN_DAY;
+    let nanos_of_day = (i64::from(v[1]) << 32) + i64::from(v[0]);
+    days_since_epoch * NANOS_PER_DAY + nanos_of_day
+}
+
+impl RecordBatchIterator for ParquetFile {
+    /// Schema of the projected columns.
+    fn schema(&self) -> &Arc<Schema> {
+        &self.projection_schema
+    }
+
+    /// Returns the next batch, moving on to the next row group whenever the
+    /// current one is exhausted.
+    ///
+    /// Returns `Ok(None)` once every row group has been consumed, including for
+    /// a file with no row groups. Row groups that yield no rows are skipped.
+    fn next(&mut self) -> Result<Option<RecordBatch>> {
+        loop {
+            if self.current_row_group.is_some() {
+                if let Some(batch) = self.load_batch()? {
+                    return Ok(Some(batch));
+                }
+            }
+            if self.row_group_index < self.reader.num_row_groups() {
+                self.load_next_row_group()?;
+            } else {
+                return Ok(None);
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        TimestampNanosecondArray,
+    };
+    use std::env;
+
+    #[test]
+    fn read_small_batches() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = None;
+        let scan = table.scan(&projection, 2).unwrap();
+        let mut it = scan[0].lock().unwrap();
+
+        let mut count = 0;
+        while let Some(batch) = it.next().unwrap() {
+            assert_eq!(11, batch.num_columns());
+            assert_eq!(2, batch.num_rows());
+            count += 1;
+        }
+
+        // we should have seen 4 batches of 2 rows
+        assert_eq!(4, count);
+    }
+
+    #[test]
+    fn read_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = None;
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+    }
+
+    #[test]
+    fn read_bool_alltypes_plain_parquet() {
+        let batch = read_single_column(1);
+        let array = batch.column(0).as_any().downcast_ref::<BooleanArray>().unwrap();
+        let values: Vec<bool> = (0..batch.num_rows()).map(|i| array.value(i)).collect();
+
+        assert_eq!(
+            vec![true, false, true, false, true, false, true, false],
+            values
+        );
+    }
+
+    #[test]
+    fn read_i32_alltypes_plain_parquet() {
+        let batch = read_single_column(0);
+        let array = batch.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
+        let values: Vec<i32> = (0..batch.num_rows()).map(|i| array.value(i)).collect();
+
+        let expected: Vec<i32> = vec![4, 5, 6, 7, 2, 3, 0, 1];
+        assert_eq!(expected, values);
+    }
+
+    #[test]
+    fn read_i96_alltypes_plain_parquet() {
+        let batch = read_single_column(10);
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+        let values: Vec<i64> = (0..batch.num_rows()).map(|i| array.value(i)).collect();
+
+        let expected: Vec<i64> = vec![
+            1235865600000000000,
+            1235865660000000000,
+            1238544000000000000,
+            1238544060000000000,
+            1233446400000000000,
+            1233446460000000000,
+            1230768000000000000,
+            1230768060000000000,
+        ];
+        assert_eq!(expected, values);
+    }
+
+    #[test]
+    fn read_f32_alltypes_plain_parquet() {
+        let batch = read_single_column(6);
+        let array = batch.column(0).as_any().downcast_ref::<Float32Array>().unwrap();
+        let values: Vec<f32> = (0..batch.num_rows()).map(|i| array.value(i)).collect();
+
+        let expected: Vec<f32> = vec![0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1];
+        assert_eq!(expected, values);
+    }
+
+    #[test]
+    fn read_f64_alltypes_plain_parquet() {
+        let batch = read_single_column(7);
+        let array = batch.column(0).as_any().downcast_ref::<Float64Array>().unwrap();
+        let values: Vec<f64> = (0..batch.num_rows()).map(|i| array.value(i)).collect();
+
+        let expected: Vec<f64> = vec![0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1];
+        assert_eq!(expected, values);
+    }
+
+    #[test]
+    fn read_utf8_alltypes_plain_parquet() {
+        let batch = read_single_column(9);
+        let array = batch.column(0).as_any().downcast_ref::<BinaryArray>().unwrap();
+        let values: Vec<String> = (0..batch.num_rows())
+            .map(|i| String::from_utf8(array.value(i).to_vec()).unwrap())
+            .collect();
+
+        assert_eq!(vec!["0", "1", "0", "1", "0", "1", "0", "1"], values);
+    }
+
+    /// Scans the single column at index `column` of `alltypes_plain.parquet`
+    /// and returns the first batch, asserting it has one column and 8 rows.
+    fn read_single_column(column: usize) -> RecordBatch {
+        let table = load_table("alltypes_plain.parquet");
+        let scan = table.scan(&Some(vec![column]), 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+        batch
+    }
+
+    /// Opens the test file `name` from the directory in `PARQUET_TEST_DATA`.
+    fn load_table(name: &str) -> Box<Table> {
+        let testdata = env::var("PARQUET_TEST_DATA")
+            .expect("PARQUET_TEST_DATA environment variable must be set");
+        let filename = format!("{}/{}", testdata, name);
+        let table = ParquetTable::try_new(&filename)
+            .unwrap_or_else(|e| panic!("failed to open {}: {:?}", filename, e));
+        Box::new(table)
+    }
+}
diff --git a/rust/datafusion/src/execution/error.rs
b/rust/datafusion/src/execution/error.rs
index 5b8d04d..92ce6d9 100644
--- a/rust/datafusion/src/execution/error.rs
+++ b/rust/datafusion/src/execution/error.rs
@@ -21,6 +21,7 @@ use std::io::Error;
use std::result;
use arrow::error::ArrowError;
+use parquet::errors::ParquetError;
use sqlparser::sqlparser::ParserError;
@@ -35,6 +36,7 @@ pub enum ExecutionError {
NotImplemented(String),
InternalError(String),
ArrowError(ArrowError),
+ ParquetError(ParquetError),
ExecutionError(String),
}
@@ -62,6 +64,12 @@ impl From<ArrowError> for ExecutionError {
}
}
+/// Wraps errors from the parquet crate so they can be propagated with `?`.
+impl From<ParquetError> for ExecutionError {
+    fn from(error: ParquetError) -> ExecutionError {
+        ExecutionError::ParquetError(error)
+    }
+}
+
impl From<ParserError> for ExecutionError {
fn from(e: ParserError) -> Self {
ExecutionError::ParserError(e)
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 954b2ee..9c24a50 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -16,6 +16,7 @@
// under the License.
use std::cell::RefCell;
+use std::env;
use std::rc::Rc;
use std::sync::Arc;
@@ -25,12 +26,27 @@ extern crate datafusion;
use arrow::array::*;
use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::datasource::parquet::ParquetTable;
+use datafusion::datasource::Table;
use datafusion::execution::context::ExecutionContext;
use datafusion::execution::relation::Relation;
const DEFAULT_BATCH_SIZE: usize = 1024 * 1024;
#[test]
+fn parquet_query() {
+    // Rows come back in file order, one tab-delimited line per row.
+    let expected =
+        "4\t\"0\"\n5\t\"1\"\n6\t\"0\"\n7\t\"1\"\n2\t\"0\"\n3\t\"1\"\n0\t\"0\"\n1\t\"1\"\n".to_string();
+
+    let mut ctx = ExecutionContext::new();
+    let table = load_parquet_table("alltypes_plain.parquet");
+    ctx.register_table("alltypes_plain", table);
+
+    let actual = execute(&mut ctx, "SELECT id, string_col FROM alltypes_plain");
+    assert_eq!(expected, actual);
+}
+
+#[test]
fn csv_query_with_predicate() {
let mut ctx = ExecutionContext::new();
register_aggregate_csv(&mut ctx);
@@ -163,9 +179,17 @@ fn register_csv(
ctx.register_csv(name, filename, &schema, true);
}
+/// Opens the Parquet test file `name` from the directory named by the
+/// `PARQUET_TEST_DATA` environment variable, panicking with a descriptive
+/// message if the variable is unset or the file cannot be opened.
+fn load_parquet_table(name: &str) -> Rc<Table> {
+    let testdata = env::var("PARQUET_TEST_DATA")
+        .expect("PARQUET_TEST_DATA environment variable must be set");
+    let filename = format!("{}/{}", testdata, name);
+    let table = ParquetTable::try_new(&filename)
+        .unwrap_or_else(|e| panic!("failed to open {}: {:?}", filename, e));
+    Rc::new(table)
+}
+
/// Execute query and return result set as tab delimited string
fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
- let results = ctx.sql(&sql, DEFAULT_BATCH_SIZE).unwrap();
+ let plan = ctx.create_logical_plan(&sql).unwrap();
+ let results = ctx.execute(&plan, DEFAULT_BATCH_SIZE).unwrap();
result_str(&results)
}
diff --git a/rust/parquet/src/reader/schema.rs
b/rust/parquet/src/reader/schema.rs
index 34276a2..5af07be 100644
--- a/rust/parquet/src/reader/schema.rs
+++ b/rust/parquet/src/reader/schema.rs
@@ -28,7 +28,8 @@ use crate::basic::{LogicalType, Repetition, Type as
PhysicalType};
use crate::errors::{ParquetError::ArrowError, Result};
use crate::schema::types::{SchemaDescPtr, Type, TypePtr};
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::datatypes::TimeUnit;
+use arrow::datatypes::{DataType, DateUnit, Field, Schema};
/// Convert parquet schema to arrow schema.
pub fn parquet_to_arrow_schema(parquet_schema: SchemaDescPtr) ->
Result<Schema> {
@@ -175,19 +176,20 @@ impl ParquetTypeConverter {
fn to_primitive_type_inner(&self) -> Result<DataType> {
match self.schema.get_physical_type() {
PhysicalType::BOOLEAN => Ok(DataType::Boolean),
- PhysicalType::INT32 => self.to_int32(),
- PhysicalType::INT64 => self.to_int64(),
+ PhysicalType::INT32 => self.from_int32(),
+ PhysicalType::INT64 => self.from_int64(),
+ PhysicalType::INT96 =>
Ok(DataType::Timestamp(TimeUnit::Nanosecond)),
PhysicalType::FLOAT => Ok(DataType::Float32),
PhysicalType::DOUBLE => Ok(DataType::Float64),
- PhysicalType::BYTE_ARRAY => self.to_byte_array(),
+ PhysicalType::BYTE_ARRAY => self.from_byte_array(),
other => Err(ArrowError(format!(
- "Unable to convert parquet type {}",
+ "Unable to convert parquet physical type {}",
other
))),
}
}
- fn to_int32(&self) -> Result<DataType> {
+ fn from_int32(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
LogicalType::NONE => Ok(DataType::Int32),
LogicalType::UINT_8 => Ok(DataType::UInt8),
@@ -196,30 +198,40 @@ impl ParquetTypeConverter {
LogicalType::INT_8 => Ok(DataType::Int8),
LogicalType::INT_16 => Ok(DataType::Int16),
LogicalType::INT_32 => Ok(DataType::Int32),
+ LogicalType::DATE => Ok(DataType::Date32(DateUnit::Millisecond)),
+ LogicalType::TIME_MILLIS =>
Ok(DataType::Time32(TimeUnit::Millisecond)),
other => Err(ArrowError(format!(
- "Unable to convert parquet logical type {}",
+ "Unable to convert parquet INT32 logical type {}",
other
))),
}
}
- fn to_int64(&self) -> Result<DataType> {
+ fn from_int64(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
LogicalType::NONE => Ok(DataType::Int64),
LogicalType::INT_64 => Ok(DataType::Int64),
LogicalType::UINT_64 => Ok(DataType::UInt64),
+ LogicalType::TIME_MICROS =>
Ok(DataType::Time64(TimeUnit::Microsecond)),
+ LogicalType::TIMESTAMP_MICROS => {
+ Ok(DataType::Timestamp(TimeUnit::Microsecond))
+ }
+ LogicalType::TIMESTAMP_MILLIS => {
+ Ok(DataType::Timestamp(TimeUnit::Millisecond))
+ }
other => Err(ArrowError(format!(
- "Unable to convert parquet logical type {}",
+ "Unable to convert parquet INT64 logical type {}",
other
))),
}
}
- fn to_byte_array(&self) -> Result<DataType> {
+ fn from_byte_array(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
+ LogicalType::NONE => Ok(DataType::Utf8),
LogicalType::UTF8 => Ok(DataType::Utf8),
other => Err(ArrowError(format!(
- "Unable to convert parquet logical type {}",
+ "Unable to convert parquet BYTE_ARRAY logical type {}",
other
))),
}