alamb commented on code in PR #8733:
URL: https://github.com/apache/arrow-rs/pull/8733#discussion_r2505421194


##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -20,19 +20,70 @@
 
 use crate::arrow::array_reader::ArrayReader;
 use crate::arrow::arrow_reader::{
-    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
+    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor,
+    RowSelectionStrategy, RowSelector,
 };
 use crate::errors::{ParquetError, Result};
 use arrow_array::Array;
 use arrow_select::filter::prep_null_mask_filter;
-use std::collections::VecDeque;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+// The average selector length threshold for choosing between
+// `RowSelectionStrategy::Mask` and `RowSelectionStrategy::Selectors`.
+// If the average selector length is less than this value,
+// `RowSelectionStrategy::Mask` is preferred.
+const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 32;
+
+// The logic in `preferred_selection_strategy` depends on the constant
+// `AVG_SELECTOR_LEN_MASK_THRESHOLD`. To allow unit testing of this logic,
+// we use a mutable global variable that can be temporarily changed during 
tests.
+//
+// An `AtomicUsize` is used because the Rust test runner (`cargo test`) runs 
tests
+// in parallel by default. The atomic operations prevent data races between
+// different test threads that might try to modify this value simultaneously.
+//
+// For the production code path, `load(Ordering::Relaxed)` is used. This is the
+// weakest memory ordering and for a simple load on most modern architectures,
+// it compiles down to a regular memory read with negligible performance 
overhead.
+// The more expensive atomic operations with stronger ordering are only used 
in the
+// test-only functions below.
+static AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE: AtomicUsize =

Review Comment:
   I am a little worried about this mechanism for testing -- I understand its 
appeal, but it might be better to avoid statics entirely and thread through the 
threshold value as an option on the selection strategy directly
   
   For example
   ```rust
   enum RowSelectionStrategy {
     /// automatically pick the filter representation
     /// based on heuristics
     Auto,
     /// If the average number of rows is selected is more than
     /// the threshold, uses the Mask policy, otherwise uses Selectors
     Threshold {
       threshold: usize
     },
     Selection,
     Mask
   }
   ```



##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -691,6 +736,180 @@ fn union_row_selections(left: &[RowSelector], right: 
&[RowSelector]) -> RowSelec
     iter.collect()
 }
 
+/// Cursor for iterating a [`RowSelection`] during execution within a
+/// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan).
+///
+/// This keeps per-reader state such as the current position and delegates the
+/// actual storage strategy to the internal `RowSelectionBacking`.
+#[derive(Debug)]
+pub struct RowSelectionCursor {
+    /// Backing storage describing how the selection is materialised
+    storage: RowSelectionBacking,
+    /// Current absolute offset into the selection
+    position: usize,
+}
+
+/// Backing storage that powers [`RowSelectionCursor`].
+///
+/// The cursor either walks a boolean mask (dense representation) or a queue
+/// of [`RowSelector`] ranges (sparse representation).
+#[derive(Debug)]
+pub enum RowSelectionBacking {
+    Mask(BooleanBuffer),
+    Selectors(VecDeque<RowSelector>),
+}
+
+/// Result of computing the next chunk to read when using a bitmap mask
+#[derive(Debug)]
+pub struct MaskChunk {
+    /// Number of leading rows to skip before reaching selected rows
+    pub initial_skip: usize,
+    /// Total rows covered by this chunk (selected + skipped)
+    pub chunk_rows: usize,
+    /// Rows actually selected within the chunk
+    pub selected_rows: usize,
+    /// Starting offset within the mask where the chunk begins
+    pub mask_start: usize,
+}
+
+impl RowSelectionCursor {
+    /// Create a cursor, choosing an efficient backing representation
+    pub(crate) fn new(selectors: Vec<RowSelector>, strategy: 
RowSelectionStrategy) -> Self {
+        let storage = match strategy {
+            RowSelectionStrategy::Mask => {
+                
RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors))
+            }
+            RowSelectionStrategy::Selectors => 
RowSelectionBacking::Selectors(selectors.into()),
+        };
+
+        Self {
+            storage,
+            position: 0,
+        }
+    }
+
+    /// Returns `true` when no further rows remain
+    pub fn is_empty(&self) -> bool {
+        match &self.storage {
+            RowSelectionBacking::Mask(mask) => self.position >= mask.len(),
+            RowSelectionBacking::Selectors(selectors) => selectors.is_empty(),
+        }
+    }
+
+    /// Current position within the overall selection
+    pub fn position(&self) -> usize {
+        self.position
+    }
+
+    /// Return the next [`RowSelector`] when using the sparse representation
+    pub fn next_selector(&mut self) -> Option<RowSelector> {
+        match &mut self.storage {
+            RowSelectionBacking::Selectors(selectors) => {
+                let selector = selectors.pop_front()?;
+                self.position += selector.row_count;
+                Some(selector)

Review Comment:
   this always returns `Some` so  we could change the signature to always 
return `RowSelector` rather than `Option<RowSelector>`



##########
parquet/src/arrow/arrow_reader/read_plan.rs:
##########
@@ -20,19 +20,70 @@
 
 use crate::arrow::array_reader::ArrayReader;
 use crate::arrow::arrow_reader::{
-    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
+    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor,
+    RowSelectionStrategy, RowSelector,
 };
 use crate::errors::{ParquetError, Result};
 use arrow_array::Array;
 use arrow_select::filter::prep_null_mask_filter;
-use std::collections::VecDeque;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+// The average selector length threshold for choosing between
+// `RowSelectionStrategy::Mask` and `RowSelectionStrategy::Selectors`.
+// If the average selector length is less than this value,
+// `RowSelectionStrategy::Mask` is preferred.
+const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 32;
+
+// The logic in `preferred_selection_strategy` depends on the constant
+// `AVG_SELECTOR_LEN_MASK_THRESHOLD`. To allow unit testing of this logic,
+// we use a mutable global variable that can be temporarily changed during 
tests.
+//
+// An `AtomicUsize` is used because the Rust test runner (`cargo test`) runs 
tests
+// in parallel by default. The atomic operations prevent data races between
+// different test threads that might try to modify this value simultaneously.
+//
+// For the production code path, `load(Ordering::Relaxed)` is used. This is the
+// weakest memory ordering and for a simple load on most modern architectures,
+// it compiles down to a regular memory read with negligible performance 
overhead.
+// The more expensive atomic operations with stronger ordering are only used 
in the
+// test-only functions below.
+static AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE: AtomicUsize =
+    AtomicUsize::new(AVG_SELECTOR_LEN_MASK_THRESHOLD);
+
+#[inline(always)]
+fn avg_selector_len_mask_threshold() -> usize {
+    AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE.load(Ordering::Relaxed)
+}
+
+/// An RAII guard that restores the previous value of the override when it is 
dropped.
+/// This ensures that any change to the global threshold is temporary and 
scoped to
+/// the test or benchmark where it's used, even in the case of a panic.
+pub struct AvgSelectorLenMaskThresholdGuard {
+    previous: usize,
+}
+
+impl Drop for AvgSelectorLenMaskThresholdGuard {
+    fn drop(&mut self) {
+        AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE.store(self.previous, 
Ordering::SeqCst);
+    }
+}
+
+/// Override AVG_SELECTOR_LEN_MASK_THRESHOLD (primarily for tests / 
benchmarks).
+///
+/// Returns an [`AvgSelectorLenMaskThresholdGuard`] that restores the previous 
value on drop.
+pub fn set_avg_selector_len_mask_threshold(value: usize) -> 
AvgSelectorLenMaskThresholdGuard {
+    let previous = AVG_SELECTOR_LEN_MASK_THRESHOLD_OVERRIDE.swap(value, 
Ordering::SeqCst);
+    AvgSelectorLenMaskThresholdGuard { previous }
+}
 
 /// A builder for [`ReadPlan`]
 #[derive(Clone, Debug)]
 pub struct ReadPlanBuilder {
     batch_size: usize,
     /// Current to apply, includes all filters
     selection: Option<RowSelection>,
+    /// Strategy to use when materialising the row selection
+    selection_strategy: RowSelectionStrategy,

Review Comment:
   I like this formulation as it makes this PR logic quite clear. 
   
   Also, As a follow on PR we could potentially combine this logic so the 
ReadPlanBuilder holds a `Option<RowSelectionCursor>` rather than a `selection` 
and selection_strategy`
   
   This might allow us to both
   1. avoid converting BooleanArray --> RowSelection and then back again
   2.  implement the page filtering in 
[`InMemoryRowGroup::fetch_ranges`](https://github.com/apache/arrow-rs/blob/e9ea12b138f8fa11c6046e94cb2b55b2830a0d7c/parquet/src/arrow/in_memory_row_group.rs#L58-L57)
 in terms of masked selections as well 
   
   However, we can totally do this as a follow on PR -- I will file a ticket 
when we merge this PR



##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -16,12 +16,24 @@
 // under the License.
 
 use arrow_array::{Array, BooleanArray};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
 use arrow_select::filter::SlicesIterator;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::Range;
 
-use crate::file::page_index::offset_index::PageLocation;
+use crate::arrow::ProjectionMask;
+use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
+
+/// Strategy for materialising [`RowSelection`] during execution.
+#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
+pub enum RowSelectionStrategy {

Review Comment:
   I liked the idea previously of having an Auto mode here based on heuristics 
that we could change over time. 
   
   I can help propose an upate to this API if you are amenable to this idea



##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -213,6 +225,39 @@ impl RowSelection {
         ranges
     }
 
+    /// Returns true if this selection would skip any data pages within the 
provided columns
+    fn selection_skips_any_page(
+        &self,
+        projection: &ProjectionMask,
+        columns: &[OffsetIndexMetaData],
+    ) -> bool {
+        columns.iter().enumerate().any(|(leaf_idx, column)| {
+            if !projection.leaf_included(leaf_idx) {
+                return false;
+            }
+
+            let locations = column.page_locations();
+            if locations.is_empty() {
+                return false;
+            }
+
+            let ranges = self.scan_ranges(locations);
+            !ranges.is_empty() && ranges.len() < locations.len()
+        })
+    }
+
+    /// Returns true if selectors should be forced, preventing mask 
materialisation
+    pub(crate) fn should_force_selectors(
+        &self,
+        projection: &ProjectionMask,
+        offset_index: Option<&[OffsetIndexMetaData]>,
+    ) -> bool {
+        match offset_index {
+            Some(columns) => self.selection_skips_any_page(projection, 
columns),

Review Comment:
   This is a nice solution for the initial PR -- I think we can file a follow 
on ticket to figure out how to optimize this further in follow on PRs. 
   
   I think given my reading above, we can probably move move this logic into 
InMemoryRowGroup::fetch and then actually decide what to do with pages as 
necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to