This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 45bda043c6 Refactor `build_array_reader` into a struct (#7521)
45bda043c6 is described below
commit 45bda043c66853aadb66d61dd44f4875b9cd0d14
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue May 20 13:09:08 2025 -0400
Refactor `build_array_reader` into a struct (#7521)
* Factor `build_array_reader` into a new `ArrayReaderBuilder` struct
* fit
---
parquet/src/arrow/array_reader/builder.rs | 560 ++++++++++++++-------------
parquet/src/arrow/array_reader/list_array.rs | 6 +-
parquet/src/arrow/array_reader/mod.rs | 5 +-
parquet/src/arrow/arrow_reader/mod.rs | 14 +-
parquet/src/arrow/async_reader/mod.rs | 10 +-
5 files changed, 310 insertions(+), 285 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs
b/parquet/src/arrow/array_reader/builder.rs
index 5ada61e93d..14a4758598 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -34,306 +34,322 @@ use crate::data_type::{BoolType, DoubleType, FloatType,
Int32Type, Int64Type, In
use crate::errors::{ParquetError, Result};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
-/// Create array reader from parquet schema, projection mask, and parquet file
reader.
-pub fn build_array_reader(
- field: Option<&ParquetField>,
- mask: &ProjectionMask,
- row_groups: &dyn RowGroups,
-) -> Result<Box<dyn ArrayReader>> {
- let reader = field
- .and_then(|field| build_reader(field, mask, row_groups).transpose())
- .transpose()?
- .unwrap_or_else(|| make_empty_array_reader(row_groups.num_rows()));
-
- Ok(reader)
+/// Builds [`ArrayReader`]s from a parquet schema and projection mask, reading column data from a [`RowGroups`]
+pub(crate) struct ArrayReaderBuilder<'a> {
+ row_groups: &'a dyn RowGroups,
}
-fn build_reader(
- field: &ParquetField,
- mask: &ProjectionMask,
- row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
- match field.field_type {
- ParquetFieldType::Primitive { .. } => build_primitive_reader(field,
mask, row_groups),
- ParquetFieldType::Group { .. } => match &field.arrow_type {
- DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
- DataType::Struct(_) => build_struct_reader(field, mask,
row_groups),
- DataType::List(_) => build_list_reader(field, mask, false,
row_groups),
- DataType::LargeList(_) => build_list_reader(field, mask, true,
row_groups),
- DataType::FixedSizeList(_, _) =>
build_fixed_size_list_reader(field, mask, row_groups),
- d => unimplemented!("reading group type {} not implemented", d),
- },
+impl<'a> ArrayReaderBuilder<'a> {
+ pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self {
+ Self { row_groups }
}
-}
-/// Build array reader for map type.
-fn build_map_reader(
- field: &ParquetField,
- mask: &ProjectionMask,
- row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
- let children = field.children().unwrap();
- assert_eq!(children.len(), 2);
-
- let key_reader = build_reader(&children[0], mask, row_groups)?;
- let value_reader = build_reader(&children[1], mask, row_groups)?;
-
- match (key_reader, value_reader) {
- (Some(key_reader), Some(value_reader)) => {
- // Need to retrieve underlying data type to handle projection
- let key_type = key_reader.get_data_type().clone();
- let value_type = value_reader.get_data_type().clone();
-
- let data_type = match &field.arrow_type {
- DataType::Map(map_field, is_sorted) => match
map_field.data_type() {
- DataType::Struct(fields) => {
- assert_eq!(fields.len(), 2);
- let struct_field =
map_field.as_ref().clone().with_data_type(
- DataType::Struct(Fields::from(vec![
-
fields[0].as_ref().clone().with_data_type(key_type),
-
fields[1].as_ref().clone().with_data_type(value_type),
- ])),
- );
- DataType::Map(Arc::new(struct_field), *is_sorted)
- }
- _ => unreachable!(),
- },
- _ => unreachable!(),
- };
-
- Ok(Some(Box::new(MapArrayReader::new(
- key_reader,
- value_reader,
- data_type,
- field.def_level,
- field.rep_level,
- field.nullable,
- ))))
+ /// Create an [`ArrayReader`] for `field` using the projection `mask`, reading from this builder's [`RowGroups`].
+ pub fn build_array_reader(
+ &self,
+ field: Option<&ParquetField>,
+ mask: &ProjectionMask,
+ ) -> Result<Box<dyn ArrayReader>> {
+ let reader = field
+ .and_then(|field| self.build_reader(field, mask).transpose())
+ .transpose()?
+ .unwrap_or_else(|| make_empty_array_reader(self.num_rows()));
+
+ Ok(reader)
+ }
+
+ /// Return the total number of rows
+ fn num_rows(&self) -> usize {
+ self.row_groups.num_rows()
+ }
+
+ fn build_reader(
+ &self,
+ field: &ParquetField,
+ mask: &ProjectionMask,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ match field.field_type {
+ ParquetFieldType::Primitive { .. } =>
self.build_primitive_reader(field, mask),
+ ParquetFieldType::Group { .. } => match &field.arrow_type {
+ DataType::Map(_, _) => self.build_map_reader(field, mask),
+ DataType::Struct(_) => self.build_struct_reader(field, mask),
+ DataType::List(_) => self.build_list_reader(field, mask,
false),
+ DataType::LargeList(_) => self.build_list_reader(field, mask,
true),
+ DataType::FixedSizeList(_, _) =>
self.build_fixed_size_list_reader(field, mask),
+ d => unimplemented!("reading group type {} not implemented",
d),
+ },
}
- (None, None) => Ok(None),
- _ => Err(general_err!(
- "partial projection of MapArray is not supported"
- )),
}
-}
-/// Build array reader for list type.
-fn build_list_reader(
- field: &ParquetField,
- mask: &ProjectionMask,
- is_large: bool,
- row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
- let children = field.children().unwrap();
- assert_eq!(children.len(), 1);
-
- let reader = match build_reader(&children[0], mask, row_groups)? {
- Some(item_reader) => {
- // Need to retrieve underlying data type to handle projection
- let item_type = item_reader.get_data_type().clone();
- let data_type = match &field.arrow_type {
- DataType::List(f) => {
-
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
- }
- DataType::LargeList(f) => {
-
DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
- }
- _ => unreachable!(),
- };
+ /// Build array reader for map type.
+ fn build_map_reader(
+ &self,
+ field: &ParquetField,
+ mask: &ProjectionMask,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ let children = field.children().unwrap();
+ assert_eq!(children.len(), 2);
+
+ let key_reader = self.build_reader(&children[0], mask)?;
+ let value_reader = self.build_reader(&children[1], mask)?;
+
+ match (key_reader, value_reader) {
+ (Some(key_reader), Some(value_reader)) => {
+ // Need to retrieve underlying data type to handle projection
+ let key_type = key_reader.get_data_type().clone();
+ let value_type = value_reader.get_data_type().clone();
+
+ let data_type = match &field.arrow_type {
+ DataType::Map(map_field, is_sorted) => match
map_field.data_type() {
+ DataType::Struct(fields) => {
+ assert_eq!(fields.len(), 2);
+ let struct_field =
map_field.as_ref().clone().with_data_type(
+ DataType::Struct(Fields::from(vec![
+
fields[0].as_ref().clone().with_data_type(key_type),
+
fields[1].as_ref().clone().with_data_type(value_type),
+ ])),
+ );
+ DataType::Map(Arc::new(struct_field), *is_sorted)
+ }
+ _ => unreachable!(),
+ },
+ _ => unreachable!(),
+ };
- let reader = match is_large {
- false => Box::new(ListArrayReader::<i32>::new(
- item_reader,
- data_type,
- field.def_level,
- field.rep_level,
- field.nullable,
- )) as _,
- true => Box::new(ListArrayReader::<i64>::new(
- item_reader,
+ Ok(Some(Box::new(MapArrayReader::new(
+ key_reader,
+ value_reader,
data_type,
field.def_level,
field.rep_level,
field.nullable,
- )) as _,
- };
- Some(reader)
+ ))))
+ }
+ (None, None) => Ok(None),
+ _ => Err(general_err!(
+ "partial projection of MapArray is not supported"
+ )),
}
- None => None,
- };
- Ok(reader)
-}
+ }
-/// Build array reader for fixed-size list type.
-fn build_fixed_size_list_reader(
- field: &ParquetField,
- mask: &ProjectionMask,
- row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
- let children = field.children().unwrap();
- assert_eq!(children.len(), 1);
-
- let reader = match build_reader(&children[0], mask, row_groups)? {
- Some(item_reader) => {
- let item_type = item_reader.get_data_type().clone();
- let reader = match &field.arrow_type {
- &DataType::FixedSizeList(ref f, size) => {
- let data_type = DataType::FixedSizeList(
- Arc::new(f.as_ref().clone().with_data_type(item_type)),
- size,
- );
-
- Box::new(FixedSizeListArrayReader::new(
+ /// Build array reader for list type.
+ fn build_list_reader(
+ &self,
+ field: &ParquetField,
+ mask: &ProjectionMask,
+ is_large: bool,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ let children = field.children().unwrap();
+ assert_eq!(children.len(), 1);
+
+ let reader = match self.build_reader(&children[0], mask)? {
+ Some(item_reader) => {
+ // Need to retrieve underlying data type to handle projection
+ let item_type = item_reader.get_data_type().clone();
+ let data_type = match &field.arrow_type {
+ DataType::List(f) => {
+
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
+ }
+ DataType::LargeList(f) => {
+
DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
+ }
+ _ => unreachable!(),
+ };
+
+ let reader = match is_large {
+ false => Box::new(ListArrayReader::<i32>::new(
item_reader,
- size as usize,
data_type,
field.def_level,
field.rep_level,
field.nullable,
- )) as _
- }
- _ => unimplemented!(),
- };
- Some(reader)
+ )) as _,
+ true => Box::new(ListArrayReader::<i64>::new(
+ item_reader,
+ data_type,
+ field.def_level,
+ field.rep_level,
+ field.nullable,
+ )) as _,
+ };
+ Some(reader)
+ }
+ None => None,
+ };
+ Ok(reader)
+ }
+
+ /// Build array reader for fixed-size list type.
+ fn build_fixed_size_list_reader(
+ &self,
+ field: &ParquetField,
+ mask: &ProjectionMask,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ let children = field.children().unwrap();
+ assert_eq!(children.len(), 1);
+
+ let reader = match self.build_reader(&children[0], mask)? {
+ Some(item_reader) => {
+ let item_type = item_reader.get_data_type().clone();
+ let reader = match &field.arrow_type {
+ &DataType::FixedSizeList(ref f, size) => {
+ let data_type = DataType::FixedSizeList(
+
Arc::new(f.as_ref().clone().with_data_type(item_type)),
+ size,
+ );
+
+ Box::new(FixedSizeListArrayReader::new(
+ item_reader,
+ size as usize,
+ data_type,
+ field.def_level,
+ field.rep_level,
+ field.nullable,
+ )) as _
+ }
+ _ => unimplemented!(),
+ };
+ Some(reader)
+ }
+ None => None,
+ };
+ Ok(reader)
+ }
+
+ /// Creates primitive array reader for each primitive type.
+ fn build_primitive_reader(
+ &self,
+ field: &ParquetField,
+ mask: &ProjectionMask,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ let (col_idx, primitive_type) = match &field.field_type {
+ ParquetFieldType::Primitive {
+ col_idx,
+ primitive_type,
+ } => match primitive_type.as_ref() {
+ Type::PrimitiveType { .. } => (*col_idx,
primitive_type.clone()),
+ Type::GroupType { .. } => unreachable!(),
+ },
+ _ => unreachable!(),
+ };
+
+ if !mask.leaf_included(col_idx) {
+ return Ok(None);
}
- None => None,
- };
- Ok(reader)
-}
-/// Creates primitive array reader for each primitive type.
-fn build_primitive_reader(
- field: &ParquetField,
- mask: &ProjectionMask,
- row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
- let (col_idx, primitive_type) = match &field.field_type {
- ParquetFieldType::Primitive {
- col_idx,
+ let physical_type = primitive_type.get_physical_type();
+
+ // We don't track the column path in ParquetField as it adds a
potential source
+ // of bugs when the arrow mapping converts more than one level in the
parquet
+ // schema into a single arrow field.
+ //
+ // None of the readers actually use this field, but it is required for
this type,
+ // so just stick a placeholder in
+ let column_desc = Arc::new(ColumnDescriptor::new(
primitive_type,
- } => match primitive_type.as_ref() {
- Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
- Type::GroupType { .. } => unreachable!(),
- },
- _ => unreachable!(),
- };
-
- if !mask.leaf_included(col_idx) {
- return Ok(None);
+ field.def_level,
+ field.rep_level,
+ ColumnPath::new(vec![]),
+ ));
+
+ let page_iterator = self.row_groups.column_chunks(col_idx)?;
+ let arrow_type = Some(field.arrow_type.clone());
+
+ let reader = match physical_type {
+ PhysicalType::BOOLEAN =>
Box::new(PrimitiveArrayReader::<BoolType>::new(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ )?) as _,
+ PhysicalType::INT32 => {
+ if let Some(DataType::Null) = arrow_type {
+ Box::new(NullArrayReader::<Int32Type>::new(
+ page_iterator,
+ column_desc,
+ )?) as _
+ } else {
+ Box::new(PrimitiveArrayReader::<Int32Type>::new(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ )?) as _
+ }
+ }
+ PhysicalType::INT64 =>
Box::new(PrimitiveArrayReader::<Int64Type>::new(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ )?) as _,
+ PhysicalType::INT96 =>
Box::new(PrimitiveArrayReader::<Int96Type>::new(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ )?) as _,
+ PhysicalType::FLOAT =>
Box::new(PrimitiveArrayReader::<FloatType>::new(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ )?) as _,
+ PhysicalType::DOUBLE =>
Box::new(PrimitiveArrayReader::<DoubleType>::new(
+ page_iterator,
+ column_desc,
+ arrow_type,
+ )?) as _,
+ PhysicalType::BYTE_ARRAY => match arrow_type {
+ Some(DataType::Dictionary(_, _)) => {
+ make_byte_array_dictionary_reader(page_iterator,
column_desc, arrow_type)?
+ }
+ Some(DataType::Utf8View | DataType::BinaryView) => {
+ make_byte_view_array_reader(page_iterator, column_desc,
arrow_type)?
+ }
+ _ => make_byte_array_reader(page_iterator, column_desc,
arrow_type)?,
+ },
+ PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
+ Some(DataType::Dictionary(_, _)) => {
+ make_byte_array_dictionary_reader(page_iterator,
column_desc, arrow_type)?
+ }
+ _ => make_fixed_len_byte_array_reader(page_iterator,
column_desc, arrow_type)?,
+ },
+ };
+ Ok(Some(reader))
}
- let physical_type = primitive_type.get_physical_type();
-
- // We don't track the column path in ParquetField as it adds a potential
source
- // of bugs when the arrow mapping converts more than one level in the
parquet
- // schema into a single arrow field.
- //
- // None of the readers actually use this field, but it is required for
this type,
- // so just stick a placeholder in
- let column_desc = Arc::new(ColumnDescriptor::new(
- primitive_type,
- field.def_level,
- field.rep_level,
- ColumnPath::new(vec![]),
- ));
-
- let page_iterator = row_groups.column_chunks(col_idx)?;
- let arrow_type = Some(field.arrow_type.clone());
-
- let reader = match physical_type {
- PhysicalType::BOOLEAN =>
Box::new(PrimitiveArrayReader::<BoolType>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?) as _,
- PhysicalType::INT32 => {
- if let Some(DataType::Null) = arrow_type {
- Box::new(NullArrayReader::<Int32Type>::new(
- page_iterator,
- column_desc,
- )?) as _
- } else {
- Box::new(PrimitiveArrayReader::<Int32Type>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?) as _
+ fn build_struct_reader(
+ &self,
+ field: &ParquetField,
+ mask: &ProjectionMask,
+ ) -> Result<Option<Box<dyn ArrayReader>>> {
+ let arrow_fields = match &field.arrow_type {
+ DataType::Struct(children) => children,
+ _ => unreachable!(),
+ };
+ let children = field.children().unwrap();
+ assert_eq!(arrow_fields.len(), children.len());
+
+ let mut readers = Vec::with_capacity(children.len());
+ let mut builder = SchemaBuilder::with_capacity(children.len());
+
+ for (arrow, parquet) in arrow_fields.iter().zip(children) {
+ if let Some(reader) = self.build_reader(parquet, mask)? {
+ // Need to retrieve underlying data type to handle projection
+ let child_type = reader.get_data_type().clone();
+
builder.push(arrow.as_ref().clone().with_data_type(child_type));
+ readers.push(reader);
}
}
- PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?) as _,
- PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?) as _,
- PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?) as _,
- PhysicalType::DOUBLE =>
Box::new(PrimitiveArrayReader::<DoubleType>::new(
- page_iterator,
- column_desc,
- arrow_type,
- )?) as _,
- PhysicalType::BYTE_ARRAY => match arrow_type {
- Some(DataType::Dictionary(_, _)) => {
- make_byte_array_dictionary_reader(page_iterator, column_desc,
arrow_type)?
- }
- Some(DataType::Utf8View | DataType::BinaryView) => {
- make_byte_view_array_reader(page_iterator, column_desc,
arrow_type)?
- }
- _ => make_byte_array_reader(page_iterator, column_desc,
arrow_type)?,
- },
- PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
- Some(DataType::Dictionary(_, _)) => {
- make_byte_array_dictionary_reader(page_iterator, column_desc,
arrow_type)?
- }
- _ => make_fixed_len_byte_array_reader(page_iterator, column_desc,
arrow_type)?,
- },
- };
- Ok(Some(reader))
-}
-fn build_struct_reader(
- field: &ParquetField,
- mask: &ProjectionMask,
- row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
- let arrow_fields = match &field.arrow_type {
- DataType::Struct(children) => children,
- _ => unreachable!(),
- };
- let children = field.children().unwrap();
- assert_eq!(arrow_fields.len(), children.len());
-
- let mut readers = Vec::with_capacity(children.len());
- let mut builder = SchemaBuilder::with_capacity(children.len());
-
- for (arrow, parquet) in arrow_fields.iter().zip(children) {
- if let Some(reader) = build_reader(parquet, mask, row_groups)? {
- // Need to retrieve underlying data type to handle projection
- let child_type = reader.get_data_type().clone();
- builder.push(arrow.as_ref().clone().with_data_type(child_type));
- readers.push(reader);
+ if readers.is_empty() {
+ return Ok(None);
}
- }
- if readers.is_empty() {
- return Ok(None);
+ Ok(Some(Box::new(StructArrayReader::new(
+ DataType::Struct(builder.finish().fields),
+ readers,
+ field.def_level,
+ field.rep_level,
+ field.nullable,
+ ))))
}
-
- Ok(Some(Box::new(StructArrayReader::new(
- DataType::Struct(builder.finish().fields),
- readers,
- field.def_level,
- field.rep_level,
- field.nullable,
- ))))
}
#[cfg(test)]
@@ -359,7 +375,9 @@ mod tests {
)
.unwrap();
- let array_reader = build_array_reader(fields.as_ref(), &mask,
&file_reader).unwrap();
+ let array_reader = ArrayReaderBuilder::new(&file_reader)
+ .build_array_reader(fields.as_ref(), &mask)
+ .unwrap();
// Create arrow types
let arrow_type = DataType::Struct(Fields::from(vec![Field::new(
diff --git a/parquet/src/arrow/array_reader/list_array.rs
b/parquet/src/arrow/array_reader/list_array.rs
index 06448d5f6f..66c4f30b3c 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -246,9 +246,9 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for
ListArrayReader<OffsetSize> {
#[cfg(test)]
mod tests {
use super::*;
- use crate::arrow::array_reader::build_array_reader;
use crate::arrow::array_reader::list_array::ListArrayReader;
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+ use crate::arrow::array_reader::ArrayReaderBuilder;
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
use crate::file::properties::WriterProperties;
@@ -563,7 +563,9 @@ mod tests {
)
.unwrap();
- let mut array_reader = build_array_reader(fields.as_ref(), &mask,
&file_reader).unwrap();
+ let mut array_reader = ArrayReaderBuilder::new(&file_reader)
+ .build_array_reader(fields.as_ref(), &mask)
+ .unwrap();
let batch = array_reader.next_batch(100).unwrap();
assert_eq!(batch.data_type(), array_reader.get_data_type());
diff --git a/parquet/src/arrow/array_reader/mod.rs
b/parquet/src/arrow/array_reader/mod.rs
index a5ea426e95..94d61c9eac 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -45,7 +45,7 @@ mod struct_array;
#[cfg(test)]
mod test_util;
-pub use builder::build_array_reader;
+pub(crate) use builder::ArrayReaderBuilder;
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
#[allow(unused_imports)] // Only used for benchmarks
@@ -111,7 +111,8 @@ pub trait RowGroups {
/// Get the number of rows in this collection
fn num_rows(&self) -> usize;
- /// Returns a [`PageIterator`] for the column chunks with the given leaf
column index
+ /// Returns a [`PageIterator`] over all pages of the leaf column with index `i`,
+ /// across all row groups in this collection.
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index ea068acb29..a8688e8af8 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -26,7 +26,7 @@ pub use selection::{RowSelection, RowSelector};
use std::sync::Arc;
pub use crate::arrow::array_reader::RowGroups;
-use crate::arrow::array_reader::{build_array_reader, ArrayReader};
+use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
@@ -690,14 +690,16 @@ impl<T: ChunkReader + 'static>
ParquetRecordBatchReaderBuilder<T> {
break;
}
- let array_reader =
- build_array_reader(self.fields.as_deref(),
predicate.projection(), &reader)?;
+ let array_reader = ArrayReaderBuilder::new(&reader)
+ .build_array_reader(self.fields.as_deref(),
predicate.projection())?;
plan_builder = plan_builder.with_predicate(array_reader,
predicate.as_mut())?;
}
}
- let array_reader = build_array_reader(self.fields.as_deref(),
&self.projection, &reader)?;
+ let array_reader = ArrayReaderBuilder::new(&reader)
+ .build_array_reader(self.fields.as_deref(), &self.projection)?;
+
let read_plan = plan_builder
.limited(reader.num_rows())
.with_offset(self.offset)
@@ -896,8 +898,8 @@ impl ParquetRecordBatchReader {
batch_size: usize,
selection: Option<RowSelection>,
) -> Result<Self> {
- let array_reader =
- build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(),
row_groups)?;
+ let array_reader = ArrayReaderBuilder::new(row_groups)
+ .build_array_reader(levels.levels.as_ref(),
&ProjectionMask::all())?;
let read_plan = ReadPlanBuilder::new(batch_size)
.with_selection(selection)
diff --git a/parquet/src/arrow/async_reader/mod.rs
b/parquet/src/arrow/async_reader/mod.rs
index ac4d24ee27..611d6999e0 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -38,7 +38,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek,
AsyncSeekExt};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};
-use crate::arrow::array_reader::{build_array_reader, RowGroups};
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroups};
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
ParquetRecordBatchReader,
RowFilter, RowSelection,
@@ -613,8 +613,8 @@ where
.fetch(&mut self.input, predicate.projection(), selection)
.await?;
- let array_reader =
- build_array_reader(self.fields.as_deref(),
predicate.projection(), &row_group)?;
+ let array_reader = ArrayReaderBuilder::new(&row_group)
+ .build_array_reader(self.fields.as_deref(),
predicate.projection())?;
plan_builder = plan_builder.with_predicate(array_reader,
predicate.as_mut())?;
}
@@ -661,7 +661,9 @@ where
let plan = plan_builder.build();
- let array_reader = build_array_reader(self.fields.as_deref(),
&projection, &row_group)?;
+ let array_reader = ArrayReaderBuilder::new(&row_group)
+ .build_array_reader(self.fields.as_deref(), &projection)?;
+
let reader = ParquetRecordBatchReader::new(array_reader, plan);
Ok((self, Some(reader)))