alamb commented on code in PR #9804:
URL: https://github.com/apache/arrow-rs/pull/9804#discussion_r3203888725
##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -531,47 +609,32 @@ impl RowGroupReaderBuilder {
row_group_idx,
row_count,
plan_builder,
+ budget,
} = row_group_info;
- // Compute the number of rows in the selection before applying
limit and offset
- let rows_before =
plan_builder.num_rows_selected().unwrap_or(row_count);
+ let BudgetedReadPlan {
+ mut plan_builder,
+ rows_before_budget,
+ rows_after_budget,
+ remaining_budget,
+ } = budget.apply_to_plan(plan_builder, row_count);
- if rows_before == 0 {
+ if rows_before_budget == 0 {
// ruled out entire row group
return Ok(NextState::result(
RowGroupDecoderState::Finished,
- DecodeResult::Finished,
+ RowGroupBuildResult::Finished { remaining_budget },
));
}
- // Apply any limit and offset
- let mut plan_builder = plan_builder
- .limited(row_count)
- .with_offset(self.offset)
- .with_limit(self.limit)
- .build_limited();
-
- let rows_after =
plan_builder.num_rows_selected().unwrap_or(row_count);
-
- // Update running offset and limit for after the current row
group is read
- if let Some(offset) = &mut self.offset {
- // Reduction is either because of offset or limit, as
limit is applied
- // after offset has been "exhausted" can just use
saturating sub here
- *offset = offset.saturating_sub(rows_before - rows_after)
- }
-
- if rows_after == 0 {
+ if rows_after_budget == 0 {
Review Comment:
I remember that when I originally refactored this logic into the PushDecoder, I
found all the limiting code quite complicated. I really like how this PR
encapsulates the limit/offset tracking better.
##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -88,6 +88,95 @@ enum RowGroupDecoderState {
Finished,
}
+/// Running offset/limit budget shared across row groups.
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) struct RowBudget {
+ offset: Option<usize>,
+ limit: Option<usize>,
+}
+
+impl RowBudget {
+ pub(crate) fn new(offset: Option<usize>, limit: Option<usize>) -> Self {
+ Self { offset, limit }
+ }
+
+ pub(crate) fn is_exhausted(self) -> bool {
+ matches!(self.limit, Some(0))
+ }
+
+ pub(crate) fn rows_after(self, rows_before_budget: usize) -> usize {
+ let rows_after_offset =
rows_before_budget.saturating_sub(self.offset.unwrap_or(0));
+ match self.limit {
+ Some(limit) => rows_after_offset.min(limit),
+ None => rows_after_offset,
+ }
+ }
+
+ /// Returns the number of selected rows needed before applying the offset.
+ fn selected_row_limit(self) -> Option<usize> {
+ self.limit
+ .map(|limit| limit.saturating_add(self.offset.unwrap_or(0)))
+ }
+
+ fn apply_to_plan(self, plan_builder: ReadPlanBuilder, row_count: usize) ->
BudgetedReadPlan {
+ let rows_before_budget =
plan_builder.num_rows_selected().unwrap_or(row_count);
+ let plan_builder = plan_builder
+ .limited(row_count)
+ .with_offset(self.offset)
+ .with_limit(self.limit)
+ .build_limited();
+ let rows_after_budget = self.rows_after(rows_before_budget);
+
+ BudgetedReadPlan {
+ plan_builder,
+ rows_before_budget,
+ rows_after_budget,
+ remaining_budget: self.advance(rows_before_budget,
rows_after_budget),
+ }
+ }
+
+ pub(crate) fn advance(mut self, rows_before_budget: usize,
rows_after_budget: usize) -> Self {
+ if let Some(offset) = &mut self.offset {
+ // Reduction is either because of offset or limit, as limit is
applied
+ // after offset has been "exhausted" can just use saturating sub
here.
+ *offset = offset.saturating_sub(rows_before_budget -
rows_after_budget);
+ }
+
+ if rows_after_budget != 0 {
+ if let Some(limit) = &mut self.limit {
+ *limit -= rows_after_budget;
+ }
+ }
+
+ self
+ }
+}
+
+#[derive(Debug)]
+struct BudgetedReadPlan {
+ plan_builder: ReadPlanBuilder,
+ rows_before_budget: usize,
Review Comment:
Maybe in a follow-on PR we can add some comments to these fields -- for example,
that `rows_before_budget` is how many rows are actually selected (before the
offset/limit budget is applied), and likewise that `rows_after_budget` is how
many of those rows are still to be read once the offset/limit are applied.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]