alamb commented on code in PR #9804:
URL: https://github.com/apache/arrow-rs/pull/9804#discussion_r3203888725


##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -531,47 +609,32 @@ impl RowGroupReaderBuilder {
                     row_group_idx,
                     row_count,
                     plan_builder,
+                    budget,
                 } = row_group_info;
 
-                // Compute the number of rows in the selection before applying 
limit and offset
-                let rows_before = 
plan_builder.num_rows_selected().unwrap_or(row_count);
+                let BudgetedReadPlan {
+                    mut plan_builder,
+                    rows_before_budget,
+                    rows_after_budget,
+                    remaining_budget,
+                } = budget.apply_to_plan(plan_builder, row_count);
 
-                if rows_before == 0 {
+                if rows_before_budget == 0 {
                     // ruled out entire row group
                     return Ok(NextState::result(
                         RowGroupDecoderState::Finished,
-                        DecodeResult::Finished,
+                        RowGroupBuildResult::Finished { remaining_budget },
                     ));
                 }
 
-                // Apply any limit and offset
-                let mut plan_builder = plan_builder
-                    .limited(row_count)
-                    .with_offset(self.offset)
-                    .with_limit(self.limit)
-                    .build_limited();
-
-                let rows_after = 
plan_builder.num_rows_selected().unwrap_or(row_count);
-
-                // Update running offset and limit for after the current row 
group is read
-                if let Some(offset) = &mut self.offset {
-                    // Reduction is either because of offset or limit, as 
limit is applied
-                    // after offset has been "exhausted" can just use 
saturating sub here
-                    *offset = offset.saturating_sub(rows_before - rows_after)
-                }
-
-                if rows_after == 0 {
+                if rows_after_budget == 0 {

Review Comment:
   I remember finding all the limiting code quite complicated when I originally 
refactored this logic into the PushDecoder. I really like how this PR has 
encapsulated the limit/offset tracking better.



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -88,6 +88,95 @@ enum RowGroupDecoderState {
     Finished,
 }
 
+/// Running offset/limit budget shared across row groups.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) struct RowBudget {
+    offset: Option<usize>,
+    limit: Option<usize>,
+}
+
+impl RowBudget {
+    pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
+        Self { offset, limit }
+    }
+
+    pub(crate) fn is_exhausted(self) -> bool {
+        matches!(self.limit, Some(0))
+    }
+
+    pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
+        let rows_after_offset = 
rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
+        match self.limit {
+            Some(limit) => rows_after_offset.min(limit),
+            None => rows_after_offset,
+        }
+    }
+
+    /// Returns the number of selected rows needed before applying the offset.
+    fn selected_row_limit(self) -> Option<usize> {
+        self.limit
+            .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
+    }
+
+    fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> 
BudgetedReadPlan {
+        let rows_before_budget = 
plan_builder.num_rows_selected().unwrap_or(row_count);
+        let plan_builder = plan_builder
+            .limited(row_count)
+            .with_offset(self.offset)
+            .with_limit(self.limit)
+            .build_limited();
+        let rows_after_budget = self.rows_after(rows_before_budget);
+
+        BudgetedReadPlan {
+            plan_builder,
+            rows_before_budget,
+            rows_after_budget,
+            remaining_budget: self.advance(rows_before_budget, 
rows_after_budget),
+        }
+    }
+
+    pub(crate) fn advance(mut self, rows_before_budget: usize, 
rows_after_budget: usize) -> Self {
+        if let Some(offset) = &mut self.offset {
+            // Reduction is either because of offset or limit, as limit is 
applied
+            // after offset has been "exhausted" can just use saturating sub 
here.
+            *offset = offset.saturating_sub(rows_before_budget - 
rows_after_budget);
+        }
+
+        if rows_after_budget != 0 {
+            if let Some(limit) = &mut self.limit {
+                *limit -= rows_after_budget;
+            }
+        }
+
+        self
+    }
+}
+
+#[derive(Debug)]
+struct BudgetedReadPlan {
+    plan_builder: ReadPlanBuilder,
+    rows_before_budget: usize,

Review Comment:
   Maybe in a follow-on PR we can add some comments to these fields -- for 
example, that `rows_before_budget` corresponds to how many rows are actually 
selected, and likewise that `rows_after_budget` means how many rows are yet to 
be read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to