HippoBaro commented on code in PR #9804:
URL: https://github.com/apache/arrow-rs/pull/9804#discussion_r3205314571


##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -88,6 +88,95 @@ enum RowGroupDecoderState {
     Finished,
 }
 
+/// Running offset/limit budget shared across row groups.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) struct RowBudget {
+    offset: Option<usize>,
+    limit: Option<usize>,
+}
+
+impl RowBudget {
+    pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
+        Self { offset, limit }
+    }
+
+    pub(crate) fn is_exhausted(self) -> bool {
+        matches!(self.limit, Some(0))
+    }
+
+    pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
+        let rows_after_offset = rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
+        match self.limit {
+            Some(limit) => rows_after_offset.min(limit),
+            None => rows_after_offset,
+        }
+    }
+
+    /// Returns the number of selected rows needed before applying the offset.
+    fn selected_row_limit(self) -> Option<usize> {
+        self.limit
+            .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
+    }
+
+    fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) -> BudgetedReadPlan {
+        let rows_before_budget = plan_builder.num_rows_selected().unwrap_or(row_count);
+        let plan_builder = plan_builder
+            .limited(row_count)
+            .with_offset(self.offset)
+            .with_limit(self.limit)
+            .build_limited();
+        let rows_after_budget = self.rows_after(rows_before_budget);
+
+        BudgetedReadPlan {
+            plan_builder,
+            rows_before_budget,
+            rows_after_budget,
+            remaining_budget: self.advance(rows_before_budget, rows_after_budget),
+        }
+    }
+
+    pub(crate) fn advance(mut self, rows_before_budget: usize, rows_after_budget: usize) -> Self {
+        if let Some(offset) = &mut self.offset {
+            // Reduction is either because of offset or limit, as limit is applied
+            // after offset has been "exhausted" can just use saturating sub here.
+            *offset = offset.saturating_sub(rows_before_budget - rows_after_budget);
+        }
+
+        if rows_after_budget != 0 {
+            if let Some(limit) = &mut self.limit {
+                *limit -= rows_after_budget;
+            }
+        }
+
+        self
+    }
+}
+
+#[derive(Debug)]
+struct BudgetedReadPlan {
+    plan_builder: ReadPlanBuilder,
+    rows_before_budget: usize,

Review Comment:
   I'd be happy to include the feedback in this series. I'll ping you when it's pushed. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to