This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new ce18e5b1e4 Introduce `ReadPlan` to encapsulate the calculation of what 
parquet rows to decode (#7502)
ce18e5b1e4 is described below

commit ce18e5b1e4ff22695a3a97bf519336dd400176e9
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon May 19 15:42:48 2025 -0400

    Introduce `ReadPlan` to encapsulate the calculation of what parquet rows to 
decode (#7502)
    
    * Introduce `ReadPlan` to encapsulate the calculation of what rows to decode
    
    * Update parquet/src/arrow/arrow_reader/read_plan.rs
    
    Co-authored-by: Ed Seidl <[email protected]>
    
    ---------
    
    Co-authored-by: Ed Seidl <[email protected]>
---
 parquet/src/arrow/arrow_reader/mod.rs       | 149 ++++-------------
 parquet/src/arrow/arrow_reader/read_plan.rs | 249 ++++++++++++++++++++++++++++
 parquet/src/arrow/async_reader/mod.rs       |  79 +++++----
 3 files changed, 326 insertions(+), 151 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index 90e2592183..ea068acb29 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -21,10 +21,8 @@ use arrow_array::cast::AsArray;
 use arrow_array::Array;
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
-use arrow_select::filter::prep_null_mask_filter;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelector};
-use std::collections::VecDeque;
 use std::sync::Arc;
 
 pub use crate::arrow::array_reader::RowGroups;
@@ -39,7 +37,10 @@ use crate::file::metadata::{ParquetMetaData, 
ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
 
+pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
+
 mod filter;
+mod read_plan;
 mod selection;
 pub mod statistics;
 
@@ -679,38 +680,32 @@ impl<T: ChunkReader + 'static> 
ParquetRecordBatchReaderBuilder<T> {
         };
 
         let mut filter = self.filter;
-        let mut selection = self.selection;
+        let mut plan_builder = 
ReadPlanBuilder::new(batch_size).with_selection(self.selection);
 
+        // Update selection based on any filters
         if let Some(filter) = filter.as_mut() {
             for predicate in filter.predicates.iter_mut() {
-                if !selects_any(selection.as_ref()) {
+                // break early if we have ruled out all rows
+                if !plan_builder.selects_any() {
                     break;
                 }
 
                 let array_reader =
                     build_array_reader(self.fields.as_deref(), 
predicate.projection(), &reader)?;
 
-                selection = Some(evaluate_predicate(
-                    batch_size,
-                    array_reader,
-                    selection,
-                    predicate.as_mut(),
-                )?);
+                plan_builder = plan_builder.with_predicate(array_reader, 
predicate.as_mut())?;
             }
         }
 
         let array_reader = build_array_reader(self.fields.as_deref(), 
&self.projection, &reader)?;
+        let read_plan = plan_builder
+            .limited(reader.num_rows())
+            .with_offset(self.offset)
+            .with_limit(self.limit)
+            .build_limited()
+            .build();
 
-        // If selection is empty, truncate
-        if !selects_any(selection.as_ref()) {
-            selection = Some(RowSelection::from(vec![]));
-        }
-
-        Ok(ParquetRecordBatchReader::new(
-            batch_size,
-            array_reader,
-            apply_range(selection, reader.num_rows(), self.offset, self.limit),
-        ))
+        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
     }
 }
 
@@ -789,11 +784,9 @@ impl<T: ChunkReader + 'static> PageIterator for 
ReaderPageIterator<T> {}
 /// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
 /// read from a parquet data source
 pub struct ParquetRecordBatchReader {
-    batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
     schema: SchemaRef,
-    /// Row ranges to be selected from the data source
-    selection: Option<VecDeque<RowSelector>>,
+    read_plan: ReadPlan,
 }
 
 impl Iterator for ParquetRecordBatchReader {
@@ -814,9 +807,10 @@ impl ParquetRecordBatchReader {
     /// simplify error handling with `?`
     fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
         let mut read_records = 0;
-        match self.selection.as_mut() {
+        let batch_size = self.batch_size();
+        match self.read_plan.selection_mut() {
             Some(selection) => {
-                while read_records < self.batch_size && !selection.is_empty() {
+                while read_records < batch_size && !selection.is_empty() {
                     let front = selection.pop_front().unwrap();
                     if front.skip {
                         let skipped = 
self.array_reader.skip_records(front.row_count)?;
@@ -838,7 +832,7 @@ impl ParquetRecordBatchReader {
                     }
 
                     // try to read record
-                    let need_read = self.batch_size - read_records;
+                    let need_read = batch_size - read_records;
                     let to_read = match front.row_count.checked_sub(need_read) 
{
                         Some(remaining) if remaining != 0 => {
                             // if page row count less than batch_size we must 
set batch size to page row count.
@@ -855,7 +849,7 @@ impl ParquetRecordBatchReader {
                 }
             }
             None => {
-                self.array_reader.read_records(self.batch_size)?;
+                self.array_reader.read_records(batch_size)?;
             }
         };
 
@@ -905,116 +899,37 @@ impl ParquetRecordBatchReader {
         let array_reader =
             build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), 
row_groups)?;
 
+        let read_plan = ReadPlanBuilder::new(batch_size)
+            .with_selection(selection)
+            .build();
+
         Ok(Self {
-            batch_size,
             array_reader,
             schema: Arc::new(Schema::new(levels.fields.clone())),
-            selection: selection.map(|s| s.trim().into()),
+            read_plan,
         })
     }
 
     /// Create a new [`ParquetRecordBatchReader`] that will read at most 
`batch_size` rows at
     /// a time from [`ArrayReader`] based on the configured `selection`. If 
`selection` is `None`
     /// all rows will be returned
-    pub(crate) fn new(
-        batch_size: usize,
-        array_reader: Box<dyn ArrayReader>,
-        selection: Option<RowSelection>,
-    ) -> Self {
+    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) 
-> Self {
         let schema = match array_reader.get_data_type() {
             ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
             _ => unreachable!("Struct array reader's data type is not 
struct!"),
         };
 
         Self {
-            batch_size,
             array_reader,
             schema: Arc::new(schema),
-            selection: selection.map(|s| s.trim().into()),
+            read_plan,
         }
     }
-}
 
-/// Returns `true` if `selection` is `None` or selects some rows
-pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
-    selection.map(|x| x.selects_any()).unwrap_or(true)
-}
-
-/// Applies an optional offset and limit to an optional [`RowSelection`]
-pub(crate) fn apply_range(
-    mut selection: Option<RowSelection>,
-    row_count: usize,
-    offset: Option<usize>,
-    limit: Option<usize>,
-) -> Option<RowSelection> {
-    // If an offset is defined, apply it to the `selection`
-    if let Some(offset) = offset {
-        selection = Some(match row_count.checked_sub(offset) {
-            None => RowSelection::from(vec![]),
-            Some(remaining) => selection
-                .map(|selection| selection.offset(offset))
-                .unwrap_or_else(|| {
-                    RowSelection::from(vec![
-                        RowSelector::skip(offset),
-                        RowSelector::select(remaining),
-                    ])
-                }),
-        });
-    }
-
-    // If a limit is defined, apply it to the final `selection`
-    if let Some(limit) = limit {
-        selection = Some(
-            selection
-                .map(|selection| selection.limit(limit))
-                .unwrap_or_else(|| {
-                    
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
-                }),
-        );
-    }
-    selection
-}
-
-/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
-/// which rows to return.
-///
-/// `input_selection`: Optional pre-existing selection. If `Some`, then the
-/// final [`RowSelection`] will be the conjunction of it and the rows selected
-/// by `predicate`.
-///
-/// Note: A pre-existing selection may come from evaluating a previous 
predicate
-/// or if the [`ParquetRecordBatchReader`] specified an explicit
-/// [`RowSelection`] in addition to one or more predicates.
-pub(crate) fn evaluate_predicate(
-    batch_size: usize,
-    array_reader: Box<dyn ArrayReader>,
-    input_selection: Option<RowSelection>,
-    predicate: &mut dyn ArrowPredicate,
-) -> Result<RowSelection> {
-    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, 
input_selection.clone());
-    let mut filters = vec![];
-    for maybe_batch in reader {
-        let maybe_batch = maybe_batch?;
-        let input_rows = maybe_batch.num_rows();
-        let filter = predicate.evaluate(maybe_batch)?;
-        // Since user supplied predicate, check error here to catch bugs 
quickly
-        if filter.len() != input_rows {
-            return Err(arrow_err!(
-                "ArrowPredicate predicate returned {} rows, expected 
{input_rows}",
-                filter.len()
-            ));
-        }
-        match filter.null_count() {
-            0 => filters.push(filter),
-            _ => filters.push(prep_null_mask_filter(&filter)),
-        };
+    #[inline(always)]
+    pub(crate) fn batch_size(&self) -> usize {
+        self.read_plan.batch_size()
     }
-
-    let raw = RowSelection::from_filters(&filters);
-    Ok(match input_selection {
-        Some(selection) => selection.and_then(&raw),
-        None => raw,
-    })
 }
 
 #[cfg(test)]
@@ -3993,7 +3908,7 @@ mod tests {
             .build()
             .unwrap();
         assert_ne!(1024, num_rows);
-        assert_eq!(reader.batch_size, num_rows as usize);
+        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
     }
 
     #[test]
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs 
b/parquet/src/arrow/arrow_reader/read_plan.rs
new file mode 100644
index 0000000000..cf5d833850
--- /dev/null
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
+//! from a Parquet file
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::arrow::arrow_reader::{
+    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
+};
+use crate::errors::{ParquetError, Result};
+use arrow_array::Array;
+use arrow_select::filter::prep_null_mask_filter;
+use std::collections::VecDeque;
+
+/// A builder for [`ReadPlan`]
+#[derive(Clone)]
+pub(crate) struct ReadPlanBuilder {
+    batch_size: usize,
+    /// Current selection to apply, includes all filters
+    selection: Option<RowSelection>,
+}
+
+impl ReadPlanBuilder {
+    /// Create a `ReadPlanBuilder` with the given batch size
+    pub(crate) fn new(batch_size: usize) -> Self {
+        Self {
+            batch_size,
+            selection: None,
+        }
+    }
+
+    /// Set the current selection to the given value
+    pub(crate) fn with_selection(mut self, selection: Option<RowSelection>) -> 
Self {
+        self.selection = selection;
+        self
+    }
+
+    /// Returns the current selection, if any
+    pub(crate) fn selection(&self) -> Option<&RowSelection> {
+        self.selection.as_ref()
+    }
+
+    /// Specifies the number of rows in the row group, before filtering is 
applied.
+    ///
+    /// Returns a [`LimitedReadPlanBuilder`] that can apply
+    /// offset and limit.
+    ///
+    /// Call [`LimitedReadPlanBuilder::build_limited`] to apply the limits to 
this
+    /// selection.
+    pub(crate) fn limited(self, row_count: usize) -> LimitedReadPlanBuilder {
+        LimitedReadPlanBuilder::new(self, row_count)
+    }
+
+    /// Returns true if the current plan selects any rows
+    pub(crate) fn selects_any(&self) -> bool {
+        self.selection
+            .as_ref()
+            .map(|s| s.selects_any())
+            .unwrap_or(true)
+    }
+
+    /// Returns the number of rows selected, or `None` if all rows are 
selected.
+    pub(crate) fn num_rows_selected(&self) -> Option<usize> {
+        self.selection.as_ref().map(|s| s.row_count())
+    }
+
+    /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
+    ///
+    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
+    /// will be the conjunction of the existing selection and the rows selected
+    /// by `predicate`.
+    ///
+    /// Note: pre-existing selections may come from evaluating a previous 
predicate
+    /// or if the [`ParquetRecordBatchReader`] specified an explicit
+    /// [`RowSelection`] in addition to one or more predicates.
+    pub(crate) fn with_predicate(
+        mut self,
+        array_reader: Box<dyn ArrayReader>,
+        predicate: &mut dyn ArrowPredicate,
+    ) -> Result<Self> {
+        let reader = ParquetRecordBatchReader::new(array_reader, 
self.clone().build());
+        let mut filters = vec![];
+        for maybe_batch in reader {
+            let maybe_batch = maybe_batch?;
+            let input_rows = maybe_batch.num_rows();
+            let filter = predicate.evaluate(maybe_batch)?;
+            // Since user supplied predicate, check error here to catch bugs 
quickly
+            if filter.len() != input_rows {
+                return Err(arrow_err!(
+                    "ArrowPredicate predicate returned {} rows, expected 
{input_rows}",
+                    filter.len()
+                ));
+            }
+            match filter.null_count() {
+                0 => filters.push(filter),
+                _ => filters.push(prep_null_mask_filter(&filter)),
+            };
+        }
+
+        let raw = RowSelection::from_filters(&filters);
+        self.selection = match self.selection.take() {
+            Some(selection) => Some(selection.and_then(&raw)),
+            None => Some(raw),
+        };
+        Ok(self)
+    }
+
+    /// Create the final `ReadPlan` for the scan
+    pub(crate) fn build(mut self) -> ReadPlan {
+        // If selection is empty, truncate
+        if !self.selects_any() {
+            self.selection = Some(RowSelection::from(vec![]));
+        }
+        let Self {
+            batch_size,
+            selection,
+        } = self;
+
+        let selection = selection.map(|s| s.trim().into());
+
+        ReadPlan {
+            batch_size,
+            selection,
+        }
+    }
+}
+
+/// Builder for [`ReadPlan`] that applies a limit and offset to the read plan
+///
+/// See [`ReadPlanBuilder::limited`] to create this builder.
+pub(crate) struct LimitedReadPlanBuilder {
+    /// The underlying builder
+    inner: ReadPlanBuilder,
+    /// Total number of rows in the row group before the selection, limit or
+    /// offset are applied
+    row_count: usize,
+    /// The offset to apply, if any
+    offset: Option<usize>,
+    /// The limit to apply, if any
+    limit: Option<usize>,
+}
+
+impl LimitedReadPlanBuilder {
+    /// Create a new `LimitedReadPlanBuilder` from the existing builder and 
number of rows
+    fn new(inner: ReadPlanBuilder, row_count: usize) -> Self {
+        Self {
+            inner,
+            row_count,
+            offset: None,
+            limit: None,
+        }
+    }
+
+    /// Set the offset to apply to the read plan
+    pub(crate) fn with_offset(mut self, offset: Option<usize>) -> Self {
+        self.offset = offset;
+        self
+    }
+
+    /// Set the limit to apply to the read plan
+    pub(crate) fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Apply offset and limit, updating the selection on the underlying 
builder
+    /// and returning it.
+    pub(crate) fn build_limited(self) -> ReadPlanBuilder {
+        let Self {
+            mut inner,
+            row_count,
+            offset,
+            limit,
+        } = self;
+
+        // If the selection is empty, truncate
+        if !inner.selects_any() {
+            inner.selection = Some(RowSelection::from(vec![]));
+        }
+
+        // If an offset is defined, apply it to the `selection`
+        if let Some(offset) = offset {
+            inner.selection = Some(match row_count.checked_sub(offset) {
+                None => RowSelection::from(vec![]),
+                Some(remaining) => inner
+                    .selection
+                    .map(|selection| selection.offset(offset))
+                    .unwrap_or_else(|| {
+                        RowSelection::from(vec![
+                            RowSelector::skip(offset),
+                            RowSelector::select(remaining),
+                        ])
+                    }),
+            });
+        }
+
+        // If a limit is defined, apply it to the final `selection`
+        if let Some(limit) = limit {
+            inner.selection = Some(
+                inner
+                    .selection
+                    .map(|selection| selection.limit(limit))
+                    .unwrap_or_else(|| {
+                        
RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
+                    }),
+            );
+        }
+
+        inner
+    }
+}
+
+/// A plan reading specific rows from a Parquet Row Group.
+///
+/// See [`ReadPlanBuilder`] to create `ReadPlan`s
+pub(crate) struct ReadPlan {
+    /// The number of rows to read in each batch
+    batch_size: usize,
+    /// Row ranges to be selected from the data source
+    selection: Option<VecDeque<RowSelector>>,
+}
+
+impl ReadPlan {
+    /// Returns a mutable reference to the selection, if any
+    pub(crate) fn selection_mut(&mut self) -> Option<&mut 
VecDeque<RowSelector>> {
+        self.selection.as_mut()
+    }
+
+    /// Return the number of rows to read in each output batch
+    #[inline(always)]
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+}
diff --git a/parquet/src/arrow/async_reader/mod.rs 
b/parquet/src/arrow/async_reader/mod.rs
index 9466fb9a35..0c38d36a5b 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -40,8 +40,8 @@ use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
 use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
-    apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, 
ArrowReaderMetadata,
-    ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
+    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, 
ParquetRecordBatchReader,
+    RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
 
@@ -61,6 +61,7 @@ pub use metadata::*;
 #[cfg(feature = "object_store")]
 mod store;
 
+use crate::arrow::arrow_reader::ReadPlanBuilder;
 use crate::arrow::schema::ParquetField;
 #[cfg(feature = "object_store")]
 pub use store::*;
@@ -535,6 +536,10 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
     }
 }
 
+/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] 
for the next row group
+///
+/// Note: If all rows are filtered out in the row group (e.g. by filters, limit 
or
+/// offset), returns `None` for the reader.
 type ReadResult<T> = Result<(ReaderFactory<T>, 
Option<ParquetRecordBatchReader>)>;
 
 /// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
@@ -542,14 +547,18 @@ type ReadResult<T> = Result<(ReaderFactory<T>, 
Option<ParquetRecordBatchReader>)
 struct ReaderFactory<T> {
     metadata: Arc<ParquetMetaData>,
 
+    /// Top level parquet schema
     fields: Option<Arc<ParquetField>>,
 
     input: T,
 
+    /// Optional filter
     filter: Option<RowFilter>,
 
+    /// Limit to apply to remaining row groups.  
     limit: Option<usize>,
 
+    /// Offset to apply to the remaining row groups
     offset: Option<usize>,
 }
 
@@ -559,11 +568,13 @@ where
 {
     /// Reads the next row group with the provided `selection`, `projection` 
and `batch_size`
     ///
+    /// Updates the `limit` and `offset` of the reader factory
+    ///
     /// Note: this captures self so that the resulting future has a static 
lifetime
     async fn read_row_group(
         mut self,
         row_group_idx: usize,
-        mut selection: Option<RowSelection>,
+        selection: Option<RowSelection>,
         projection: ProjectionMask,
         batch_size: usize,
     ) -> ReadResult<T> {
@@ -586,49 +597,50 @@ where
             metadata: self.metadata.as_ref(),
         };
 
+        let filter = self.filter.as_mut();
+        let mut plan_builder = 
ReadPlanBuilder::new(batch_size).with_selection(selection);
+
         // Update selection based on any filters
-        if let Some(filter) = self.filter.as_mut() {
+        if let Some(filter) = filter {
             for predicate in filter.predicates.iter_mut() {
-                if !selects_any(selection.as_ref()) {
-                    return Ok((self, None));
+                if !plan_builder.selects_any() {
+                    return Ok((self, None)); // ruled out entire row group
                 }
 
-                let predicate_projection = predicate.projection();
+                // (pre) Fetch only the columns that are selected by the 
predicate
+                let selection = plan_builder.selection();
                 row_group
-                    .fetch(&mut self.input, predicate_projection, 
selection.as_ref())
+                    .fetch(&mut self.input, predicate.projection(), selection)
                     .await?;
 
                 let array_reader =
-                    build_array_reader(self.fields.as_deref(), 
predicate_projection, &row_group)?;
-
-                selection = Some(evaluate_predicate(
-                    batch_size,
-                    array_reader,
-                    selection,
-                    predicate.as_mut(),
-                )?);
+                    build_array_reader(self.fields.as_deref(), 
predicate.projection(), &row_group)?;
+
+                plan_builder = plan_builder.with_predicate(array_reader, 
predicate.as_mut())?;
             }
         }
 
         // Compute the number of rows in the selection before applying limit 
and offset
-        let rows_before = selection
-            .as_ref()
-            .map(|s| s.row_count())
+        let rows_before = plan_builder
+            .num_rows_selected()
             .unwrap_or(row_group.row_count);
 
         if rows_before == 0 {
-            return Ok((self, None));
+            return Ok((self, None)); // ruled out entire row group
         }
 
-        selection = apply_range(selection, row_group.row_count, self.offset, 
self.limit);
+        // Apply any limit and offset
+        let plan_builder = plan_builder
+            .limited(row_group.row_count)
+            .with_offset(self.offset)
+            .with_limit(self.limit)
+            .build_limited();
 
-        // Compute the number of rows in the selection after applying limit 
and offset
-        let rows_after = selection
-            .as_ref()
-            .map(|s| s.row_count())
+        let rows_after = plan_builder
+            .num_rows_selected()
             .unwrap_or(row_group.row_count);
 
-        // Update offset if necessary
+        // Update running offset and limit for after the current row group is 
read
         if let Some(offset) = &mut self.offset {
             // Reduction is either because of offset or limit, as limit is 
applied
             // after offset has been "exhausted" can just use saturating sub 
here
@@ -636,22 +648,21 @@ where
         }
 
         if rows_after == 0 {
-            return Ok((self, None));
+            return Ok((self, None)); // ruled out entire row group
         }
 
         if let Some(limit) = &mut self.limit {
             *limit -= rows_after;
         }
-
+        // fetch the pages needed for decoding
         row_group
-            .fetch(&mut self.input, &projection, selection.as_ref())
+            .fetch(&mut self.input, &projection, plan_builder.selection())
             .await?;
 
-        let reader = ParquetRecordBatchReader::new(
-            batch_size,
-            build_array_reader(self.fields.as_deref(), &projection, 
&row_group)?,
-            selection,
-        );
+        let plan = plan_builder.build();
+
+        let array_reader = build_array_reader(self.fields.as_deref(), 
&projection, &row_group)?;
+        let reader = ParquetRecordBatchReader::new(array_reader, plan);
 
         Ok((self, Some(reader)))
     }

Reply via email to