alamb commented on code in PR #8733:
URL: https://github.com/apache/arrow-rs/pull/8733#discussion_r2528093842
##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -248,3 +275,190 @@ impl ReadPlan {
self.batch_size
}
}
+
+/// Cursor for iterating a [`RowSelection`] during execution within a [`ReadPlan`].
+///
+/// This keeps per-reader state such as the current position and delegates the
+/// actual storage strategy to [`RowSelectionBacking`].
+#[derive(Debug)]
+pub struct RowSelectionCursor {
+ /// Backing storage describing how the selection is materialised
+ storage: RowSelectionBacking,
+ /// Current absolute offset into the selection
+ position: usize,
+}
+
+/// Backing storage that powers [`RowSelectionCursor`].
+///
+/// The cursor either walks a boolean mask (dense representation) or a queue
+/// of [`RowSelector`] ranges (sparse representation).
+#[derive(Debug)]
+enum RowSelectionBacking {
+ Mask(BooleanBuffer),
+ Selectors(VecDeque<RowSelector>),
+}
+
+/// Result of computing the next chunk to read when using a bitmap mask
+pub struct MaskChunk {
+ /// Number of leading rows to skip before reaching selected rows
+ pub initial_skip: usize,
+ /// Total rows covered by this chunk (selected + skipped)
+ pub chunk_rows: usize,
+ /// Rows actually selected within the chunk
+ pub selected_rows: usize,
+ /// Starting offset within the mask where the chunk begins
+ pub mask_start: usize,
+}
+
+impl RowSelectionCursor {
+ /// Create a cursor, choosing an efficient backing representation
+ fn new(selectors: Vec<RowSelector>, strategy: RowSelectionStrategy) -> Self {
+ if matches!(strategy, RowSelectionStrategy::Selectors) {
+ return Self {
+ storage: RowSelectionBacking::Selectors(selectors.into()),
+ position: 0,
+ };
+ }
+
+ let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+ let selector_count = selectors.len();
+ const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16;
+ // Prefer a bitmap mask when the selectors are short on average, as the mask
+ // (re)construction cost is amortized by a simpler execution path during reads.
+ let use_mask = match strategy {
+ RowSelectionStrategy::Mask => true,
+ RowSelectionStrategy::Auto => {
+ selector_count == 0
+ || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD)
+ }
+ RowSelectionStrategy::Selectors => unreachable!(),
+ };
+
+ let storage = if use_mask {
+ RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors))
+ } else {
+ RowSelectionBacking::Selectors(selectors.into())
+ };
+
+ Self {
+ storage,
+ position: 0,
+ }
+ }
+
+ /// Returns `true` when no further rows remain
+ pub fn is_empty(&self) -> bool {
+ match &self.storage {
+ RowSelectionBacking::Mask(mask) => self.position >= mask.len(),
+ RowSelectionBacking::Selectors(selectors) => selectors.is_empty(),
+ }
+ }
+
+ /// Current position within the overall selection
+ pub fn position(&self) -> usize {
+ self.position
+ }
+
+ /// Return the next [`RowSelector`] when using the sparse representation
+ pub fn next_selector(&mut self) -> Option<RowSelector> {
+ match &mut self.storage {
+ RowSelectionBacking::Selectors(selectors) => {
+ let selector = selectors.pop_front()?;
+ self.position += selector.row_count;
+ Some(selector)
+ }
+ RowSelectionBacking::Mask(_) => None,
+ }
+ }
+
+ /// Return a selector to the front, rewinding the position (sparse-only)
+ pub fn return_selector(&mut self, selector: RowSelector) {
+ match &mut self.storage {
+ RowSelectionBacking::Selectors(selectors) => {
+ self.position = self.position.saturating_sub(selector.row_count);
+ selectors.push_front(selector);
+ }
+ RowSelectionBacking::Mask(_) => {
+ unreachable!("return_selector called for mask-based RowSelectionCursor")
+ }
+ }
+ }
+
+ /// Returns `true` if the cursor is backed by a boolean mask
+ pub fn is_mask_backed(&self) -> bool {
+ matches!(self.storage, RowSelectionBacking::Mask(_))
+ }
+
+ /// Advance through the mask representation, producing the next chunk summary
+ pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option<MaskChunk> {
+ let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = {
+ let mask = match &self.storage {
+ RowSelectionBacking::Mask(mask) => mask,
+ RowSelectionBacking::Selectors(_) => return None,
+ };
+
+ if self.position >= mask.len() {
+ return None;
+ }
+
+ let start_position = self.position;
+ let mut cursor = start_position;
+ let mut initial_skip = 0;
+
+ while cursor < mask.len() && !mask.value(cursor) {
+ initial_skip += 1;
+ cursor += 1;
+ }
+
+ let mask_start = cursor;
+ let mut chunk_rows = 0;
+ let mut selected_rows = 0;
+
+ // Advance until enough rows have been selected to satisfy the batch size,
+ // or until the mask is exhausted. This mirrors the behaviour of the legacy
+ // `RowSelector` queue-based iteration.
+ while cursor < mask.len() && selected_rows < batch_size {
+ chunk_rows += 1;
+ if mask.value(cursor) {
+ selected_rows += 1;
+ }
+ cursor += 1;
+ }
+
+ (initial_skip, chunk_rows, selected_rows, mask_start, cursor)
+ };
+
+ self.position = end_position;
+
+ Some(MaskChunk {
+ initial_skip,
+ chunk_rows,
+ selected_rows,
+ mask_start,
+ })
+ }
+
+ /// Materialise the boolean values for a mask-backed chunk
+ pub fn mask_values_for(&self, chunk: &MaskChunk) -> Option<BooleanArray> {
+ match &self.storage {
+ RowSelectionBacking::Mask(mask) => {
+ if chunk.mask_start.saturating_add(chunk.chunk_rows) > mask.len() {
+ return None;
+ }
+ Some(BooleanArray::from(
+ mask.slice(chunk.mask_start, chunk.chunk_rows),
+ ))
+ }
+ RowSelectionBacking::Selectors(_) => None,
+ }
+ }
+}
+
+fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer {
Review Comment:
- Tracking ticket: https://github.com/apache/arrow-rs/issues/8844
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]