This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new ce18e5b1e4 Introduce `ReadPlan` to encapsulate the calculation of what
parquet rows to decode (#7502)
ce18e5b1e4 is described below
commit ce18e5b1e4ff22695a3a97bf519336dd400176e9
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon May 19 15:42:48 2025 -0400
Introduce `ReadPlan` to encapsulate the calculation of what parquet rows to
decode (#7502)
* Introduce `ReadPlan` to encapsulate the calculation of what rows to decode
* Update parquet/src/arrow/arrow_reader/read_plan.rs
Co-authored-by: Ed Seidl <[email protected]>
---------
Co-authored-by: Ed Seidl <[email protected]>
---
parquet/src/arrow/arrow_reader/mod.rs | 149 ++++-------------
parquet/src/arrow/arrow_reader/read_plan.rs | 249 ++++++++++++++++++++++++++++
parquet/src/arrow/async_reader/mod.rs | 79 +++++----
3 files changed, 326 insertions(+), 151 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index 90e2592183..ea068acb29 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
-use arrow_select::filter::prep_null_mask_filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
-use std::collections::VecDeque;
use std::sync::Arc;
pub use crate::arrow::array_reader::RowGroups;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData,
ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
+pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
+
mod filter;
+mod read_plan;
mod selection;
pub mod statistics;
@@ -679,38 +680,32 @@ impl<T: ChunkReader + 'static>
ParquetRecordBatchReaderBuilder<T> {
};
let mut filter = self.filter;
- let mut selection = self.selection;
+ let mut plan_builder =
ReadPlanBuilder::new(batch_size).with_selection(self.selection);
+ // Update selection based on any filters
if let Some(filter) = filter.as_mut() {
for predicate in filter.predicates.iter_mut() {
- if !selects_any(selection.as_ref()) {
+ // break early if we have ruled out all rows
+ if !plan_builder.selects_any() {
break;
}
let array_reader =
build_array_reader(self.fields.as_deref(),
predicate.projection(), &reader)?;
- selection = Some(evaluate_predicate(
- batch_size,
- array_reader,
- selection,
- predicate.as_mut(),
- )?);
+ plan_builder = plan_builder.with_predicate(array_reader,
predicate.as_mut())?;
}
}
let array_reader = build_array_reader(self.fields.as_deref(),
&self.projection, &reader)?;
+ let read_plan = plan_builder
+ .limited(reader.num_rows())
+ .with_offset(self.offset)
+ .with_limit(self.limit)
+ .build_limited()
+ .build();
- // If selection is empty, truncate
- if !selects_any(selection.as_ref()) {
- selection = Some(RowSelection::from(vec![]));
- }
-
- Ok(ParquetRecordBatchReader::new(
- batch_size,
- array_reader,
- apply_range(selection, reader.num_rows(), self.offset, self.limit),
- ))
+ Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
}
}
@@ -789,11 +784,9 @@ impl<T: ChunkReader + 'static> PageIterator for
ReaderPageIterator<T> {}
/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
/// read from a parquet data source
pub struct ParquetRecordBatchReader {
- batch_size: usize,
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
- /// Row ranges to be selected from the data source
- selection: Option<VecDeque<RowSelector>>,
+ read_plan: ReadPlan,
}
impl Iterator for ParquetRecordBatchReader {
@@ -814,9 +807,10 @@ impl ParquetRecordBatchReader {
/// simplify error handling with `?`
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
let mut read_records = 0;
- match self.selection.as_mut() {
+ let batch_size = self.batch_size();
+ match self.read_plan.selection_mut() {
Some(selection) => {
- while read_records < self.batch_size && !selection.is_empty() {
+ while read_records < batch_size && !selection.is_empty() {
let front = selection.pop_front().unwrap();
if front.skip {
let skipped =
self.array_reader.skip_records(front.row_count)?;
@@ -838,7 +832,7 @@ impl ParquetRecordBatchReader {
}
// try to read record
- let need_read = self.batch_size - read_records;
+ let need_read = batch_size - read_records;
let to_read = match front.row_count.checked_sub(need_read)
{
Some(remaining) if remaining != 0 => {
// if page row count less than batch_size we must
set batch size to page row count.
@@ -855,7 +849,7 @@ impl ParquetRecordBatchReader {
}
}
None => {
- self.array_reader.read_records(self.batch_size)?;
+ self.array_reader.read_records(batch_size)?;
}
};
@@ -905,116 +899,37 @@ impl ParquetRecordBatchReader {
let array_reader =
build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(),
row_groups)?;
+ let read_plan = ReadPlanBuilder::new(batch_size)
+ .with_selection(selection)
+ .build();
+
Ok(Self {
- batch_size,
array_reader,
schema: Arc::new(Schema::new(levels.fields.clone())),
- selection: selection.map(|s| s.trim().into()),
+ read_plan,
})
}
/// Create a new [`ParquetRecordBatchReader`] that will read at most
`batch_size` rows at
/// a time from [`ArrayReader`] based on the configured `selection`. If
`selection` is `None`
/// all rows will be returned
- pub(crate) fn new(
- batch_size: usize,
- array_reader: Box<dyn ArrayReader>,
- selection: Option<RowSelection>,
- ) -> Self {
+ pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan)
-> Self {
let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not
struct!"),
};
Self {
- batch_size,
array_reader,
schema: Arc::new(schema),
- selection: selection.map(|s| s.trim().into()),
+ read_plan,
}
}
-}
-/// Returns `true` if `selection` is `None` or selects some rows
-pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
- selection.map(|x| x.selects_any()).unwrap_or(true)
-}
-
-/// Applies an optional offset and limit to an optional [`RowSelection`]
-pub(crate) fn apply_range(
- mut selection: Option<RowSelection>,
- row_count: usize,
- offset: Option<usize>,
- limit: Option<usize>,
-) -> Option<RowSelection> {
- // If an offset is defined, apply it to the `selection`
- if let Some(offset) = offset {
- selection = Some(match row_count.checked_sub(offset) {
- None => RowSelection::from(vec![]),
- Some(remaining) => selection
- .map(|selection| selection.offset(offset))
- .unwrap_or_else(|| {
- RowSelection::from(vec![
- RowSelector::skip(offset),
- RowSelector::select(remaining),
- ])
- }),
- });
- }
-
- // If a limit is defined, apply it to the final `selection`
- if let Some(limit) = limit {
- selection = Some(
- selection
- .map(|selection| selection.limit(limit))
- .unwrap_or_else(|| {
-
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
- }),
- );
- }
- selection
-}
-
-/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
-/// which rows to return.
-///
-/// `input_selection`: Optional pre-existing selection. If `Some`, then the
-/// final [`RowSelection`] will be the conjunction of it and the rows selected
-/// by `predicate`.
-///
-/// Note: A pre-existing selection may come from evaluating a previous
predicate
-/// or if the [`ParquetRecordBatchReader`] specified an explicit
-/// [`RowSelection`] in addition to one or more predicates.
-pub(crate) fn evaluate_predicate(
- batch_size: usize,
- array_reader: Box<dyn ArrayReader>,
- input_selection: Option<RowSelection>,
- predicate: &mut dyn ArrowPredicate,
-) -> Result<RowSelection> {
- let reader = ParquetRecordBatchReader::new(batch_size, array_reader,
input_selection.clone());
- let mut filters = vec![];
- for maybe_batch in reader {
- let maybe_batch = maybe_batch?;
- let input_rows = maybe_batch.num_rows();
- let filter = predicate.evaluate(maybe_batch)?;
- // Since user supplied predicate, check error here to catch bugs
quickly
- if filter.len() != input_rows {
- return Err(arrow_err!(
- "ArrowPredicate predicate returned {} rows, expected
{input_rows}",
- filter.len()
- ));
- }
- match filter.null_count() {
- 0 => filters.push(filter),
- _ => filters.push(prep_null_mask_filter(&filter)),
- };
+ #[inline(always)]
+ pub(crate) fn batch_size(&self) -> usize {
+ self.read_plan.batch_size()
}
-
- let raw = RowSelection::from_filters(&filters);
- Ok(match input_selection {
- Some(selection) => selection.and_then(&raw),
- None => raw,
- })
}
#[cfg(test)]
@@ -3993,7 +3908,7 @@ mod tests {
.build()
.unwrap();
assert_ne!(1024, num_rows);
- assert_eq!(reader.batch_size, num_rows as usize);
+ assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
}
#[test]
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs
b/parquet/src/arrow/arrow_reader/read_plan.rs
new file mode 100644
index 0000000000..cf5d833850
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
+//! from a Parquet file
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::arrow::arrow_reader::{
+ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
+};
+use crate::errors::{ParquetError, Result};
+use arrow_array::Array;
+use arrow_select::filter::prep_null_mask_filter;
+use std::collections::VecDeque;
+
+/// A builder for [`ReadPlan`]
+#[derive(Clone)]
+pub(crate) struct ReadPlanBuilder {
+ batch_size: usize,
+ /// Current selection to apply, includes all filters
+ selection: Option<RowSelection>,
+}
+
+impl ReadPlanBuilder {
+ /// Create a `ReadPlanBuilder` with the given batch size
+ pub(crate) fn new(batch_size: usize) -> Self {
+ Self {
+ batch_size,
+ selection: None,
+ }
+ }
+
+ /// Set the current selection to the given value
+ pub(crate) fn with_selection(mut self, selection: Option<RowSelection>) ->
Self {
+ self.selection = selection;
+ self
+ }
+
+ /// Returns the current selection, if any
+ pub(crate) fn selection(&self) -> Option<&RowSelection> {
+ self.selection.as_ref()
+ }
+
+ /// Specifies the number of rows in the row group, before filtering is
applied.
+ ///
+ /// Returns a [`LimitedReadPlanBuilder`] that can apply
+ /// offset and limit.
+ ///
+ /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to
this
+ /// selection.
+ pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
+ LimitedReadPlanBuilder::new(self, row_count)
+ }
+
+ /// Returns true if the current plan selects any rows
+ pub(crate) fn selects_any(&self) -> bool {
+ self.selection
+ .as_ref()
+ .map(|s| s.selects_any())
+ .unwrap_or(true)
+ }
+
+ /// Returns the number of rows selected, or `None` if all rows are
selected.
+ pub(crate) fn num_rows_selected(&self) -> Option<usize> {
+ self.selection.as_ref().map(|s| s.row_count())
+ }
+
+ /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
+ ///
+ /// If the current `selection` is `Some`, the resulting [`RowSelection`]
+ /// will be the conjunction of the existing selection and the rows selected
+ /// by `predicate`.
+ ///
+ /// Note: pre-existing selections may come from evaluating a previous
predicate
+ /// or if the [`ParquetRecordBatchReader`] specified an explicit
+ /// [`RowSelection`] in addition to one or more predicates.
+ pub(crate) fn with_predicate(
+ mut self,
+ array_reader: Box<dyn ArrayReader>,
+ predicate: &mut dyn ArrowPredicate,
+ ) -> Result<Self> {
+ let reader = ParquetRecordBatchReader::new(array_reader,
self.clone().build());
+ let mut filters = vec![];
+ for maybe_batch in reader {
+ let maybe_batch = maybe_batch?;
+ let input_rows = maybe_batch.num_rows();
+ let filter = predicate.evaluate(maybe_batch)?;
+ // Since user supplied predicate, check error here to catch bugs
quickly
+ if filter.len() != input_rows {
+ return Err(arrow_err!(
+ "ArrowPredicate predicate returned {} rows, expected
{input_rows}",
+ filter.len()
+ ));
+ }
+ match filter.null_count() {
+ 0 => filters.push(filter),
+ _ => filters.push(prep_null_mask_filter(&filter)),
+ };
+ }
+
+ let raw = RowSelection::from_filters(&filters);
+ self.selection = match self.selection.take() {
+ Some(selection) => Some(selection.and_then(&raw)),
+ None => Some(raw),
+ };
+ Ok(self)
+ }
+
+ /// Create the final `ReadPlan` for the scan
+ pub(crate) fn build(mut self) -> ReadPlan {
+ // If selection is empty, truncate
+ if !self.selects_any() {
+ self.selection = Some(RowSelection::from(vec![]));
+ }
+ let Self {
+ batch_size,
+ selection,
+ } = self;
+
+ let selection = selection.map(|s| s.trim().into());
+
+ ReadPlan {
+ batch_size,
+ selection,
+ }
+ }
+}
+
+/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
+///
+/// See [`ReadPlanBuilder::limited`] to create this builder.
+pub(crate) struct LimitedReadPlanBuilder {
+ /// The underlying builder
+ inner: ReadPlanBuilder,
+ /// Total number of rows in the row group before the selection, limit or
+ /// offset are applied
+ row_count: usize,
+ /// The offset to apply, if any
+ offset: Option<usize>,
+ /// The limit to apply, if any
+ limit: Option<usize>,
+}
+
+impl LimitedReadPlanBuilder {
+ /// Create a new `LimitedReadPlanBuilder` from the existing builder and
number of rows
+ fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
+ Self {
+ inner,
+ row_count,
+ offset: None,
+ limit: None,
+ }
+ }
+
+ /// Set the offset to apply to the read plan
+ pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
+ self.offset = offset;
+ self
+ }
+
+ /// Set the limit to apply to the read plan
+ pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
+ self.limit = limit;
+ self
+ }
+
+ /// Apply offset and limit, updating the selection on the underlying
builder
+ /// and returning it.
+ pub(crate) fn build_limited(self) -> ReadPlanBuilder {
+ let Self {
+ mut inner,
+ row_count,
+ offset,
+ limit,
+ } = self;
+
+ // If the selection is empty, truncate
+ if !inner.selects_any() {
+ inner.selection = Some(RowSelection::from(vec![]));
+ }
+
+ // If an offset is defined, apply it to the `selection`
+ if let Some(offset) = offset {
+ inner.selection = Some(match row_count.checked_sub(offset) {
+ None => RowSelection::from(vec![]),
+ Some(remaining) => inner
+ .selection
+ .map(|selection| selection.offset(offset))
+ .unwrap_or_else(|| {
+ RowSelection::from(vec![
+ RowSelector::skip(offset),
+ RowSelector::select(remaining),
+ ])
+ }),
+ });
+ }
+
+ // If a limit is defined, apply it to the final `selection`
+ if let Some(limit) = limit {
+ inner.selection = Some(
+ inner
+ .selection
+ .map(|selection| selection.limit(limit))
+ .unwrap_or_else(|| {
+
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
+ }),
+ );
+ }
+
+ inner
+ }
+}
+
+/// A plan for reading specific rows from a Parquet Row Group.
+///
+/// See [`ReadPlanBuilder`] to create `ReadPlan`s
+pub(crate) struct ReadPlan {
+ /// The number of rows to read in each batch
+ batch_size: usize,
+ /// Row ranges to be selected from the data source
+ selection: Option<VecDeque<RowSelector>>,
+}
+
+impl ReadPlan {
+ /// Returns a mutable reference to the selection, if any
+ pub(crate) fn selection_mut(&mut self) -> Option<&mut
VecDeque<RowSelector>> {
+ self.selection.as_mut()
+ }
+
+ /// Return the number of rows to read in each output batch
+ #[inline(always)]
+ pub fn batch_size(&self) -> usize {
+ self.batch_size
+ }
+}
diff --git a/parquet/src/arrow/async_reader/mod.rs
b/parquet/src/arrow/async_reader/mod.rs
index 9466fb9a35..0c38d36a5b 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -40,8 +40,8 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef};
use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
- apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder,
ArrowReaderMetadata,
- ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
+ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
ParquetRecordBatchReader,
+ RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;
@@ -61,6 +61,7 @@ pub use metadata::*;
#[cfg(feature = "object_store")]
mod store;
+use crate::arrow::arrow_reader::ReadPlanBuilder;
use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;
@@ -535,6 +536,10 @@ impl<T: AsyncFileReader + Send + 'static>
ParquetRecordBatchStreamBuilder<T> {
}
}
+/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`]
for the next row group
+///
+/// Note: If all rows are filtered out in the row group (e.g by filters, limit
or
+/// offset), returns `None` for the reader.
type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)>;
/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
@@ -542,14 +547,18 @@ type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)
struct ReaderFactory<T> {
metadata: Arc<ParquetMetaData>,
+ /// Top level parquet schema
fields: Option<Arc<ParquetField>>,
input: T,
+ /// Optional filter
filter: Option<RowFilter>,
+ /// Limit to apply to remaining row groups.
limit: Option<usize>,
+ /// Offset to apply to the next row group to be read
offset: Option<usize>,
}
@@ -559,11 +568,13 @@ where
{
/// Reads the next row group with the provided `selection`, `projection`
and `batch_size`
///
+ /// Updates the `limit` and `offset` of the reader factory
+ ///
/// Note: this captures self so that the resulting future has a static
lifetime
async fn read_row_group(
mut self,
row_group_idx: usize,
- mut selection: Option<RowSelection>,
+ selection: Option<RowSelection>,
projection: ProjectionMask,
batch_size: usize,
) -> ReadResult<T> {
@@ -586,49 +597,50 @@ where
metadata: self.metadata.as_ref(),
};
+ let filter = self.filter.as_mut();
+ let mut plan_builder =
ReadPlanBuilder::new(batch_size).with_selection(selection);
+
// Update selection based on any filters
- if let Some(filter) = self.filter.as_mut() {
+ if let Some(filter) = filter {
for predicate in filter.predicates.iter_mut() {
- if !selects_any(selection.as_ref()) {
- return Ok((self, None));
+ if !plan_builder.selects_any() {
+ return Ok((self, None)); // ruled out entire row group
}
- let predicate_projection = predicate.projection();
+ // (pre) Fetch only the columns that are selected by the
predicate
+ let selection = plan_builder.selection();
row_group
- .fetch(&mut self.input, predicate_projection,
selection.as_ref())
+ .fetch(&mut self.input, predicate.projection(), selection)
.await?;
let array_reader =
- build_array_reader(self.fields.as_deref(),
predicate_projection, &row_group)?;
-
- selection = Some(evaluate_predicate(
- batch_size,
- array_reader,
- selection,
- predicate.as_mut(),
- )?);
+ build_array_reader(self.fields.as_deref(),
predicate.projection(), &row_group)?;
+
+ plan_builder = plan_builder.with_predicate(array_reader,
predicate.as_mut())?;
}
}
// Compute the number of rows in the selection before applying limit
and offset
- let rows_before = selection
- .as_ref()
- .map(|s| s.row_count())
+ let rows_before = plan_builder
+ .num_rows_selected()
.unwrap_or(row_group.row_count);
if rows_before == 0 {
- return Ok((self, None));
+ return Ok((self, None)); // ruled out entire row group
}
- selection = apply_range(selection, row_group.row_count, self.offset,
self.limit);
+ // Apply any limit and offset
+ let plan_builder = plan_builder
+ .limited(row_group.row_count)
+ .with_offset(self.offset)
+ .with_limit(self.limit)
+ .build_limited();
- // Compute the number of rows in the selection after applying limit
and offset
- let rows_after = selection
- .as_ref()
- .map(|s| s.row_count())
+ let rows_after = plan_builder
+ .num_rows_selected()
.unwrap_or(row_group.row_count);
- // Update offset if necessary
+ // Update running offset and limit for after the current row group is
read
if let Some(offset) = &mut self.offset {
// Reduction is either because of offset or limit, as limit is
applied
// after offset has been "exhausted" can just use saturating sub
here
@@ -636,22 +648,21 @@ where
}
if rows_after == 0 {
- return Ok((self, None));
+ return Ok((self, None)); // ruled out entire row group
}
if let Some(limit) = &mut self.limit {
*limit -= rows_after;
}
-
+ // fetch the pages needed for decoding
row_group
- .fetch(&mut self.input, &projection, selection.as_ref())
+ .fetch(&mut self.input, &projection, plan_builder.selection())
.await?;
- let reader = ParquetRecordBatchReader::new(
- batch_size,
- build_array_reader(self.fields.as_deref(), &projection,
&row_group)?,
- selection,
- );
+ let plan = plan_builder.build();
+
+ let array_reader = build_array_reader(self.fields.as_deref(),
&projection, &row_group)?;
+ let reader = ParquetRecordBatchReader::new(array_reader, plan);
Ok((self, Some(reader)))
}