This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 45bda043c6 Refactor `build_array_reader` into a struct (#7521)
45bda043c6 is described below

commit 45bda043c66853aadb66d61dd44f4875b9cd0d14
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue May 20 13:09:08 2025 -0400

    Refactor `build_array_reader` into a struct (#7521)
    
    * Factor ArrayDataBuilder into a new struct
    
    * fix
---
 parquet/src/arrow/array_reader/builder.rs    | 560 ++++++++++++++-------------
 parquet/src/arrow/array_reader/list_array.rs |   6 +-
 parquet/src/arrow/array_reader/mod.rs        |   5 +-
 parquet/src/arrow/arrow_reader/mod.rs        |  14 +-
 parquet/src/arrow/async_reader/mod.rs        |  10 +-
 5 files changed, 310 insertions(+), 285 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs 
b/parquet/src/arrow/array_reader/builder.rs
index 5ada61e93d..14a4758598 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -34,306 +34,322 @@ use crate::data_type::{BoolType, DoubleType, FloatType, 
Int32Type, Int64Type, In
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
 
-/// Create array reader from parquet schema, projection mask, and parquet file 
reader.
-pub fn build_array_reader(
-    field: Option<&ParquetField>,
-    mask: &ProjectionMask,
-    row_groups: &dyn RowGroups,
-) -> Result<Box<dyn ArrayReader>> {
-    let reader = field
-        .and_then(|field| build_reader(field, mask, row_groups).transpose())
-        .transpose()?
-        .unwrap_or_else(|| make_empty_array_reader(row_groups.num_rows()));
-
-    Ok(reader)
+/// Builds [`ArrayReader`]s from parquet schema, projection mask, and 
RowGroups reader
+pub(crate) struct ArrayReaderBuilder<'a> {
+    row_groups: &'a dyn RowGroups,
 }
 
-fn build_reader(
-    field: &ParquetField,
-    mask: &ProjectionMask,
-    row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
-    match field.field_type {
-        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, 
mask, row_groups),
-        ParquetFieldType::Group { .. } => match &field.arrow_type {
-            DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
-            DataType::Struct(_) => build_struct_reader(field, mask, 
row_groups),
-            DataType::List(_) => build_list_reader(field, mask, false, 
row_groups),
-            DataType::LargeList(_) => build_list_reader(field, mask, true, 
row_groups),
-            DataType::FixedSizeList(_, _) => 
build_fixed_size_list_reader(field, mask, row_groups),
-            d => unimplemented!("reading group type {} not implemented", d),
-        },
+impl<'a> ArrayReaderBuilder<'a> {
+    pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self {
+        Self { row_groups }
     }
-}
 
-/// Build array reader for map type.
-fn build_map_reader(
-    field: &ParquetField,
-    mask: &ProjectionMask,
-    row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
-    let children = field.children().unwrap();
-    assert_eq!(children.len(), 2);
-
-    let key_reader = build_reader(&children[0], mask, row_groups)?;
-    let value_reader = build_reader(&children[1], mask, row_groups)?;
-
-    match (key_reader, value_reader) {
-        (Some(key_reader), Some(value_reader)) => {
-            // Need to retrieve underlying data type to handle projection
-            let key_type = key_reader.get_data_type().clone();
-            let value_type = value_reader.get_data_type().clone();
-
-            let data_type = match &field.arrow_type {
-                DataType::Map(map_field, is_sorted) => match 
map_field.data_type() {
-                    DataType::Struct(fields) => {
-                        assert_eq!(fields.len(), 2);
-                        let struct_field = 
map_field.as_ref().clone().with_data_type(
-                            DataType::Struct(Fields::from(vec![
-                                
fields[0].as_ref().clone().with_data_type(key_type),
-                                
fields[1].as_ref().clone().with_data_type(value_type),
-                            ])),
-                        );
-                        DataType::Map(Arc::new(struct_field), *is_sorted)
-                    }
-                    _ => unreachable!(),
-                },
-                _ => unreachable!(),
-            };
-
-            Ok(Some(Box::new(MapArrayReader::new(
-                key_reader,
-                value_reader,
-                data_type,
-                field.def_level,
-                field.rep_level,
-                field.nullable,
-            ))))
+    /// Create [`ArrayReader`] from parquet schema, projection mask, and 
parquet file reader.
+    pub fn build_array_reader(
+        &self,
+        field: Option<&ParquetField>,
+        mask: &ProjectionMask,
+    ) -> Result<Box<dyn ArrayReader>> {
+        let reader = field
+            .and_then(|field| self.build_reader(field, mask).transpose())
+            .transpose()?
+            .unwrap_or_else(|| make_empty_array_reader(self.num_rows()));
+
+        Ok(reader)
+    }
+
+    /// Return the total number of rows
+    fn num_rows(&self) -> usize {
+        self.row_groups.num_rows()
+    }
+
+    fn build_reader(
+        &self,
+        field: &ParquetField,
+        mask: &ProjectionMask,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        match field.field_type {
+            ParquetFieldType::Primitive { .. } => 
self.build_primitive_reader(field, mask),
+            ParquetFieldType::Group { .. } => match &field.arrow_type {
+                DataType::Map(_, _) => self.build_map_reader(field, mask),
+                DataType::Struct(_) => self.build_struct_reader(field, mask),
+                DataType::List(_) => self.build_list_reader(field, mask, 
false),
+                DataType::LargeList(_) => self.build_list_reader(field, mask, 
true),
+                DataType::FixedSizeList(_, _) => 
self.build_fixed_size_list_reader(field, mask),
+                d => unimplemented!("reading group type {} not implemented", 
d),
+            },
         }
-        (None, None) => Ok(None),
-        _ => Err(general_err!(
-            "partial projection of MapArray is not supported"
-        )),
     }
-}
 
-/// Build array reader for list type.
-fn build_list_reader(
-    field: &ParquetField,
-    mask: &ProjectionMask,
-    is_large: bool,
-    row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
-    let children = field.children().unwrap();
-    assert_eq!(children.len(), 1);
-
-    let reader = match build_reader(&children[0], mask, row_groups)? {
-        Some(item_reader) => {
-            // Need to retrieve underlying data type to handle projection
-            let item_type = item_reader.get_data_type().clone();
-            let data_type = match &field.arrow_type {
-                DataType::List(f) => {
-                    
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
-                }
-                DataType::LargeList(f) => {
-                    
DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
-                }
-                _ => unreachable!(),
-            };
+    /// Build array reader for map type.
+    fn build_map_reader(
+        &self,
+        field: &ParquetField,
+        mask: &ProjectionMask,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        let children = field.children().unwrap();
+        assert_eq!(children.len(), 2);
+
+        let key_reader = self.build_reader(&children[0], mask)?;
+        let value_reader = self.build_reader(&children[1], mask)?;
+
+        match (key_reader, value_reader) {
+            (Some(key_reader), Some(value_reader)) => {
+                // Need to retrieve underlying data type to handle projection
+                let key_type = key_reader.get_data_type().clone();
+                let value_type = value_reader.get_data_type().clone();
+
+                let data_type = match &field.arrow_type {
+                    DataType::Map(map_field, is_sorted) => match 
map_field.data_type() {
+                        DataType::Struct(fields) => {
+                            assert_eq!(fields.len(), 2);
+                            let struct_field = 
map_field.as_ref().clone().with_data_type(
+                                DataType::Struct(Fields::from(vec![
+                                    
fields[0].as_ref().clone().with_data_type(key_type),
+                                    
fields[1].as_ref().clone().with_data_type(value_type),
+                                ])),
+                            );
+                            DataType::Map(Arc::new(struct_field), *is_sorted)
+                        }
+                        _ => unreachable!(),
+                    },
+                    _ => unreachable!(),
+                };
 
-            let reader = match is_large {
-                false => Box::new(ListArrayReader::<i32>::new(
-                    item_reader,
-                    data_type,
-                    field.def_level,
-                    field.rep_level,
-                    field.nullable,
-                )) as _,
-                true => Box::new(ListArrayReader::<i64>::new(
-                    item_reader,
+                Ok(Some(Box::new(MapArrayReader::new(
+                    key_reader,
+                    value_reader,
                     data_type,
                     field.def_level,
                     field.rep_level,
                     field.nullable,
-                )) as _,
-            };
-            Some(reader)
+                ))))
+            }
+            (None, None) => Ok(None),
+            _ => Err(general_err!(
+                "partial projection of MapArray is not supported"
+            )),
         }
-        None => None,
-    };
-    Ok(reader)
-}
+    }
 
-/// Build array reader for fixed-size list type.
-fn build_fixed_size_list_reader(
-    field: &ParquetField,
-    mask: &ProjectionMask,
-    row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
-    let children = field.children().unwrap();
-    assert_eq!(children.len(), 1);
-
-    let reader = match build_reader(&children[0], mask, row_groups)? {
-        Some(item_reader) => {
-            let item_type = item_reader.get_data_type().clone();
-            let reader = match &field.arrow_type {
-                &DataType::FixedSizeList(ref f, size) => {
-                    let data_type = DataType::FixedSizeList(
-                        Arc::new(f.as_ref().clone().with_data_type(item_type)),
-                        size,
-                    );
-
-                    Box::new(FixedSizeListArrayReader::new(
+    /// Build array reader for list type.
+    fn build_list_reader(
+        &self,
+        field: &ParquetField,
+        mask: &ProjectionMask,
+        is_large: bool,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        let children = field.children().unwrap();
+        assert_eq!(children.len(), 1);
+
+        let reader = match self.build_reader(&children[0], mask)? {
+            Some(item_reader) => {
+                // Need to retrieve underlying data type to handle projection
+                let item_type = item_reader.get_data_type().clone();
+                let data_type = match &field.arrow_type {
+                    DataType::List(f) => {
+                        
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
+                    }
+                    DataType::LargeList(f) => {
+                        
DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
+                    }
+                    _ => unreachable!(),
+                };
+
+                let reader = match is_large {
+                    false => Box::new(ListArrayReader::<i32>::new(
                         item_reader,
-                        size as usize,
                         data_type,
                         field.def_level,
                         field.rep_level,
                         field.nullable,
-                    )) as _
-                }
-                _ => unimplemented!(),
-            };
-            Some(reader)
+                    )) as _,
+                    true => Box::new(ListArrayReader::<i64>::new(
+                        item_reader,
+                        data_type,
+                        field.def_level,
+                        field.rep_level,
+                        field.nullable,
+                    )) as _,
+                };
+                Some(reader)
+            }
+            None => None,
+        };
+        Ok(reader)
+    }
+
+    /// Build array reader for fixed-size list type.
+    fn build_fixed_size_list_reader(
+        &self,
+        field: &ParquetField,
+        mask: &ProjectionMask,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        let children = field.children().unwrap();
+        assert_eq!(children.len(), 1);
+
+        let reader = match self.build_reader(&children[0], mask)? {
+            Some(item_reader) => {
+                let item_type = item_reader.get_data_type().clone();
+                let reader = match &field.arrow_type {
+                    &DataType::FixedSizeList(ref f, size) => {
+                        let data_type = DataType::FixedSizeList(
+                            
Arc::new(f.as_ref().clone().with_data_type(item_type)),
+                            size,
+                        );
+
+                        Box::new(FixedSizeListArrayReader::new(
+                            item_reader,
+                            size as usize,
+                            data_type,
+                            field.def_level,
+                            field.rep_level,
+                            field.nullable,
+                        )) as _
+                    }
+                    _ => unimplemented!(),
+                };
+                Some(reader)
+            }
+            None => None,
+        };
+        Ok(reader)
+    }
+
+    /// Creates primitive array reader for each primitive type.
+    fn build_primitive_reader(
+        &self,
+        field: &ParquetField,
+        mask: &ProjectionMask,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        let (col_idx, primitive_type) = match &field.field_type {
+            ParquetFieldType::Primitive {
+                col_idx,
+                primitive_type,
+            } => match primitive_type.as_ref() {
+                Type::PrimitiveType { .. } => (*col_idx, 
primitive_type.clone()),
+                Type::GroupType { .. } => unreachable!(),
+            },
+            _ => unreachable!(),
+        };
+
+        if !mask.leaf_included(col_idx) {
+            return Ok(None);
         }
-        None => None,
-    };
-    Ok(reader)
-}
 
-/// Creates primitive array reader for each primitive type.
-fn build_primitive_reader(
-    field: &ParquetField,
-    mask: &ProjectionMask,
-    row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
-    let (col_idx, primitive_type) = match &field.field_type {
-        ParquetFieldType::Primitive {
-            col_idx,
+        let physical_type = primitive_type.get_physical_type();
+
+        // We don't track the column path in ParquetField as it adds a 
potential source
+        // of bugs when the arrow mapping converts more than one level in the 
parquet
+        // schema into a single arrow field.
+        //
+        // None of the readers actually use this field, but it is required for 
this type,
+        // so just stick a placeholder in
+        let column_desc = Arc::new(ColumnDescriptor::new(
             primitive_type,
-        } => match primitive_type.as_ref() {
-            Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
-            Type::GroupType { .. } => unreachable!(),
-        },
-        _ => unreachable!(),
-    };
-
-    if !mask.leaf_included(col_idx) {
-        return Ok(None);
+            field.def_level,
+            field.rep_level,
+            ColumnPath::new(vec![]),
+        ));
+
+        let page_iterator = self.row_groups.column_chunks(col_idx)?;
+        let arrow_type = Some(field.arrow_type.clone());
+
+        let reader = match physical_type {
+            PhysicalType::BOOLEAN => 
Box::new(PrimitiveArrayReader::<BoolType>::new(
+                page_iterator,
+                column_desc,
+                arrow_type,
+            )?) as _,
+            PhysicalType::INT32 => {
+                if let Some(DataType::Null) = arrow_type {
+                    Box::new(NullArrayReader::<Int32Type>::new(
+                        page_iterator,
+                        column_desc,
+                    )?) as _
+                } else {
+                    Box::new(PrimitiveArrayReader::<Int32Type>::new(
+                        page_iterator,
+                        column_desc,
+                        arrow_type,
+                    )?) as _
+                }
+            }
+            PhysicalType::INT64 => 
Box::new(PrimitiveArrayReader::<Int64Type>::new(
+                page_iterator,
+                column_desc,
+                arrow_type,
+            )?) as _,
+            PhysicalType::INT96 => 
Box::new(PrimitiveArrayReader::<Int96Type>::new(
+                page_iterator,
+                column_desc,
+                arrow_type,
+            )?) as _,
+            PhysicalType::FLOAT => 
Box::new(PrimitiveArrayReader::<FloatType>::new(
+                page_iterator,
+                column_desc,
+                arrow_type,
+            )?) as _,
+            PhysicalType::DOUBLE => 
Box::new(PrimitiveArrayReader::<DoubleType>::new(
+                page_iterator,
+                column_desc,
+                arrow_type,
+            )?) as _,
+            PhysicalType::BYTE_ARRAY => match arrow_type {
+                Some(DataType::Dictionary(_, _)) => {
+                    make_byte_array_dictionary_reader(page_iterator, 
column_desc, arrow_type)?
+                }
+                Some(DataType::Utf8View | DataType::BinaryView) => {
+                    make_byte_view_array_reader(page_iterator, column_desc, 
arrow_type)?
+                }
+                _ => make_byte_array_reader(page_iterator, column_desc, 
arrow_type)?,
+            },
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
+                Some(DataType::Dictionary(_, _)) => {
+                    make_byte_array_dictionary_reader(page_iterator, 
column_desc, arrow_type)?
+                }
+                _ => make_fixed_len_byte_array_reader(page_iterator, 
column_desc, arrow_type)?,
+            },
+        };
+        Ok(Some(reader))
     }
 
-    let physical_type = primitive_type.get_physical_type();
-
-    // We don't track the column path in ParquetField as it adds a potential 
source
-    // of bugs when the arrow mapping converts more than one level in the 
parquet
-    // schema into a single arrow field.
-    //
-    // None of the readers actually use this field, but it is required for 
this type,
-    // so just stick a placeholder in
-    let column_desc = Arc::new(ColumnDescriptor::new(
-        primitive_type,
-        field.def_level,
-        field.rep_level,
-        ColumnPath::new(vec![]),
-    ));
-
-    let page_iterator = row_groups.column_chunks(col_idx)?;
-    let arrow_type = Some(field.arrow_type.clone());
-
-    let reader = match physical_type {
-        PhysicalType::BOOLEAN => 
Box::new(PrimitiveArrayReader::<BoolType>::new(
-            page_iterator,
-            column_desc,
-            arrow_type,
-        )?) as _,
-        PhysicalType::INT32 => {
-            if let Some(DataType::Null) = arrow_type {
-                Box::new(NullArrayReader::<Int32Type>::new(
-                    page_iterator,
-                    column_desc,
-                )?) as _
-            } else {
-                Box::new(PrimitiveArrayReader::<Int32Type>::new(
-                    page_iterator,
-                    column_desc,
-                    arrow_type,
-                )?) as _
+    fn build_struct_reader(
+        &self,
+        field: &ParquetField,
+        mask: &ProjectionMask,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        let arrow_fields = match &field.arrow_type {
+            DataType::Struct(children) => children,
+            _ => unreachable!(),
+        };
+        let children = field.children().unwrap();
+        assert_eq!(arrow_fields.len(), children.len());
+
+        let mut readers = Vec::with_capacity(children.len());
+        let mut builder = SchemaBuilder::with_capacity(children.len());
+
+        for (arrow, parquet) in arrow_fields.iter().zip(children) {
+            if let Some(reader) = self.build_reader(parquet, mask)? {
+                // Need to retrieve underlying data type to handle projection
+                let child_type = reader.get_data_type().clone();
+                
builder.push(arrow.as_ref().clone().with_data_type(child_type));
+                readers.push(reader);
             }
         }
-        PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
-            page_iterator,
-            column_desc,
-            arrow_type,
-        )?) as _,
-        PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
-            page_iterator,
-            column_desc,
-            arrow_type,
-        )?) as _,
-        PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
-            page_iterator,
-            column_desc,
-            arrow_type,
-        )?) as _,
-        PhysicalType::DOUBLE => 
Box::new(PrimitiveArrayReader::<DoubleType>::new(
-            page_iterator,
-            column_desc,
-            arrow_type,
-        )?) as _,
-        PhysicalType::BYTE_ARRAY => match arrow_type {
-            Some(DataType::Dictionary(_, _)) => {
-                make_byte_array_dictionary_reader(page_iterator, column_desc, 
arrow_type)?
-            }
-            Some(DataType::Utf8View | DataType::BinaryView) => {
-                make_byte_view_array_reader(page_iterator, column_desc, 
arrow_type)?
-            }
-            _ => make_byte_array_reader(page_iterator, column_desc, 
arrow_type)?,
-        },
-        PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
-            Some(DataType::Dictionary(_, _)) => {
-                make_byte_array_dictionary_reader(page_iterator, column_desc, 
arrow_type)?
-            }
-            _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, 
arrow_type)?,
-        },
-    };
-    Ok(Some(reader))
-}
 
-fn build_struct_reader(
-    field: &ParquetField,
-    mask: &ProjectionMask,
-    row_groups: &dyn RowGroups,
-) -> Result<Option<Box<dyn ArrayReader>>> {
-    let arrow_fields = match &field.arrow_type {
-        DataType::Struct(children) => children,
-        _ => unreachable!(),
-    };
-    let children = field.children().unwrap();
-    assert_eq!(arrow_fields.len(), children.len());
-
-    let mut readers = Vec::with_capacity(children.len());
-    let mut builder = SchemaBuilder::with_capacity(children.len());
-
-    for (arrow, parquet) in arrow_fields.iter().zip(children) {
-        if let Some(reader) = build_reader(parquet, mask, row_groups)? {
-            // Need to retrieve underlying data type to handle projection
-            let child_type = reader.get_data_type().clone();
-            builder.push(arrow.as_ref().clone().with_data_type(child_type));
-            readers.push(reader);
+        if readers.is_empty() {
+            return Ok(None);
         }
-    }
 
-    if readers.is_empty() {
-        return Ok(None);
+        Ok(Some(Box::new(StructArrayReader::new(
+            DataType::Struct(builder.finish().fields),
+            readers,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        ))))
     }
-
-    Ok(Some(Box::new(StructArrayReader::new(
-        DataType::Struct(builder.finish().fields),
-        readers,
-        field.def_level,
-        field.rep_level,
-        field.nullable,
-    ))))
 }
 
 #[cfg(test)]
@@ -359,7 +375,9 @@ mod tests {
         )
         .unwrap();
 
-        let array_reader = build_array_reader(fields.as_ref(), &mask, 
&file_reader).unwrap();
+        let array_reader = ArrayReaderBuilder::new(&file_reader)
+            .build_array_reader(fields.as_ref(), &mask)
+            .unwrap();
 
         // Create arrow types
         let arrow_type = DataType::Struct(Fields::from(vec![Field::new(
diff --git a/parquet/src/arrow/array_reader/list_array.rs 
b/parquet/src/arrow/array_reader/list_array.rs
index 06448d5f6f..66c4f30b3c 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -246,9 +246,9 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for 
ListArrayReader<OffsetSize> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::array_reader::build_array_reader;
     use crate::arrow::array_reader::list_array::ListArrayReader;
     use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+    use crate::arrow::array_reader::ArrayReaderBuilder;
     use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
     use crate::file::properties::WriterProperties;
@@ -563,7 +563,9 @@ mod tests {
         )
         .unwrap();
 
-        let mut array_reader = build_array_reader(fields.as_ref(), &mask, 
&file_reader).unwrap();
+        let mut array_reader = ArrayReaderBuilder::new(&file_reader)
+            .build_array_reader(fields.as_ref(), &mask)
+            .unwrap();
 
         let batch = array_reader.next_batch(100).unwrap();
         assert_eq!(batch.data_type(), array_reader.get_data_type());
diff --git a/parquet/src/arrow/array_reader/mod.rs 
b/parquet/src/arrow/array_reader/mod.rs
index a5ea426e95..94d61c9eac 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -45,7 +45,7 @@ mod struct_array;
 #[cfg(test)]
 mod test_util;
 
-pub use builder::build_array_reader;
+pub(crate) use builder::ArrayReaderBuilder;
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
 #[allow(unused_imports)] // Only used for benchmarks
@@ -111,7 +111,8 @@ pub trait RowGroups {
     /// Get the number of rows in this collection
     fn num_rows(&self) -> usize;
 
-    /// Returns a [`PageIterator`] for the column chunks with the given leaf 
column index
+    /// Returns a [`PageIterator`] for all pages in the specified column chunk
+    /// across all row groups in this collection.
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
 }
 
diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index ea068acb29..a8688e8af8 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -26,7 +26,7 @@ pub use selection::{RowSelection, RowSelector};
 use std::sync::Arc;
 
 pub use crate::arrow::array_reader::RowGroups;
-use crate::arrow::array_reader::{build_array_reader, ArrayReader};
+use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
 use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
 use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
 use crate::column::page::{PageIterator, PageReader};
@@ -690,14 +690,16 @@ impl<T: ChunkReader + 'static> 
ParquetRecordBatchReaderBuilder<T> {
                     break;
                 }
 
-                let array_reader =
-                    build_array_reader(self.fields.as_deref(), 
predicate.projection(), &reader)?;
+                let array_reader = ArrayReaderBuilder::new(&reader)
+                    .build_array_reader(self.fields.as_deref(), 
predicate.projection())?;
 
                 plan_builder = plan_builder.with_predicate(array_reader, 
predicate.as_mut())?;
             }
         }
 
-        let array_reader = build_array_reader(self.fields.as_deref(), 
&self.projection, &reader)?;
+        let array_reader = ArrayReaderBuilder::new(&reader)
+            .build_array_reader(self.fields.as_deref(), &self.projection)?;
+
         let read_plan = plan_builder
             .limited(reader.num_rows())
             .with_offset(self.offset)
@@ -896,8 +898,8 @@ impl ParquetRecordBatchReader {
         batch_size: usize,
         selection: Option<RowSelection>,
     ) -> Result<Self> {
-        let array_reader =
-            build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), 
row_groups)?;
+        let array_reader = ArrayReaderBuilder::new(row_groups)
+            .build_array_reader(levels.levels.as_ref(), 
&ProjectionMask::all())?;
 
         let read_plan = ReadPlanBuilder::new(batch_size)
             .with_selection(selection)
diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index ac4d24ee27..611d6999e0 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -38,7 +38,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, 
AsyncSeekExt};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
-use crate::arrow::array_reader::{build_array_reader, RowGroups};
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroups};
 use crate::arrow::arrow_reader::{
     ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
     RowFilter, RowSelection,
@@ -613,8 +613,8 @@ where
                     .fetch(&mut self.input, predicate.projection(), selection)
                     .await?;
 
-                let array_reader =
-                    build_array_reader(self.fields.as_deref(), 
predicate.projection(), &row_group)?;
+                let array_reader = ArrayReaderBuilder::new(&row_group)
+                    .build_array_reader(self.fields.as_deref(), 
predicate.projection())?;
 
                 plan_builder = plan_builder.with_predicate(array_reader, 
predicate.as_mut())?;
             }
@@ -661,7 +661,9 @@ where
 
         let plan = plan_builder.build();
 
-        let array_reader = build_array_reader(self.fields.as_deref(), 
&projection, &row_group)?;
+        let array_reader = ArrayReaderBuilder::new(&row_group)
+            .build_array_reader(self.fields.as_deref(), &projection)?;
+
         let reader = ParquetRecordBatchReader::new(array_reader, plan);
 
         Ok((self, Some(reader)))

Reply via email to