This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 48fa8a7a45 feat(parquet): separate push decoder frontier state from row-group decoding (#9804)
48fa8a7a45 is described below

commit 48fa8a7a45567b9ab47c461771b968ba0d37812f
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Tue May 12 11:03:34 2026 -0400

    feat(parquet): separate push decoder frontier state from row-group decoding (#9804)
    
    # Which issue does this PR close?
    
    - Prerequisite to #9697
    
    # Rationale for this change
    
    #9697 aims to make staged buffer management in the push decoder more
    explicit. In doing so, it exposes a structural problem: the logic for
    deciding whether a row group is still live, skipped, or unreachable is
    spread across several parts of the decoder.
    
    This matters because row-group-level buffer release depends on a single
    question having a clear answer: can this row group ever need bytes
    again? That answer depends on the queued row groups, the remaining
    selection, the running offset/limit budget, and whether predicates
    require the decoder to stay conservative. Today, that state is split
    across multiple components, which makes the release policy difficult to
    centralize cleanly.
    
    # What changes are included in this PR?
    
    This PR introduces a clearer ownership boundary in the push decoder:
    
    - cross-row-group scan state is now handled by a dedicated
    frontier/look-ahead mechanism
    - the row-group builder is reduced to current-row-group decode work only
    - offset/limit accounting and row-group selection advancement are
    centralized around that frontier/builder split
    
    This does not implement row-group-level buffer release directly, but it
    establishes the structure needed for that follow-up work. It should also
    make future pruning rules easier to add and maintain.
    
    # Are these changes tested?
    
    All existing tests pass, and the refactor adds focused coverage for the
    extracted budget logic and the frontier-driven `try_next_reader` path.
    
    # Are there any user-facing changes?
    
    None.
    
    ---------
    
    Signed-off-by: Hippolyte Barraud <[email protected]>
---
 parquet/src/arrow/push_decoder/mod.rs              |  31 ++-
 .../src/arrow/push_decoder/reader_builder/mod.rs   | 279 +++++++++++++++------
 parquet/src/arrow/push_decoder/remaining.rs        | 243 ++++++++++++++----
 3 files changed, 433 insertions(+), 120 deletions(-)

diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 4c667e5343..f905d6fb2c 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -30,7 +30,7 @@ use crate::file::metadata::ParquetMetaData;
 use crate::util::push_buffers::PushBuffers;
 use arrow_array::RecordBatch;
 use bytes::Bytes;
-use reader_builder::RowGroupReaderBuilder;
+use reader_builder::{RowBudget, RowGroupReaderBuilder};
 use remaining::RemainingRowGroups;
 use std::ops::Range;
 use std::sync::Arc;
@@ -181,6 +181,9 @@ impl ParquetPushDecoderBuilder {
         // If no row groups were specified, read all of them
         let row_groups =
             row_groups.unwrap_or_else(|| 
(0..parquet_metadata.num_row_groups()).collect());
+        let has_predicates = filter
+            .as_ref()
+            .is_some_and(|filter| !filter.predicates.is_empty());
 
         // Prepare to build RowGroup readers
         let file_len = 0; // not used in push decoder
@@ -191,8 +194,6 @@ impl ParquetPushDecoderBuilder {
             Arc::clone(&parquet_metadata),
             fields,
             filter,
-            limit,
-            offset,
             metrics,
             max_predicate_cache_size,
             buffers,
@@ -204,6 +205,8 @@ impl ParquetPushDecoderBuilder {
             parquet_metadata,
             row_groups,
             selection,
+            RowBudget::new(offset, limit),
+            has_predicates,
             row_group_reader_builder,
         );
 
@@ -1402,6 +1405,28 @@ mod test {
         expect_finished(decoder.try_decode());
     }
 
+    #[test]
+    fn test_decoder_try_next_reader_offset_limit() {
+        let mut decoder = 
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_offset(225)
+            .with_limit(20)
+            .build()
+            .unwrap();
+
+        let ranges = expect_needs_data(decoder.try_next_reader());
+        push_ranges_to_decoder(&mut decoder, ranges);
+
+        let reader = expect_data(decoder.try_next_reader());
+        let batches = reader
+            .map(|batch| batch.expect("expected decoded batch"))
+            .collect::<Vec<_>>();
+        let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
+        assert_eq!(output, TEST_BATCH.slice(225, 20));
+
+        expect_finished(decoder.try_next_reader());
+    }
+
     #[test]
     fn test_decoder_row_group_selection() {
         // take only the second row group
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 60e50d2952..0452cea436 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -18,7 +18,6 @@
 mod data;
 mod filter;
 
-use crate::DecodeResult;
 use crate::arrow::ProjectionMask;
 use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptions, 
RowGroupCache};
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
@@ -42,12 +41,13 @@ use filter::FilterInfo;
 use std::ops::Range;
 use std::sync::{Arc, RwLock};
 
-/// The current row group being read and the read plan
+/// The current row group being read, its read plan, and its offset/limit 
budget.
 #[derive(Debug)]
 struct RowGroupInfo {
     row_group_idx: usize,
     row_count: usize,
     plan_builder: ReadPlanBuilder,
+    budget: RowBudget,
 }
 
 /// This is the inner state machine for reading a single row group.
@@ -88,6 +88,107 @@ enum RowGroupDecoderState {
     Finished,
 }
 
+/// Running offset/limit budget shared across row groups.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) struct RowBudget {
+    offset: Option<usize>,
+    limit: Option<usize>,
+}
+
+impl RowBudget {
+    pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
+        Self { offset, limit }
+    }
+
+    pub(crate) fn is_exhausted(self) -> bool {
+        matches!(self.limit, Some(0))
+    }
+
+    /// Returns how many selected rows remain after applying this budget.
+    pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
+        let rows_after_offset = 
rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
+        match self.limit {
+            Some(limit) => rows_after_offset.min(limit),
+            None => rows_after_offset,
+        }
+    }
+
+    /// Returns the number of selected rows needed before applying the offset.
+    fn selected_row_limit(self) -> Option<usize> {
+        self.limit
+            .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
+    }
+
+    fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> 
BudgetedReadPlan {
+        let rows_before_budget = 
plan_builder.num_rows_selected().unwrap_or(row_count);
+        let plan_builder = plan_builder
+            .limited(row_count)
+            .with_offset(self.offset)
+            .with_limit(self.limit)
+            .build_limited();
+        let rows_after_budget = self.rows_after(rows_before_budget);
+
+        BudgetedReadPlan {
+            plan_builder,
+            rows_before_budget,
+            rows_after_budget,
+            remaining_budget: self.advance(rows_before_budget, 
rows_after_budget),
+        }
+    }
+
+    /// Advance the budget past one row group.
+    ///
+    /// `rows_before_budget` is the number of rows selected before applying the
+    /// budget, and `rows_after_budget` is the number retained for output from
+    /// this row group.
+    pub(crate) fn advance(mut self, rows_before_budget: usize, 
rows_after_budget: usize) -> Self {
+        if let Some(offset) = &mut self.offset {
+            // Reduction is either because of offset or limit, as limit is 
applied
+            // after offset has been "exhausted" can just use saturating sub 
here.
+            *offset = offset.saturating_sub(rows_before_budget - 
rows_after_budget);
+        }
+
+        if rows_after_budget != 0 {
+            if let Some(limit) = &mut self.limit {
+                *limit -= rows_after_budget;
+            }
+        }
+
+        self
+    }
+}
+
+#[derive(Debug)]
+struct BudgetedReadPlan {
+    /// Read plan after applying this row group's share of the offset/limit 
budget.
+    plan_builder: ReadPlanBuilder,
+    /// Number of rows selected by row selection and predicates before applying
+    /// this row group's offset/limit budget.
+    rows_before_budget: usize,
+    /// Number of selected rows that remain to be read after applying this row
+    /// group's offset/limit budget.
+    rows_after_budget: usize,
+    /// Budget remaining for later row groups.
+    remaining_budget: RowBudget,
+}
+
+#[derive(Debug)]
+pub(crate) enum RowGroupBuildResult {
+    /// The active row group is complete without producing a reader.
+    Finished {
+        /// Budget remaining after applying this row group's selection.
+        remaining_budget: RowBudget,
+    },
+    /// More bytes are needed before the active row group can make progress.
+    NeedsData(Vec<Range<u64>>),
+    /// The active row group produced a reader.
+    Data {
+        batch_reader: ParquetRecordBatchReader,
+        /// Budget remaining after applying this row group's selection.
+        remaining_budget: RowBudget,
+    },
+}
+
 /// Result of a state transition
 #[derive(Debug)]
 struct NextState {
@@ -96,7 +197,7 @@ struct NextState {
     ///
     /// * `Some`: the processing should stop and return the result
     /// * `None`: processing should continue
-    result: Option<DecodeResult<ParquetRecordBatchReader>>,
+    result: Option<RowGroupBuildResult>,
 }
 
 impl NextState {
@@ -111,10 +212,7 @@ impl NextState {
     }
 
     /// Create a NextState with a result that should be returned
-    fn result(
-        next_state: RowGroupDecoderState,
-        result: DecodeResult<ParquetRecordBatchReader>,
-    ) -> Self {
+    fn result(next_state: RowGroupDecoderState, result: RowGroupBuildResult) 
-> Self {
         Self {
             next_state,
             result: Some(result),
@@ -144,12 +242,6 @@ pub(crate) struct RowGroupReaderBuilder {
     /// Optional filter
     filter: Option<RowFilter>,
 
-    /// Limit to apply to remaining row groups (decremented as rows are read)
-    limit: Option<usize>,
-
-    /// Offset to apply to remaining row groups (decremented as rows are read)
-    offset: Option<usize>,
-
     /// The size in bytes of the predicate cache to use
     ///
     /// See [`RowGroupCache`] for details.
@@ -180,8 +272,6 @@ impl RowGroupReaderBuilder {
         metadata: Arc<ParquetMetaData>,
         fields: Option<Arc<ParquetField>>,
         filter: Option<RowFilter>,
-        limit: Option<usize>,
-        offset: Option<usize>,
         metrics: ArrowReaderMetrics,
         max_predicate_cache_size: usize,
         buffers: PushBuffers,
@@ -193,8 +283,6 @@ impl RowGroupReaderBuilder {
             metadata,
             fields,
             filter,
-            limit,
-            offset,
             metrics,
             max_predicate_cache_size,
             row_selection_policy,
@@ -233,12 +321,18 @@ impl RowGroupReaderBuilder {
         })
     }
 
+    /// Returns true if this builder is currently decoding a row group.
+    pub(crate) fn has_active_row_group(&self) -> bool {
+        !matches!(self.state, Some(RowGroupDecoderState::Finished))
+    }
+
     /// Setup this reader to read the next row group
     pub(crate) fn next_row_group(
         &mut self,
         row_group_idx: usize,
         row_count: usize,
         selection: Option<RowSelection>,
+        budget: RowBudget,
     ) -> Result<(), ParquetError> {
         let state = self.take_state()?;
         if !matches!(state, RowGroupDecoderState::Finished) {
@@ -254,22 +348,20 @@ impl RowGroupReaderBuilder {
             row_group_idx,
             row_count,
             plan_builder,
+            budget,
         };
 
         self.state = Some(RowGroupDecoderState::Start { row_group_info });
         Ok(())
     }
 
-    /// Try to build the next `ParquetRecordBatchReader` from this 
RowGroupReader.
+    /// Try to build the next `ParquetRecordBatchReader` for the active row 
group.
     ///
-    /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
-    /// ranges of data that are needed to proceed.
-    ///
-    /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
-    /// `DecodeResult::Data`.
-    pub(crate) fn try_build(
-        &mut self,
-    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+    /// Returns [`RowGroupBuildResult::NeedsData`] if more data is needed,
+    /// [`RowGroupBuildResult::Data`] if a reader is ready, or
+    /// [`RowGroupBuildResult::Finished`] if the row group completed without
+    /// producing a reader.
+    pub(crate) fn try_build(&mut self) -> Result<RowGroupBuildResult, 
ParquetError> {
         loop {
             let current_state = self.take_state()?;
             // Try to transition the decoder.
@@ -310,18 +402,10 @@ impl RowGroupReaderBuilder {
     ) -> Result<NextState, ParquetError> {
         let result = match current_state {
             RowGroupDecoderState::Start { row_group_info } => {
-                // Short-circuit once the overall output limit is exhausted.
-                //
-                // `self.limit` tracks how many more rows the reader is still
-                // allowed to emit and is decremented as each row group is
-                // planned in `StartData`, so `Some(0)` means earlier row
-                // groups have already produced the full requested output.
-                if matches!(self.limit, Some(0)) {
-                    return Ok(NextState::result(
-                        RowGroupDecoderState::Finished,
-                        DecodeResult::Finished,
-                    ));
-                }
+                debug_assert!(
+                    !row_group_info.budget.is_exhausted(),
+                    "RowGroupFrontier should not hand off row groups after the output limit is exhausted"
+                );
 
                 let column_chunks = None; // no prior column chunks
 
@@ -371,6 +455,7 @@ impl RowGroupReaderBuilder {
                     row_group_idx,
                     row_count,
                     plan_builder,
+                    budget,
                 } = row_group_info;
 
                 // If nothing is selected, we are done with this row group
@@ -379,7 +464,9 @@ impl RowGroupReaderBuilder {
                     self.filter = Some(filter_info.into_filter());
                     return Ok(NextState::result(
                         RowGroupDecoderState::Finished,
-                        DecodeResult::Finished,
+                        RowGroupBuildResult::Finished {
+                            remaining_budget: budget,
+                        },
                     ));
                 }
 
@@ -405,6 +492,7 @@ impl RowGroupReaderBuilder {
                     row_group_idx,
                     row_count,
                     plan_builder,
+                    budget,
                 };
 
                 NextState::again(RowGroupDecoderState::WaitingOnFilterData {
@@ -428,7 +516,7 @@ impl RowGroupReaderBuilder {
                             filter_info,
                             data_request,
                         },
-                        DecodeResult::NeedsData(needed_ranges),
+                        RowGroupBuildResult::NeedsData(needed_ranges),
                     ));
                 }
 
@@ -437,6 +525,7 @@ impl RowGroupReaderBuilder {
                     row_group_idx,
                     row_count,
                     mut plan_builder,
+                    budget,
                 } = row_group_info;
 
                 let predicate = filter_info.current();
@@ -476,10 +565,10 @@ impl RowGroupReaderBuilder {
                 // When this is the final predicate in the chain and an output
                 // limit is set, tell the filter evaluation to stop once enough
                 // matching rows have been accumulated.
-                let predicate_limit = self
-                    .limit
-                    .filter(|_| filter_info.is_last())
-                    .map(|l| l.saturating_add(self.offset.unwrap_or(0)));
+                let predicate_limit = filter_info
+                    .is_last()
+                    .then(|| budget.selected_row_limit())
+                    .flatten();
 
                 // Evaluate the filter via `with_predicate_options`, opting 
into
                 // early termination when this is the final predicate and an
@@ -495,6 +584,7 @@ impl RowGroupReaderBuilder {
                     row_group_idx,
                     row_count,
                     plan_builder,
+                    budget,
                 };
 
                 // Take back the column chunks that were read
@@ -531,47 +621,32 @@ impl RowGroupReaderBuilder {
                     row_group_idx,
                     row_count,
                     plan_builder,
+                    budget,
                 } = row_group_info;
 
-                // Compute the number of rows in the selection before applying 
limit and offset
-                let rows_before = 
plan_builder.num_rows_selected().unwrap_or(row_count);
+                let BudgetedReadPlan {
+                    mut plan_builder,
+                    rows_before_budget,
+                    rows_after_budget,
+                    remaining_budget,
+                } = budget.apply_to_plan(plan_builder, row_count);
 
-                if rows_before == 0 {
+                if rows_before_budget == 0 {
                     // ruled out entire row group
                     return Ok(NextState::result(
                         RowGroupDecoderState::Finished,
-                        DecodeResult::Finished,
+                        RowGroupBuildResult::Finished { remaining_budget },
                     ));
                 }
 
-                // Apply any limit and offset
-                let mut plan_builder = plan_builder
-                    .limited(row_count)
-                    .with_offset(self.offset)
-                    .with_limit(self.limit)
-                    .build_limited();
-
-                let rows_after = 
plan_builder.num_rows_selected().unwrap_or(row_count);
-
-                // Update running offset and limit for after the current row 
group is read
-                if let Some(offset) = &mut self.offset {
-                    // Reduction is either because of offset or limit, as 
limit is applied
-                    // after offset has been "exhausted" can just use 
saturating sub here
-                    *offset = offset.saturating_sub(rows_before - rows_after)
-                }
-
-                if rows_after == 0 {
+                if rows_after_budget == 0 {
                     // no rows left after applying limit/offset
                     return Ok(NextState::result(
                         RowGroupDecoderState::Finished,
-                        DecodeResult::Finished,
+                        RowGroupBuildResult::Finished { remaining_budget },
                     ));
                 }
 
-                if let Some(limit) = &mut self.limit {
-                    *limit -= rows_after;
-                }
-
                 let data_request = DataRequestBuilder::new(
                     row_group_idx,
                     row_count,
@@ -597,6 +672,7 @@ impl RowGroupReaderBuilder {
                     row_group_idx,
                     row_count,
                     plan_builder,
+                    budget: remaining_budget,
                 };
 
                 NextState::again(RowGroupDecoderState::WaitingOnData {
@@ -620,7 +696,7 @@ impl RowGroupReaderBuilder {
                             data_request,
                             cache_info,
                         },
-                        DecodeResult::NeedsData(needed_ranges),
+                        RowGroupBuildResult::NeedsData(needed_ranges),
                     ));
                 }
 
@@ -629,6 +705,7 @@ impl RowGroupReaderBuilder {
                     row_group_idx,
                     row_count,
                     plan_builder,
+                    budget,
                 } = row_group_info;
 
                 let row_group = data_request.try_into_in_memory_row_group(
@@ -656,11 +733,18 @@ impl RowGroupReaderBuilder {
                 }?;
 
                 let reader = ParquetRecordBatchReader::new(array_reader, plan);
-                NextState::result(RowGroupDecoderState::Finished, 
DecodeResult::Data(reader))
+                NextState::result(
+                    RowGroupDecoderState::Finished,
+                    RowGroupBuildResult::Data {
+                        batch_reader: reader,
+                        remaining_budget: budget,
+                    },
+                )
             }
             RowGroupDecoderState::Finished => {
-                // nothing left to read
-                NextState::result(RowGroupDecoderState::Finished, 
DecodeResult::Finished)
+                return Err(ParquetError::General(String::from(
+                    "Internal Error: try_build called without an active row group",
+                )));
             }
         };
         Ok(result)
@@ -760,10 +844,55 @@ fn override_selector_strategy_if_needed(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::arrow::arrow_reader::{RowSelection, RowSelector};
 
     #[test]
     // Verify that the size of RowGroupDecoderState does not grow too large
     fn test_structure_size() {
-        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
+        assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 232);
+    }
+
+    #[test]
+    fn test_row_budget_offset_limit_across_row_groups() {
+        let first =
+            RowBudget::new(Some(225), 
Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200);
+        assert_eq!(first.rows_before_budget, 200);
+        assert_eq!(first.rows_after_budget, 0);
+        assert_eq!(first.remaining_budget, RowBudget::new(Some(25), Some(20)));
+        assert_eq!(first.plan_builder.num_rows_selected(), Some(0));
+
+        let second = first
+            .remaining_budget
+            .apply_to_plan(ReadPlanBuilder::new(1024), 200);
+        assert_eq!(second.rows_before_budget, 200);
+        assert_eq!(second.rows_after_budget, 20);
+        assert_eq!(second.remaining_budget, RowBudget::new(Some(0), Some(0)));
+        assert_eq!(second.plan_builder.num_rows_selected(), Some(20));
+    }
+
+    #[test]
+    fn test_row_budget_limit_only() {
+        let budgeted =
+            RowBudget::new(None, 
Some(20)).apply_to_plan(ReadPlanBuilder::new(1024), 200);
+        assert_eq!(budgeted.rows_before_budget, 200);
+        assert_eq!(budgeted.rows_after_budget, 20);
+        assert_eq!(budgeted.remaining_budget, RowBudget::new(None, Some(0)));
+        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(20));
+    }
+
+    #[test]
+    fn test_row_budget_empty_selection() {
+        let empty_selection = RowSelection::from(vec![RowSelector::skip(200)]);
+        let budgeted = RowBudget::new(Some(10), Some(20)).apply_to_plan(
+            ReadPlanBuilder::new(1024).with_selection(Some(empty_selection)),
+            200,
+        );
+        assert_eq!(budgeted.rows_before_budget, 0);
+        assert_eq!(budgeted.rows_after_budget, 0);
+        assert_eq!(
+            budgeted.remaining_budget,
+            RowBudget::new(Some(10), Some(20))
+        );
+        assert_eq!(budgeted.plan_builder.num_rows_selected(), Some(0));
     }
 }
diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs
index 2986ca0da8..33e13abf9c 100644
--- a/parquet/src/arrow/push_decoder/remaining.rs
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -17,7 +17,9 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
@@ -25,21 +27,166 @@ use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
-/// State machine that tracks the remaining high level chunks (row groups) of
-/// Parquet data are left to read.
-///
-/// This is currently a row group, but the author aspires to extend the pattern
-/// to data boundaries other than RowGroups in the future.
+/// Plan for the next queued row group after row-selection slicing.
 #[derive(Debug)]
-pub(crate) struct RemainingRowGroups {
-    /// The underlying Parquet metadata
-    parquet_metadata: Arc<ParquetMetaData>,
+enum QueuedRowGroupDecision {
+    /// Hand this row group to the builder.
+    Read(NextRowGroup),
+    /// Skip this row group, and keep scanning with the updated budget.
+    Skip { remaining_budget: RowBudget },
+}
 
-    /// The row groups that have not yet been read
-    row_groups: VecDeque<usize>,
+/// Work item handed from [`RowGroupFrontier`] to [`RowGroupReaderBuilder`].
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    /// This row group's slice of the global selection, or `None` when all rows
+    /// are selected.
+    selection: Option<RowSelection>,
+    /// Budget snapshot to apply while decoding this row group.
+    budget: RowBudget,
+}
 
-    /// Remaining selection to apply to the next row groups
+#[derive(Debug)]
+struct RowGroupFrontier {
+    /// Metadata used to resolve row counts for queued row groups.
+    parquet_metadata: Arc<ParquetMetaData>,
+    /// Row group indices not yet handed to the builder.
+    row_groups: VecDeque<usize>,
+    /// Cross-row-group cursor for the optional global row selection.
     selection: Option<RowSelection>,
+    /// Offset/limit budget before the next readable row group is planned.
+    budget: RowBudget,
+    /// If predicates are present, row groups with selected rows must be read 
so
+    /// the predicate can decide whether they are actually needed.
+    has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+    fn new(
+        parquet_metadata: Arc<ParquetMetaData>,
+        row_groups: Vec<usize>,
+        selection: Option<RowSelection>,
+        budget: RowBudget,
+        has_predicates: bool,
+    ) -> Self {
+        Self {
+            parquet_metadata,
+            row_groups: VecDeque::from(row_groups),
+            selection,
+            budget,
+            has_predicates,
+        }
+    }
+
+    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, 
ParquetError> {
+        self.parquet_metadata
+            .row_group(row_group_idx)
+            .num_rows()
+            .try_into()
+            .map_err(|e| ParquetError::General(format!("Row count overflow: 
{e}")))
+    }
+
+    fn update_budget_after_row_group(&mut self, budget: RowBudget) {
+        self.budget = budget;
+    }
+
+    fn clear_remaining(&mut self) {
+        self.selection = None;
+        self.row_groups.clear();
+    }
+
+    /// Plan whether a selected row group should be read or skipped.
+    ///
+    /// Selection-only skips are handled before this method is called. This
+    /// method applies the remaining offset/limit budget and predicate
+    /// conservatism.
+    fn plan_selected_row_group(
+        &self,
+        next_row_group: NextRowGroup,
+        selected_rows: usize,
+    ) -> QueuedRowGroupDecision {
+        if self.has_predicates {
+            return QueuedRowGroupDecision::Read(next_row_group);
+        }
+
+        let rows_after_budget = self.budget.rows_after(selected_rows);
+        if rows_after_budget != 0 {
+            return QueuedRowGroupDecision::Read(next_row_group);
+        }
+
+        QueuedRowGroupDecision::Skip {
+            remaining_budget: self.budget.advance(selected_rows, 
rows_after_budget),
+        }
+    }
+
+    /// Advance queued row groups until one should be handed to the builder.
+    fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, 
ParquetError> {
+        loop {
+            let Some(&row_group_idx) = self.row_groups.front() else {
+                return Ok(None);
+            };
+            if self.budget.is_exhausted()
+                || self
+                    .selection
+                    .as_ref()
+                    .is_some_and(|selection| selection.row_count() == 0)
+            {
+                self.clear_remaining();
+                return Ok(None);
+            }
+
+            let row_count = self.row_group_num_rows(row_group_idx)?;
+            let (selection, selected_rows) = match self.selection.as_mut() {
+                Some(selection) => {
+                    let selection = selection.split_off(row_count);
+                    let selected_rows = selection.row_count();
+                    if selected_rows == 0 {
+                        self.row_groups.pop_front();
+                        continue;
+                    }
+
+                    let selection = if selected_rows == row_count {
+                        None
+                    } else {
+                        Some(selection)
+                    };
+                    (selection, selected_rows)
+                }
+                None => (None, row_count),
+            };
+
+            let next_row_group = NextRowGroup {
+                row_group_idx,
+                row_count,
+                selection,
+                budget: self.budget,
+            };
+
+            match self.plan_selected_row_group(next_row_group, selected_rows) {
+                QueuedRowGroupDecision::Read(next_row_group) => {
+                    self.row_groups.pop_front();
+                    return Ok(Some(next_row_group));
+                }
+                QueuedRowGroupDecision::Skip { remaining_budget } => {
+                    self.row_groups.pop_front();
+                    self.budget = remaining_budget;
+                }
+            }
+        }
+    }
+}
+
+/// State machine that tracks the remaining high level chunks (row groups) of
+/// Parquet data left to read.
+///
+/// [`RowGroupFrontier`] owns cross-row-group scan state and selects the next
+/// work item. [`RowGroupReaderBuilder`] owns decoding for the active row 
group.
+#[derive(Debug)]
+pub(crate) struct RemainingRowGroups {
+    /// Cross-row-group scan state for queued work.
+    frontier: RowGroupFrontier,
 
     /// State for building the reader for the current row group
     row_group_reader_builder: RowGroupReaderBuilder,
@@ -50,12 +197,18 @@ impl RemainingRowGroups {
         parquet_metadata: Arc<ParquetMetaData>,
         row_groups: Vec<usize>,
         selection: Option<RowSelection>,
+        budget: RowBudget,
+        has_predicates: bool,
         row_group_reader_builder: RowGroupReaderBuilder,
     ) -> Self {
         Self {
-            parquet_metadata,
-            row_groups: VecDeque::from(row_groups),
-            selection,
+            frontier: RowGroupFrontier::new(
+                parquet_metadata,
+                row_groups,
+                selection,
+                budget,
+                has_predicates,
+            ),
             row_group_reader_builder,
         }
     }
@@ -82,42 +235,48 @@ impl RemainingRowGroups {
         &mut self,
     ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
         loop {
-            // Are we ready yet to start reading?
-            let result: DecodeResult<ParquetRecordBatchReader> =
-                self.row_group_reader_builder.try_build()?;
-            match result {
-                DecodeResult::Finished => {
+            if !self.row_group_reader_builder.has_active_row_group() {
+                // We are done with the previous row group, seek to the next 
one
+                // from the frontier, if any.
+
+                match self.frontier.next_readable_row_group()? {
+                    Some(NextRowGroup {
+                        row_group_idx,
+                        row_count,
+                        selection,
+                        budget,
+                    }) => {
+                        self.row_group_reader_builder.next_row_group(
+                            row_group_idx,
+                            row_count,
+                            selection,
+                            budget,
+                        )?;
+                    }
+                    None => return Ok(DecodeResult::Finished),
+                }
+            }
+
+            match self.row_group_reader_builder.try_build()? {
+                RowGroupBuildResult::Finished { remaining_budget } => {
+                    self.frontier
+                        .update_budget_after_row_group(remaining_budget);
                     // reader is done, proceed to the next row group
-                    // fall through to the next row group
-                    // This happens if the row group was completely filtered 
out
                 }
-                DecodeResult::NeedsData(ranges) => {
+                RowGroupBuildResult::NeedsData(ranges) => {
                     // need more data to proceed
                     return Ok(DecodeResult::NeedsData(ranges));
                 }
-                DecodeResult::Data(batch_reader) => {
+                RowGroupBuildResult::Data {
+                    batch_reader,
+                    remaining_budget,
+                } => {
+                    self.frontier
+                        .update_budget_after_row_group(remaining_budget);
                     // ready to read the row group
                     return Ok(DecodeResult::Data(batch_reader));
                 }
             }
-
-            // No current reader, proceed to the next row group if any
-            let row_group_idx = match self.row_groups.pop_front() {
-                None => return Ok(DecodeResult::Finished),
-                Some(idx) => idx,
-            };
-
-            let row_count: usize = self
-                .parquet_metadata
-                .row_group(row_group_idx)
-                .num_rows()
-                .try_into()
-                .map_err(|e| ParquetError::General(format!("Row count 
overflow: {e}")))?;
-
-            let selection = self.selection.as_mut().map(|s| 
s.split_off(row_count));
-            self.row_group_reader_builder
-                .next_row_group(row_group_idx, row_count, selection)?;
-            // the next iteration will try to build the reader for the new row 
group
         }
     }
 }


Reply via email to