HippoBaro commented on code in PR #9804:
URL: https://github.com/apache/arrow-rs/pull/9804#discussion_r3142791086


##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -17,29 +17,179 @@
 
 use crate::DecodeResult;
 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
-use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::arrow::push_decoder::reader_builder::{
+    RowBudget, RowGroupBuildResult, RowGroupReaderBuilder,
+};
 use crate::errors::ParquetError;
 use crate::file::metadata::ParquetMetaData;
 use bytes::Bytes;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum QueuedRowGroupDecision {
+    NeedThis,
+    SkipThis { remaining_budget: RowBudget },
+    SkipAllRemaining,
+}
+#[derive(Debug)]
+struct NextRowGroup {
+    row_group_idx: usize,
+    row_count: usize,
+    selection: Option<RowSelection>,
+    budget: RowBudget,
+}
+
+#[derive(Debug)]
+struct SelectionFrontier {
+    selection: Option<RowSelection>,
+}
+
+impl SelectionFrontier {
+    fn new(selection: Option<RowSelection>) -> Self {
+        Self { selection }
+    }
+
+    fn is_exhausted(&self) -> bool {
+        self.selection
+            .as_ref()
+            .is_some_and(|selection| !selection.selects_any())
+    }
+
+    fn clear(&mut self) {
+        self.selection = None;
+    }
+
+    fn take_for_row_group(&mut self, row_count: usize) -> Option<RowSelection> 
{
+        self.selection.as_mut().map(|s| s.split_off(row_count))
+    }
+}
+
+#[derive(Debug)]
+struct RowGroupFrontier {
+    parquet_metadata: Arc<ParquetMetaData>,
+    row_groups: VecDeque<usize>,
+    selection: SelectionFrontier,
+    budget: RowBudget,
+    has_predicates: bool,
+}
+
+impl RowGroupFrontier {
+    fn new(
+        parquet_metadata: Arc<ParquetMetaData>,
+        row_groups: Vec<usize>,
+        selection: Option<RowSelection>,
+        budget: RowBudget,
+        has_predicates: bool,
+    ) -> Self {
+        Self {
+            parquet_metadata,
+            row_groups: VecDeque::from(row_groups),
+            selection: SelectionFrontier::new(selection),
+            budget,
+            has_predicates,
+        }
+    }
+
+    fn row_group_num_rows(&self, row_group_idx: usize) -> Result<usize, 
ParquetError> {
+        self.parquet_metadata
+            .row_group(row_group_idx)
+            .num_rows()
+            .try_into()
+            .map_err(|e| ParquetError::General(format!("Row count overflow: 
{e}")))
+    }
+
+    fn advance_budget(&mut self, budget: RowBudget) {
+        self.budget = budget;
+    }
+
+    fn selection_exhausted(&self) -> bool {
+        self.selection.is_exhausted()
+    }
+
+    fn clear_remaining(&mut self) {
+        self.selection.clear();
+        self.row_groups.clear();
+    }
+
+    fn classify_queued_row_group(
+        &self,
+        row_count: usize,
+        selection: Option<&RowSelection>,
+    ) -> QueuedRowGroupDecision {
+        if self.budget.is_exhausted() {
+            return QueuedRowGroupDecision::SkipAllRemaining;
+        }
+
+        if selection.is_some_and(|selection| !selection.selects_any()) {
+            return QueuedRowGroupDecision::SkipThis {
+                remaining_budget: self.budget,
+            };
+        }
+
+        if self.has_predicates {
+            return QueuedRowGroupDecision::NeedThis;
+        }
+
+        let rows_before_budget = selection
+            .map(|selection| selection.row_count())
+            .unwrap_or(row_count);
+        let rows_after_budget = self.budget.rows_after(rows_before_budget);
+        if rows_after_budget != 0 {
+            return QueuedRowGroupDecision::NeedThis;
+        }
+
+        QueuedRowGroupDecision::SkipThis {
+            remaining_budget: self.budget.advance(rows_before_budget, 
rows_after_budget),
+        }
+    }
+
+    fn next_readable_row_group(&mut self) -> Result<Option<NextRowGroup>, 
ParquetError> {
+        loop {
+            if self.selection_exhausted() {
+                self.clear_remaining();
+                return Ok(None);
+            }
+
+            let Some(&row_group_idx) = self.row_groups.front() else {
+                return Ok(None);
+            };
+            let row_count = self.row_group_num_rows(row_group_idx)?;
+            let selection = self.selection.take_for_row_group(row_count);
+
+            match self.classify_queued_row_group(row_count, 
selection.as_ref()) {
+                QueuedRowGroupDecision::NeedThis => {
+                    let row_group_idx = 
self.row_groups.pop_front().expect("front row group");
+                    return Ok(Some(NextRowGroup {
+                        row_group_idx,
+                        row_count,
+                        selection,
+                        budget: self.budget,
+                    }));
+                }
+                QueuedRowGroupDecision::SkipThis { remaining_budget } => {
+                    self.row_groups.pop_front();
+                    self.budget = remaining_budget;
+                }
+                QueuedRowGroupDecision::SkipAllRemaining => {
+                    self.clear_remaining();
+                    return Ok(None);
+                }
+            }
+        }
+    }
+}
+
 /// State machine that tracks the remaining high level chunks (row groups) of
 /// Parquet data are left to read.
 ///
 /// This is currently a row group, but the author aspires to extend the pattern
 /// to data boundaries other than RowGroups in the future.
 #[derive(Debug)]
 pub(crate) struct RemainingRowGroups {
-    /// The underlying Parquet metadata
-    parquet_metadata: Arc<ParquetMetaData>,
-
-    /// The row groups that have not yet been read
-    row_groups: VecDeque<usize>,
-
-    /// Remaining selection to apply to the next row groups
-    selection: Option<RowSelection>,
+    /// Cross-row-group scan state for queued work.
+    frontier: RowGroupFrontier,

Review Comment:
   This gives a clearer separation of concerns: the builder now only owns the 
currently active row group and its decode state, while `RowGroupFrontier` owns 
the look-ahead state needed to decide which row group can be handed to the 
builder next. 
   
   This is also the piece that I care about in the context of the `PushBuffers` 
work, because it centralizes all the row-group pruning logic, which makes 
releasing prefetched buffers easy. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to