This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new bf85046 Split ArrayReaderBuilder into its own module (#1483) (#1485)
bf85046 is described below
commit bf8504637e48617edc082edd553c3d9450853f6d
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Mar 25 12:55:36 2022 +0000
Split ArrayReaderBuilder into its own module (#1483) (#1485)
* Split ArrayReaderBuilder into its own module (#1483)
* Add license header
---
parquet/src/arrow/array_reader.rs | 736 +----------------------------
parquet/src/arrow/array_reader/builder.rs | 752 ++++++++++++++++++++++++++++++
2 files changed, 764 insertions(+), 724 deletions(-)
diff --git a/parquet/src/arrow/array_reader.rs
b/parquet/src/arrow/array_reader.rs
index a26e60c..9c016e7 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -17,7 +17,6 @@
use std::any::Any;
use std::cmp::{max, min};
-use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::mem::size_of;
use std::result::Result::Ok;
@@ -38,11 +37,10 @@ use arrow::datatypes::{
DurationMicrosecondType as ArrowDurationMicrosecondType,
DurationMillisecondType as ArrowDurationMillisecondType,
DurationNanosecondType as ArrowDurationNanosecondType,
- DurationSecondType as ArrowDurationSecondType, Field,
- Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
- Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
- Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit,
Schema,
- SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
+ DurationSecondType as ArrowDurationSecondType, Float32Type as
ArrowFloat32Type,
+ Float64Type as ArrowFloat64Type, Int16Type as ArrowInt16Type,
+ Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, Int8Type as
ArrowInt8Type,
+ Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as
ArrowTimeUnit,
@@ -55,30 +53,20 @@ use arrow::datatypes::{
};
use arrow::util::bit_util;
-use crate::arrow::converter::{
- Converter, DecimalArrayConverter, DecimalConverter,
FixedLenBinaryConverter,
- FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
- IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
- IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
-};
+use crate::arrow::converter::Converter;
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
use crate::arrow::schema::parquet_to_arrow_field;
-use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
+use crate::basic::Type as PhysicalType;
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
-use crate::data_type::{
- BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type,
- Int64Type, Int96Type,
-};
+use crate::data_type::DataType;
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
use crate::file::reader::{FilePageIterator, FileReader};
-use crate::schema::types::{
- ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr,
-};
-use crate::schema::visitor::TypeVisitor;
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+mod builder;
mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
@@ -87,6 +75,8 @@ mod offset_buffer;
#[cfg(test)]
mod test_util;
+pub use builder::build_array_reader;
+
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
@@ -1332,675 +1322,6 @@ impl ArrayReader for StructArrayReader {
}
}
-/// Create array reader from parquet schema, column indices, and parquet file
reader.
-pub fn build_array_reader<T>(
- parquet_schema: SchemaDescPtr,
- arrow_schema: SchemaRef,
- column_indices: T,
- row_groups: Box<dyn RowGroupCollection>,
-) -> Result<Box<dyn ArrayReader>>
-where
- T: IntoIterator<Item = usize>,
-{
- let mut leaves = HashMap::<*const Type, usize>::new();
-
- let mut filtered_root_names = HashSet::<String>::new();
-
- for c in column_indices {
- let column = parquet_schema.column(c).self_type() as *const Type;
-
- leaves.insert(column, c);
-
- let root = parquet_schema.get_column_root_ptr(c);
- filtered_root_names.insert(root.name().to_string());
- }
-
- if leaves.is_empty() {
- return Err(general_err!("Can't build array reader without columns!"));
- }
-
- // Only pass root fields that take part in the projection
- // to avoid traversal of columns that are not read.
- // TODO: also prune unread parts of the tree in child structures
- let filtered_root_fields = parquet_schema
- .root_schema()
- .get_fields()
- .iter()
- .filter(|field| filtered_root_names.contains(field.name()))
- .cloned()
- .collect::<Vec<_>>();
-
- let proj = Type::GroupType {
- basic_info: parquet_schema.root_schema().get_basic_info().clone(),
- fields: filtered_root_fields,
- };
-
- ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves),
row_groups)
- .build_array_reader()
-}
-
-/// Used to build array reader.
-struct ArrayReaderBuilder {
- root_schema: TypePtr,
- arrow_schema: Arc<Schema>,
- // Key: columns that need to be included in final array builder
- // Value: column index in schema
- columns_included: Arc<HashMap<*const Type, usize>>,
- row_groups: Box<dyn RowGroupCollection>,
-}
-
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
- def_level: i16,
- rep_level: i16,
- path: ColumnPath,
-}
-
-impl Default for ArrayReaderBuilderContext {
- fn default() -> Self {
- Self {
- def_level: 0i16,
- rep_level: 0i16,
- path: ColumnPath::new(Vec::new()),
- }
- }
-}
-
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a
ArrayReaderBuilderContext>
- for ArrayReaderBuilder
-{
- /// Build array reader for primitive type.
- fn visit_primitive(
- &mut self,
- cur_type: TypePtr,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- if self.is_included(cur_type.as_ref()) {
- let mut new_context = context.clone();
- new_context.path.append(vec![cur_type.name().to_string()]);
-
- let null_mask_only = match cur_type.get_basic_info().repetition() {
- Repetition::REPEATED => {
- new_context.def_level += 1;
- new_context.rep_level += 1;
- false
- }
- Repetition::OPTIONAL => {
- new_context.def_level += 1;
-
- // Can just compute null mask if no parent
- context.def_level == 0 && context.rep_level == 0
- }
- _ => false,
- };
-
- let reader = self.build_for_primitive_type_inner(
- cur_type.clone(),
- &new_context,
- null_mask_only,
- )?;
-
- if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
- Err(ArrowError(
- "Reading repeated field is not supported yet!".to_string(),
- ))
- } else {
- Ok(Some(reader))
- }
- } else {
- Ok(None)
- }
- }
-
- /// Build array reader for struct type.
- fn visit_struct(
- &mut self,
- cur_type: Arc<Type>,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- let mut new_context = context.clone();
- new_context.path.append(vec![cur_type.name().to_string()]);
-
- if cur_type.get_basic_info().has_repetition() {
- match cur_type.get_basic_info().repetition() {
- Repetition::REPEATED => {
- new_context.def_level += 1;
- new_context.rep_level += 1;
- }
- Repetition::OPTIONAL => {
- new_context.def_level += 1;
- }
- _ => (),
- }
- }
-
- if let Some(reader) = self.build_for_struct_type_inner(&cur_type,
&new_context)? {
- if cur_type.get_basic_info().has_repetition()
- && cur_type.get_basic_info().repetition() ==
Repetition::REPEATED
- {
- Err(ArrowError(
- "Reading repeated field is not supported yet!".to_string(),
- ))
- } else {
- Ok(Some(reader))
- }
- } else {
- Ok(None)
- }
- }
-
- /// Build array reader for map type.
- fn visit_map(
- &mut self,
- map_type: Arc<Type>,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- // Add map type to context
- let mut new_context = context.clone();
- new_context.path.append(vec![map_type.name().to_string()]);
- if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() {
- new_context.def_level += 1;
- }
-
- // Add map entry (key_value) to context
- let map_key_value = map_type.get_fields().first().ok_or_else(|| {
- ArrowError("Map field must have a key_value entry".to_string())
- })?;
- new_context
- .path
- .append(vec![map_key_value.name().to_string()]);
- new_context.rep_level += 1;
-
- // Get key and value, and create context for each
- let map_key = map_key_value
- .get_fields()
- .first()
- .ok_or_else(|| ArrowError("Map entry must have a
key".to_string()))?;
- let map_value = map_key_value
- .get_fields()
- .get(1)
- .ok_or_else(|| ArrowError("Map entry must have a
value".to_string()))?;
-
- let key_reader = {
- let mut key_context = new_context.clone();
- key_context.def_level += 1;
- key_context.path.append(vec![map_key.name().to_string()]);
- self.dispatch(map_key.clone(), &key_context)?.unwrap()
- };
- let value_reader = {
- let mut value_context = new_context.clone();
- if let Repetition::OPTIONAL =
map_value.get_basic_info().repetition() {
- value_context.def_level += 1;
- }
- self.dispatch(map_value.clone(), &value_context)?.unwrap()
- };
-
- let arrow_type = self
- .arrow_schema
- .field_with_name(map_type.name())
- .ok()
- .map(|f| f.data_type().to_owned())
- .unwrap_or_else(|| {
- ArrowType::Map(
- Box::new(Field::new(
- map_key_value.name(),
- ArrowType::Struct(vec![
- Field::new(
- map_key.name(),
- key_reader.get_data_type().clone(),
- false,
- ),
- Field::new(
- map_value.name(),
- value_reader.get_data_type().clone(),
- map_value.is_optional(),
- ),
- ]),
- map_type.is_optional(),
- )),
- false,
- )
- });
-
- let key_array_reader: Box<dyn ArrayReader> =
Box::new(MapArrayReader::new(
- key_reader,
- value_reader,
- arrow_type,
- new_context.def_level,
- new_context.rep_level,
- ));
-
- Ok(Some(key_array_reader))
- }
-
- /// Build array reader for list type.
- fn visit_list_with_item(
- &mut self,
- list_type: Arc<Type>,
- item_type: Arc<Type>,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- let mut list_child = &list_type
- .get_fields()
- .first()
- .ok_or_else(|| ArrowError("List field must have a
child.".to_string()))?
- .clone();
- let mut new_context = context.clone();
-
- new_context.path.append(vec![list_type.name().to_string()]);
- // We need to know at what definition a list or its child is null
- let list_null_def = new_context.def_level;
- let mut list_empty_def = new_context.def_level;
-
- // If the list's root is nullable
- if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
- new_context.def_level += 1;
- // current level is nullable, increment to get level for empty
list slot
- list_empty_def += 1;
- }
-
- match list_child.get_basic_info().repetition() {
- Repetition::REPEATED => {
- new_context.def_level += 1;
- new_context.rep_level += 1;
- }
- Repetition::OPTIONAL => {
- new_context.def_level += 1;
- }
- _ => (),
- }
-
- let item_reader = self
- .dispatch(item_type.clone(), &new_context)
- .unwrap()
- .unwrap();
-
- let item_reader_type = item_reader.get_data_type().clone();
-
- match item_reader_type {
- ArrowType::List(_)
- | ArrowType::FixedSizeList(_, _)
- | ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
- "reading List({:?}) into arrow not supported yet",
- item_type
- ))),
- _ => {
- // a list is a group type with a single child. The list child's
- // name comes from the child's field name.
- // if the child's name is "list" and it has a child, then use
this child
- if list_child.name() == "list" &&
!list_child.get_fields().is_empty() {
- list_child = list_child.get_fields().first().unwrap();
- }
- let arrow_type = self
- .arrow_schema
- .field_with_name(list_type.name())
- .ok()
- .map(|f| f.data_type().to_owned())
- .unwrap_or_else(|| {
- ArrowType::List(Box::new(Field::new(
- list_child.name(),
- item_reader_type.clone(),
- list_child.is_optional(),
- )))
- });
-
- let list_array_reader: Box<dyn ArrayReader> = match arrow_type
{
- ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
- item_reader,
- arrow_type,
- item_reader_type,
- new_context.def_level,
- new_context.rep_level,
- list_null_def,
- list_empty_def,
- )),
- ArrowType::LargeList(_) =>
Box::new(ListArrayReader::<i64>::new(
- item_reader,
- arrow_type,
- item_reader_type,
- new_context.def_level,
- new_context.rep_level,
- list_null_def,
- list_empty_def,
- )),
-
- _ => {
- return Err(ArrowError(format!(
- "creating ListArrayReader with type {:?} should be
unreachable",
- arrow_type
- )))
- }
- };
-
- Ok(Some(list_array_reader))
- }
- }
- }
-}
-
-impl<'a> ArrayReaderBuilder {
- /// Construct array reader builder.
- fn new(
- root_schema: TypePtr,
- arrow_schema: Arc<Schema>,
- columns_included: Arc<HashMap<*const Type, usize>>,
- file_reader: Box<dyn RowGroupCollection>,
- ) -> Self {
- Self {
- root_schema,
- arrow_schema,
- columns_included,
- row_groups: file_reader,
- }
- }
-
- /// Main entry point.
- fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
- let context = ArrayReaderBuilderContext::default();
-
- self.visit_struct(self.root_schema.clone(), &context)
- .and_then(|reader_opt| {
- reader_opt.ok_or_else(|| general_err!("Failed to build array
reader!"))
- })
- }
-
- // Utility functions
-
- /// Check whether one column in included in this array reader builder.
- fn is_included(&self, t: &Type) -> bool {
- self.columns_included.contains_key(&(t as *const Type))
- }
-
- /// Creates primitive array reader for each primitive type.
- fn build_for_primitive_type_inner(
- &self,
- cur_type: TypePtr,
- context: &'a ArrayReaderBuilderContext,
- null_mask_only: bool,
- ) -> Result<Box<dyn ArrayReader>> {
- let column_desc = Arc::new(ColumnDescriptor::new(
- cur_type.clone(),
- context.def_level,
- context.rep_level,
- context.path.clone(),
- ));
-
- let page_iterator = self
- .row_groups
- .column_chunks(self.columns_included[&(cur_type.as_ref() as *const
Type)])?;
-
- let arrow_type: Option<ArrowType> = self
- .get_arrow_field(&cur_type, context)
- .map(|f| f.data_type().clone());
-
- match cur_type.get_physical_type() {
- PhysicalType::BOOLEAN => Ok(Box::new(
- PrimitiveArrayReader::<BoolType>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- )),
- PhysicalType::INT32 => {
- if let Some(ArrowType::Null) = arrow_type {
- Ok(Box::new(NullArrayReader::<Int32Type>::new(
- page_iterator,
- column_desc,
- )?))
- } else {
- Ok(Box::new(
- PrimitiveArrayReader::<Int32Type>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- ))
- }
- }
- PhysicalType::INT64 => Ok(Box::new(
- PrimitiveArrayReader::<Int64Type>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- )),
- PhysicalType::INT96 => {
- // get the optional timezone information from arrow type
- let timezone = arrow_type.as_ref().and_then(|data_type| {
- if let ArrowType::Timestamp(_, tz) = data_type {
- tz.clone()
- } else {
- None
- }
- });
- let converter = Int96Converter::new(Int96ArrayConverter {
timezone });
- Ok(Box::new(ComplexObjectArrayReader::<
- Int96Type,
- Int96Converter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- PhysicalType::FLOAT => Ok(Box::new(
- PrimitiveArrayReader::<FloatType>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- )),
- PhysicalType::DOUBLE => Ok(Box::new(
- PrimitiveArrayReader::<DoubleType>::new_with_options(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- )?,
- )),
- PhysicalType::BYTE_ARRAY => match arrow_type {
- Some(ArrowType::Dictionary(_, _)) =>
make_byte_array_dictionary_reader(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- ),
- _ => make_byte_array_reader(
- page_iterator,
- column_desc,
- arrow_type,
- null_mask_only,
- ),
- },
- PhysicalType::FIXED_LEN_BYTE_ARRAY
- if cur_type.get_basic_info().converted_type()
- == ConvertedType::DECIMAL =>
- {
- let converter =
DecimalConverter::new(DecimalArrayConverter::new(
- cur_type.get_precision(),
- cur_type.get_scale(),
- ));
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- DecimalConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- PhysicalType::FIXED_LEN_BYTE_ARRAY => {
- let byte_width = match *cur_type {
- Type::PrimitiveType {
- ref type_length, ..
- } => *type_length,
- _ => {
- return Err(ArrowError(
- "Expected a physical type, not a group
type".to_string(),
- ))
- }
- };
- if cur_type.get_basic_info().converted_type() ==
ConvertedType::INTERVAL {
- if byte_width != 12 {
- return Err(ArrowError(format!(
- "Parquet interval type should have length of 12,
found {}",
- byte_width
- )));
- }
- match arrow_type {
- Some(ArrowType::Interval(IntervalUnit::DayTime)) => {
- let converter = IntervalDayTimeConverter::new(
- IntervalDayTimeArrayConverter {},
- );
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- IntervalDayTimeConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- Some(ArrowType::Interval(IntervalUnit::YearMonth)) => {
- let converter = IntervalYearMonthConverter::new(
- IntervalYearMonthArrayConverter {},
- );
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- IntervalYearMonthConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- Some(t) => Err(ArrowError(format!(
- "Cannot write a Parquet interval to {:?}",
- t
- ))),
- None => {
- // we do not support an interval not matched to an
Arrow type,
- // because we risk data loss as we won't know
which of the 12 bytes
- // are or should be populated
- Err(ArrowError(
- "Cannot write a Parquet interval with no Arrow
type specified.
- There is a risk of data loss as Arrow either
supports YearMonth or
- DayTime precision. Without the Arrow type, we
cannot infer the type.
- ".to_string()
- ))
- }
- }
- } else {
- let converter = FixedLenBinaryConverter::new(
- FixedSizeArrayConverter::new(byte_width),
- );
- Ok(Box::new(ComplexObjectArrayReader::<
- FixedLenByteArrayType,
- FixedLenBinaryConverter,
- >::new(
- page_iterator,
- column_desc,
- converter,
- arrow_type,
- )?))
- }
- }
- }
- }
-
- /// Constructs struct array reader without considering repetition.
- fn build_for_struct_type_inner(
- &mut self,
- cur_type: &Type,
- context: &'a ArrayReaderBuilderContext,
- ) -> Result<Option<Box<dyn ArrayReader>>> {
- let mut fields = Vec::with_capacity(cur_type.get_fields().len());
- let mut children_reader =
Vec::with_capacity(cur_type.get_fields().len());
-
- for child in cur_type.get_fields() {
- let mut struct_context = context.clone();
- if let Some(child_reader) = self.dispatch(child.clone(), context)?
{
- // TODO: this results in calling get_arrow_field twice, it
could be reused
- // from child_reader above, by making child_reader carry its
`Field`
- struct_context.path.append(vec![child.name().to_string()]);
- let field = match self.get_arrow_field(child, &struct_context)
{
- Some(f) => f.clone(),
- _ => Field::new(
- child.name(),
- child_reader.get_data_type().clone(),
- child.is_optional(),
- ),
- };
- fields.push(field);
- children_reader.push(child_reader);
- }
- }
-
- if !fields.is_empty() {
- let arrow_type = ArrowType::Struct(fields);
- Ok(Some(Box::new(StructArrayReader::new(
- arrow_type,
- children_reader,
- context.def_level,
- context.rep_level,
- ))))
- } else {
- Ok(None)
- }
- }
-
- fn get_arrow_field(
- &self,
- cur_type: &Type,
- context: &'a ArrayReaderBuilderContext,
- ) -> Option<&Field> {
- let parts: Vec<&str> = context
- .path
- .parts()
- .iter()
- .map(|x| -> &str { x })
- .collect::<Vec<&str>>();
-
- // If the parts length is one it'll have the top level "schema" type.
If
- // it's two then it'll be a top-level type that we can get from the
arrow
- // schema directly.
- if parts.len() <= 2 {
- self.arrow_schema.field_with_name(cur_type.name()).ok()
- } else {
- // If it's greater than two then we need to traverse the type path
- // until we find the actual field we're looking for.
- let mut field: Option<&Field> = None;
-
- for (i, part) in parts.iter().enumerate().skip(1) {
- if i == 1 {
- field = self.arrow_schema.field_with_name(part).ok();
- } else if let Some(f) = field {
- match f.data_type() {
- ArrowType::Struct(fields) => {
- field = fields.iter().find(|f| f.name() == part)
- }
- ArrowType::List(list_field) => field =
Some(list_field.as_ref()),
- _ => field = None,
- }
- } else {
- field = None;
- }
- }
- field
- }
- }
-}
-
#[cfg(test)]
mod tests {
use std::any::Any;
@@ -2024,18 +1345,16 @@ mod tests {
};
use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
- use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type,
Int64Type};
use crate::errors::Result;
- use crate::file::reader::{FileReader, SerializedFileReader};
use crate::schema::parser::parse_message_type;
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
+ use crate::util::test_common::make_pages;
use crate::util::test_common::page_util::{
DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
};
- use crate::util::test_common::{get_test_file, make_pages};
use super::*;
@@ -2839,37 +2158,6 @@ mod tests {
}
#[test]
- fn test_create_array_reader() {
- let file = get_test_file("nulls.snappy.parquet");
- let file_reader: Arc<dyn FileReader> =
- Arc::new(SerializedFileReader::new(file).unwrap());
-
- let file_metadata = file_reader.metadata().file_metadata();
- let arrow_schema = parquet_to_arrow_schema(
- file_metadata.schema_descr(),
- file_metadata.key_value_metadata(),
- )
- .unwrap();
-
- let array_reader = build_array_reader(
- file_reader.metadata().file_metadata().schema_descr_ptr(),
- Arc::new(arrow_schema),
- vec![0usize].into_iter(),
- Box::new(file_reader),
- )
- .unwrap();
-
- // Create arrow types
- let arrow_type = ArrowType::Struct(vec![Field::new(
- "b_struct",
- ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32,
true)]),
- true,
- )]);
-
- assert_eq!(array_reader.get_data_type(), &arrow_type);
- }
-
- #[test]
fn test_list_array_reader() {
// [[1, null, 2], null, [3, 4]]
let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
diff --git a/parquet/src/arrow/array_reader/builder.rs
b/parquet/src/arrow/array_reader/builder.rs
new file mode 100644
index 0000000..309919f
--- /dev/null
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -0,0 +1,752 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema,
SchemaRef};
+
+use crate::arrow::array_reader::{
+ make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
+ ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
+ PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
+};
+use crate::arrow::converter::{
+ DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
+ FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
+ IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
+ IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
+};
+use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
+use crate::data_type::{
+ BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type,
+ Int96Type,
+};
+use crate::errors::ParquetError::ArrowError;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type,
TypePtr};
+use crate::schema::visitor::TypeVisitor;
+
+/// Create array reader from parquet schema, column indices, and parquet file
reader.
+pub fn build_array_reader<T>(
+ parquet_schema: SchemaDescPtr,
+ arrow_schema: SchemaRef,
+ column_indices: T,
+ row_groups: Box<dyn RowGroupCollection>,
+) -> Result<Box<dyn ArrayReader>>
+where
+ T: IntoIterator<Item = usize>,
+{
+ let mut leaves = HashMap::<*const Type, usize>::new();
+
+ let mut filtered_root_names = HashSet::<String>::new();
+
+ for c in column_indices {
+ let column = parquet_schema.column(c).self_type() as *const Type;
+
+ leaves.insert(column, c);
+
+ let root = parquet_schema.get_column_root_ptr(c);
+ filtered_root_names.insert(root.name().to_string());
+ }
+
+ if leaves.is_empty() {
+ return Err(general_err!("Can't build array reader without columns!"));
+ }
+
+ // Only pass root fields that take part in the projection
+ // to avoid traversal of columns that are not read.
+ // TODO: also prune unread parts of the tree in child structures
+ let filtered_root_fields = parquet_schema
+ .root_schema()
+ .get_fields()
+ .iter()
+ .filter(|field| filtered_root_names.contains(field.name()))
+ .cloned()
+ .collect::<Vec<_>>();
+
+ let proj = Type::GroupType {
+ basic_info: parquet_schema.root_schema().get_basic_info().clone(),
+ fields: filtered_root_fields,
+ };
+
+ ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves),
row_groups)
+ .build_array_reader()
+}
+
+/// Used to build array reader.
+struct ArrayReaderBuilder {
+ root_schema: TypePtr,
+ arrow_schema: Arc<Schema>,
+ // Key: columns that need to be included in final array builder
+ // Value: column index in schema
+ columns_included: Arc<HashMap<*const Type, usize>>,
+ row_groups: Box<dyn RowGroupCollection>,
+}
+
+/// Used in type visitor.
+#[derive(Clone)]
+struct ArrayReaderBuilderContext {
+ def_level: i16,
+ rep_level: i16,
+ path: ColumnPath,
+}
+
+impl Default for ArrayReaderBuilderContext {
+ fn default() -> Self {
+ Self {
+ def_level: 0i16,
+ rep_level: 0i16,
+ path: ColumnPath::new(Vec::new()),
+ }
+ }
+}
+
+/// Create array reader by visiting schema.
+impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a
ArrayReaderBuilderContext>
+ for ArrayReaderBuilder
+{
+ /// Build array reader for primitive type.
+ fn visit_primitive(
+ &mut self,
+ cur_type: TypePtr,
+ context: &'a ArrayReaderBuilderContext,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ if self.is_included(cur_type.as_ref()) {
+ let mut new_context = context.clone();
+ new_context.path.append(vec![cur_type.name().to_string()]);
+
+ let null_mask_only = match cur_type.get_basic_info().repetition() {
+ Repetition::REPEATED => {
+ new_context.def_level += 1;
+ new_context.rep_level += 1;
+ false
+ }
+ Repetition::OPTIONAL => {
+ new_context.def_level += 1;
+
+ // Can just compute null mask if no parent
+ context.def_level == 0 && context.rep_level == 0
+ }
+ _ => false,
+ };
+
+ let reader = self.build_for_primitive_type_inner(
+ cur_type.clone(),
+ &new_context,
+ null_mask_only,
+ )?;
+
+ if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
+ Err(ArrowError(
+ "Reading repeated field is not supported yet!".to_string(),
+ ))
+ } else {
+ Ok(Some(reader))
+ }
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Build array reader for struct type.
+ fn visit_struct(
+ &mut self,
+ cur_type: Arc<Type>,
+ context: &'a ArrayReaderBuilderContext,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ let mut new_context = context.clone();
+ new_context.path.append(vec![cur_type.name().to_string()]);
+
+ if cur_type.get_basic_info().has_repetition() {
+ match cur_type.get_basic_info().repetition() {
+ Repetition::REPEATED => {
+ new_context.def_level += 1;
+ new_context.rep_level += 1;
+ }
+ Repetition::OPTIONAL => {
+ new_context.def_level += 1;
+ }
+ _ => (),
+ }
+ }
+
+ if let Some(reader) = self.build_for_struct_type_inner(&cur_type,
&new_context)? {
+ if cur_type.get_basic_info().has_repetition()
+ && cur_type.get_basic_info().repetition() ==
Repetition::REPEATED
+ {
+ Err(ArrowError(
+ "Reading repeated field is not supported yet!".to_string(),
+ ))
+ } else {
+ Ok(Some(reader))
+ }
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Build array reader for map type.
+ fn visit_map(
+ &mut self,
+ map_type: Arc<Type>,
+ context: &'a ArrayReaderBuilderContext,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ // Add map type to context
+ let mut new_context = context.clone();
+ new_context.path.append(vec![map_type.name().to_string()]);
+ if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() {
+ new_context.def_level += 1;
+ }
+
+ // Add map entry (key_value) to context
+ let map_key_value = map_type.get_fields().first().ok_or_else(|| {
+ ArrowError("Map field must have a key_value entry".to_string())
+ })?;
+ new_context
+ .path
+ .append(vec![map_key_value.name().to_string()]);
+ new_context.rep_level += 1;
+
+ // Get key and value, and create context for each
+ let map_key = map_key_value
+ .get_fields()
+ .first()
+ .ok_or_else(|| ArrowError("Map entry must have a
key".to_string()))?;
+ let map_value = map_key_value
+ .get_fields()
+ .get(1)
+ .ok_or_else(|| ArrowError("Map entry must have a
value".to_string()))?;
+
+ let key_reader = {
+ let mut key_context = new_context.clone();
+ key_context.def_level += 1;
+ key_context.path.append(vec![map_key.name().to_string()]);
+ self.dispatch(map_key.clone(), &key_context)?.unwrap()
+ };
+ let value_reader = {
+ let mut value_context = new_context.clone();
+ if let Repetition::OPTIONAL =
map_value.get_basic_info().repetition() {
+ value_context.def_level += 1;
+ }
+ self.dispatch(map_value.clone(), &value_context)?.unwrap()
+ };
+
+ let arrow_type = self
+ .arrow_schema
+ .field_with_name(map_type.name())
+ .ok()
+ .map(|f| f.data_type().to_owned())
+ .unwrap_or_else(|| {
+ ArrowType::Map(
+ Box::new(Field::new(
+ map_key_value.name(),
+ ArrowType::Struct(vec![
+ Field::new(
+ map_key.name(),
+ key_reader.get_data_type().clone(),
+ false,
+ ),
+ Field::new(
+ map_value.name(),
+ value_reader.get_data_type().clone(),
+ map_value.is_optional(),
+ ),
+ ]),
+ map_type.is_optional(),
+ )),
+ false,
+ )
+ });
+
+ let key_array_reader: Box<dyn ArrayReader> =
Box::new(MapArrayReader::new(
+ key_reader,
+ value_reader,
+ arrow_type,
+ new_context.def_level,
+ new_context.rep_level,
+ ));
+
+ Ok(Some(key_array_reader))
+ }
+
+ /// Build array reader for list type.
+ fn visit_list_with_item(
+ &mut self,
+ list_type: Arc<Type>,
+ item_type: Arc<Type>,
+ context: &'a ArrayReaderBuilderContext,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ let mut list_child = &list_type
+ .get_fields()
+ .first()
+ .ok_or_else(|| ArrowError("List field must have a
child.".to_string()))?
+ .clone();
+ let mut new_context = context.clone();
+
+ new_context.path.append(vec![list_type.name().to_string()]);
+ // We need to know at what definition a list or its child is null
+ let list_null_def = new_context.def_level;
+ let mut list_empty_def = new_context.def_level;
+
+ // If the list's root is nullable
+ if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
+ new_context.def_level += 1;
+ // current level is nullable, increment to get level for empty
list slot
+ list_empty_def += 1;
+ }
+
+ match list_child.get_basic_info().repetition() {
+ Repetition::REPEATED => {
+ new_context.def_level += 1;
+ new_context.rep_level += 1;
+ }
+ Repetition::OPTIONAL => {
+ new_context.def_level += 1;
+ }
+ _ => (),
+ }
+
+ let item_reader = self
+ .dispatch(item_type.clone(), &new_context)
+ .unwrap()
+ .unwrap();
+
+ let item_reader_type = item_reader.get_data_type().clone();
+
+ match item_reader_type {
+ ArrowType::List(_)
+ | ArrowType::FixedSizeList(_, _)
+ | ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
+ "reading List({:?}) into arrow not supported yet",
+ item_type
+ ))),
+ _ => {
+ // a list is a group type with a single child. The list child's
+ // name comes from the child's field name.
+ // if the child's name is "list" and it has a child, then use
this child
+ if list_child.name() == "list" &&
!list_child.get_fields().is_empty() {
+ list_child = list_child.get_fields().first().unwrap();
+ }
+ let arrow_type = self
+ .arrow_schema
+ .field_with_name(list_type.name())
+ .ok()
+ .map(|f| f.data_type().to_owned())
+ .unwrap_or_else(|| {
+ ArrowType::List(Box::new(Field::new(
+ list_child.name(),
+ item_reader_type.clone(),
+ list_child.is_optional(),
+ )))
+ });
+
+ let list_array_reader: Box<dyn ArrayReader> = match arrow_type
{
+ ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
+ item_reader,
+ arrow_type,
+ item_reader_type,
+ new_context.def_level,
+ new_context.rep_level,
+ list_null_def,
+ list_empty_def,
+ )),
+ ArrowType::LargeList(_) =>
Box::new(ListArrayReader::<i64>::new(
+ item_reader,
+ arrow_type,
+ item_reader_type,
+ new_context.def_level,
+ new_context.rep_level,
+ list_null_def,
+ list_empty_def,
+ )),
+
+ _ => {
+ return Err(ArrowError(format!(
+ "creating ListArrayReader with type {:?} should be
unreachable",
+ arrow_type
+ )))
+ }
+ };
+
+ Ok(Some(list_array_reader))
+ }
+ }
+ }
+}
+
+impl<'a> ArrayReaderBuilder {
+ /// Construct array reader builder.
+ fn new(
+ root_schema: TypePtr,
+ arrow_schema: Arc<Schema>,
+ columns_included: Arc<HashMap<*const Type, usize>>,
+ file_reader: Box<dyn RowGroupCollection>,
+ ) -> Self {
+ Self {
+ root_schema,
+ arrow_schema,
+ columns_included,
+ row_groups: file_reader,
+ }
+ }
+
+ /// Main entry point.
+ fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
+ let context = ArrayReaderBuilderContext::default();
+
+ self.visit_struct(self.root_schema.clone(), &context)
+ .and_then(|reader_opt| {
+ reader_opt.ok_or_else(|| general_err!("Failed to build array
reader!"))
+ })
+ }
+
+ // Utility functions
+
+ /// Check whether one column in included in this array reader builder.
+ fn is_included(&self, t: &Type) -> bool {
+ self.columns_included.contains_key(&(t as *const Type))
+ }
+
+ /// Creates primitive array reader for each primitive type.
+ fn build_for_primitive_type_inner(
+ &self,
+ cur_type: TypePtr,
+ context: &'a ArrayReaderBuilderContext,
+ null_mask_only: bool,
+ ) -> Result<Box<dyn ArrayReader>> {
+ let column_desc = Arc::new(ColumnDescriptor::new(
+ cur_type.clone(),
+ context.def_level,
+ context.rep_level,
+ context.path.clone(),
+ ));
+
+ let page_iterator = self
+ .row_groups
+ .column_chunks(self.columns_included[&(cur_type.as_ref() as *const
Type)])?;
+
+ let arrow_type: Option<ArrowType> = self
+ .get_arrow_field(&cur_type, context)
+ .map(|f| f.data_type().clone());
+
+ match cur_type.get_physical_type() {
+ PhysicalType::BOOLEAN => Ok(Box::new(
+ PrimitiveArrayReader::<BoolType>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::INT32 => {
+ if let Some(ArrowType::Null) = arrow_type {
+ Ok(Box::new(NullArrayReader::<Int32Type>::new(
+ page_iterator,
+ column_desc,
+ )?))
+ } else {
+ Ok(Box::new(
+ PrimitiveArrayReader::<Int32Type>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ ))
+ }
+ }
+ PhysicalType::INT64 => Ok(Box::new(
+ PrimitiveArrayReader::<Int64Type>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::INT96 => {
+ // get the optional timezone information from arrow type
+ let timezone = arrow_type.as_ref().and_then(|data_type| {
+ if let ArrowType::Timestamp(_, tz) = data_type {
+ tz.clone()
+ } else {
+ None
+ }
+ });
+ let converter = Int96Converter::new(Int96ArrayConverter {
timezone });
+ Ok(Box::new(ComplexObjectArrayReader::<
+ Int96Type,
+ Int96Converter,
+ >::new(
+ page_iterator,
+ column_desc,
+ converter,
+ arrow_type,
+ )?))
+ }
+ PhysicalType::FLOAT => Ok(Box::new(
+ PrimitiveArrayReader::<FloatType>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::DOUBLE => Ok(Box::new(
+ PrimitiveArrayReader::<DoubleType>::new_with_options(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ )?,
+ )),
+ PhysicalType::BYTE_ARRAY => match arrow_type {
+ Some(ArrowType::Dictionary(_, _)) =>
make_byte_array_dictionary_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ ),
+ _ => make_byte_array_reader(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ null_mask_only,
+ ),
+ },
+ PhysicalType::FIXED_LEN_BYTE_ARRAY
+ if cur_type.get_basic_info().converted_type()
+ == ConvertedType::DECIMAL =>
+ {
+ let converter =
DecimalConverter::new(DecimalArrayConverter::new(
+ cur_type.get_precision(),
+ cur_type.get_scale(),
+ ));
+ Ok(Box::new(ComplexObjectArrayReader::<
+ FixedLenByteArrayType,
+ DecimalConverter,
+ >::new(
+ page_iterator,
+ column_desc,
+ converter,
+ arrow_type,
+ )?))
+ }
+ PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+ let byte_width = match *cur_type {
+ Type::PrimitiveType {
+ ref type_length, ..
+ } => *type_length,
+ _ => {
+ return Err(ArrowError(
+ "Expected a physical type, not a group
type".to_string(),
+ ))
+ }
+ };
+ if cur_type.get_basic_info().converted_type() ==
ConvertedType::INTERVAL {
+ if byte_width != 12 {
+ return Err(ArrowError(format!(
+ "Parquet interval type should have length of 12,
found {}",
+ byte_width
+ )));
+ }
+ match arrow_type {
+ Some(ArrowType::Interval(IntervalUnit::DayTime)) => {
+ let converter = IntervalDayTimeConverter::new(
+ IntervalDayTimeArrayConverter {},
+ );
+ Ok(Box::new(ComplexObjectArrayReader::<
+ FixedLenByteArrayType,
+ _,
+ >::new(
+ page_iterator,
+ column_desc,
+ converter,
+ arrow_type,
+ )?))
+ }
+ Some(ArrowType::Interval(IntervalUnit::YearMonth)) => {
+ let converter = IntervalYearMonthConverter::new(
+ IntervalYearMonthArrayConverter {},
+ );
+ Ok(Box::new(ComplexObjectArrayReader::<
+ FixedLenByteArrayType,
+ _,
+ >::new(
+ page_iterator,
+ column_desc,
+ converter,
+ arrow_type,
+ )?))
+ }
+ Some(t) => Err(ArrowError(format!(
+ "Cannot write a Parquet interval to {:?}",
+ t
+ ))),
+ None => {
+ // we do not support an interval not matched to an
Arrow type,
+ // because we risk data loss as we won't know
which of the 12 bytes
+ // are or should be populated
+ Err(ArrowError(
+ "Cannot write a Parquet interval with no Arrow
type specified.
+ There is a risk of data loss as Arrow either
supports YearMonth or
+ DayTime precision. Without the Arrow type, we
cannot infer the type.
+ ".to_string()
+ ))
+ }
+ }
+ } else {
+ let converter = FixedLenBinaryConverter::new(
+ FixedSizeArrayConverter::new(byte_width),
+ );
+ Ok(Box::new(ComplexObjectArrayReader::<
+ FixedLenByteArrayType,
+ FixedLenBinaryConverter,
+ >::new(
+ page_iterator,
+ column_desc,
+ converter,
+ arrow_type,
+ )?))
+ }
+ }
+ }
+ }
+
+ /// Constructs struct array reader without considering repetition.
+ fn build_for_struct_type_inner(
+ &mut self,
+ cur_type: &Type,
+ context: &'a ArrayReaderBuilderContext,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ let mut fields = Vec::with_capacity(cur_type.get_fields().len());
+ let mut children_reader =
Vec::with_capacity(cur_type.get_fields().len());
+
+ for child in cur_type.get_fields() {
+ let mut struct_context = context.clone();
+ if let Some(child_reader) = self.dispatch(child.clone(), context)?
{
+ // TODO: this results in calling get_arrow_field twice, it
could be reused
+ // from child_reader above, by making child_reader carry its
`Field`
+ struct_context.path.append(vec![child.name().to_string()]);
+ let field = match self.get_arrow_field(child, &struct_context)
{
+ Some(f) => f.clone(),
+ _ => Field::new(
+ child.name(),
+ child_reader.get_data_type().clone(),
+ child.is_optional(),
+ ),
+ };
+ fields.push(field);
+ children_reader.push(child_reader);
+ }
+ }
+
+ if !fields.is_empty() {
+ let arrow_type = ArrowType::Struct(fields);
+ Ok(Some(Box::new(StructArrayReader::new(
+ arrow_type,
+ children_reader,
+ context.def_level,
+ context.rep_level,
+ ))))
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn get_arrow_field(
+ &self,
+ cur_type: &Type,
+ context: &'a ArrayReaderBuilderContext,
+ ) -> Option<&Field> {
+ let parts: Vec<&str> = context
+ .path
+ .parts()
+ .iter()
+ .map(|x| -> &str { x })
+ .collect::<Vec<&str>>();
+
+ // If the parts length is one it'll have the top level "schema" type.
If
+ // it's two then it'll be a top-level type that we can get from the
arrow
+ // schema directly.
+ if parts.len() <= 2 {
+ self.arrow_schema.field_with_name(cur_type.name()).ok()
+ } else {
+ // If it's greater than two then we need to traverse the type path
+ // until we find the actual field we're looking for.
+ let mut field: Option<&Field> = None;
+
+ for (i, part) in parts.iter().enumerate().skip(1) {
+ if i == 1 {
+ field = self.arrow_schema.field_with_name(part).ok();
+ } else if let Some(f) = field {
+ match f.data_type() {
+ ArrowType::Struct(fields) => {
+ field = fields.iter().find(|f| f.name() == part)
+ }
+ ArrowType::List(list_field) => field =
Some(list_field.as_ref()),
+ _ => field = None,
+ }
+ } else {
+ field = None;
+ }
+ }
+ field
+ }
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+ use crate::arrow::parquet_to_arrow_schema;
+ use crate::file::reader::{FileReader, SerializedFileReader};
+ use crate::util::test_common::get_test_file;
+ use super::*;
+
+ #[test]
+ fn test_create_array_reader() {
+ let file = get_test_file("nulls.snappy.parquet");
+ let file_reader: Arc<dyn FileReader> =
+ Arc::new(SerializedFileReader::new(file).unwrap());
+
+ let file_metadata = file_reader.metadata().file_metadata();
+ let arrow_schema = parquet_to_arrow_schema(
+ file_metadata.schema_descr(),
+ file_metadata.key_value_metadata(),
+ )
+ .unwrap();
+
+ let array_reader = build_array_reader(
+ file_reader.metadata().file_metadata().schema_descr_ptr(),
+ Arc::new(arrow_schema),
+ vec![0usize].into_iter(),
+ Box::new(file_reader),
+ )
+ .unwrap();
+
+ // Create arrow types
+ let arrow_type = ArrowType::Struct(vec![Field::new(
+ "b_struct",
+ ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32,
true)]),
+ true,
+ )]);
+
+ assert_eq!(array_reader.get_data_type(), &arrow_type);
+ }
+}
\ No newline at end of file