alamb commented on code in PR #8733: URL: https://github.com/apache/arrow-rs/pull/8733#discussion_r2478972703
########## parquet/benches/row_selection_state.rs: ########## @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint; +use std::sync::Arc; + +use arrow_array::{ArrayRef, Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::{ + ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector, +}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const TOTAL_ROWS: usize = 1 << 20; +const BATCH_SIZE: usize = 1 << 10; +const BASE_SEED: u64 = 0xA55AA55A; +const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ + ("read_mask", RowSelectionStrategy::Mask), + ("read_selectors", RowSelectionStrategy::Selectors), +]; + +fn criterion_benchmark(c: &mut Criterion) { Review Comment: I found this code quite clear and easy to read -- thank you 🙏 I do think it would be good if we could add some of the background context here as a comment. Specifically, it is not obvious from just the code that this benchmark can be used to determine the value of `AVG_SELECTOR_LEN_MASK_THRESHOLD` -- perhaps you can 
reuse some of the description from this PR? Also, how did you generate these charts? If it is straightforward perhaps you can also describe that in the comments https://github.com/apache/arrow-rs/pull/8733#issuecomment-3468441165 ########## parquet/src/arrow/arrow_reader/mod.rs: ########## @@ -271,6 +275,14 @@ impl<T> ArrowReaderBuilder<T> { } } + /// Override the strategy used to execute the configured [`RowSelection`] Review Comment: I think this is a very nice API and allows users to control the behavior ########## parquet/src/arrow/arrow_reader/mod.rs: ########## @@ -1035,8 +1050,85 @@ impl ParquetRecordBatchReader { let batch_size = self.batch_size(); match self.read_plan.selection_mut() { Some(selection) => { + if selection.is_mask_backed() { + // Stream the record batch reader using contiguous segments of the selection + // mask, avoiding the need to materialize intermediate `RowSelector` ranges. + loop { + let mask_chunk = match selection.next_mask_chunk(batch_size) { + Some(batch) => batch, + None => return Ok(None), + }; Review Comment: A small nit is you could make this somewhat more concise via ```suggestion let Some(mask_chunk) = selection.next_mask_chunk(batch_size) else { return Ok(None); }; ``` ########## parquet/src/arrow/arrow_reader/read_plan.rs: ########## @@ -248,3 +275,190 @@ impl ReadPlan { self.batch_size } } + +/// Cursor for iterating a [`RowSelection`] during execution within a [`ReadPlan`]. +/// +/// This keeps per-reader state such as the current position and delegates the +/// actual storage strategy to [`RowSelectionBacking`]. +#[derive(Debug)] +pub struct RowSelectionCursor { + /// Backing storage describing how the selection is materialised + storage: RowSelectionBacking, + /// Current absolute offset into the selection + position: usize, +} + +/// Backing storage that powers [`RowSelectionCursor`]. 
+/// +/// The cursor either walks a boolean mask (dense representation) or a queue +/// of [`RowSelector`] ranges (sparse representation). +#[derive(Debug)] +enum RowSelectionBacking { + Mask(BooleanBuffer), + Selectors(VecDeque<RowSelector>), +} + +/// Result of computing the next chunk to read when using a bitmap mask +pub struct MaskChunk { + /// Number of leading rows to skip before reaching selected rows + pub initial_skip: usize, + /// Total rows covered by this chunk (selected + skipped) + pub chunk_rows: usize, + /// Rows actually selected within the chunk + pub selected_rows: usize, + /// Starting offset within the mask where the chunk begins + pub mask_start: usize, +} + +impl RowSelectionCursor { + /// Create a cursor, choosing an efficient backing representation + fn new(selectors: Vec<RowSelector>, strategy: RowSelectionStrategy) -> Self { + if matches!(strategy, RowSelectionStrategy::Selectors) { + return Self { + storage: RowSelectionBacking::Selectors(selectors.into()), + position: 0, + }; + } + + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selector_count = selectors.len(); + const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16; + // Prefer a bitmap mask when the selectors are short on average, as the mask + // (re)construction cost is amortized by a simpler execution path during reads. 
+ let use_mask = match strategy { + RowSelectionStrategy::Mask => true, + RowSelectionStrategy::Auto => { + selector_count == 0 + || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD) + } + RowSelectionStrategy::Selectors => unreachable!(), + }; + + let storage = if use_mask { + RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors)) + } else { + RowSelectionBacking::Selectors(selectors.into()) + }; + + Self { + storage, + position: 0, + } + } + + /// Returns `true` when no further rows remain + pub fn is_empty(&self) -> bool { + match &self.storage { + RowSelectionBacking::Mask(mask) => self.position >= mask.len(), + RowSelectionBacking::Selectors(selectors) => selectors.is_empty(), + } + } + + /// Current position within the overall selection + pub fn position(&self) -> usize { + self.position + } + + /// Return the next [`RowSelector`] when using the sparse representation + pub fn next_selector(&mut self) -> Option<RowSelector> { + match &mut self.storage { + RowSelectionBacking::Selectors(selectors) => { + let selector = selectors.pop_front()?; + self.position += selector.row_count; + Some(selector) + } + RowSelectionBacking::Mask(_) => None, Review Comment: perhaps this should also be `unreachable!` to signal it is not expected as well as mirror the code in `return_selector` ########## parquet/src/arrow/arrow_reader/read_plan.rs: ########## @@ -248,3 +275,190 @@ impl ReadPlan { self.batch_size } } + +/// Cursor for iterating a [`RowSelection`] during execution within a [`ReadPlan`]. +/// +/// This keeps per-reader state such as the current position and delegates the +/// actual storage strategy to [`RowSelectionBacking`]. +#[derive(Debug)] +pub struct RowSelectionCursor { + /// Backing storage describing how the selection is materialised + storage: RowSelectionBacking, + /// Current absolute offset into the selection + position: usize, +} + +/// Backing storage that powers [`RowSelectionCursor`]. 
+/// +/// The cursor either walks a boolean mask (dense representation) or a queue +/// of [`RowSelector`] ranges (sparse representation). +#[derive(Debug)] +enum RowSelectionBacking { + Mask(BooleanBuffer), + Selectors(VecDeque<RowSelector>), +} + +/// Result of computing the next chunk to read when using a bitmap mask +pub struct MaskChunk { + /// Number of leading rows to skip before reaching selected rows + pub initial_skip: usize, + /// Total rows covered by this chunk (selected + skipped) + pub chunk_rows: usize, + /// Rows actually selected within the chunk + pub selected_rows: usize, + /// Starting offset within the mask where the chunk begins + pub mask_start: usize, +} + +impl RowSelectionCursor { + /// Create a cursor, choosing an efficient backing representation + fn new(selectors: Vec<RowSelector>, strategy: RowSelectionStrategy) -> Self { + if matches!(strategy, RowSelectionStrategy::Selectors) { + return Self { + storage: RowSelectionBacking::Selectors(selectors.into()), + position: 0, + }; + } + + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selector_count = selectors.len(); + const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16; + // Prefer a bitmap mask when the selectors are short on average, as the mask + // (re)construction cost is amortized by a simpler execution path during reads. 
+ let use_mask = match strategy { + RowSelectionStrategy::Mask => true, + RowSelectionStrategy::Auto => { + selector_count == 0 + || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD) + } + RowSelectionStrategy::Selectors => unreachable!(), + }; + + let storage = if use_mask { + RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors)) + } else { + RowSelectionBacking::Selectors(selectors.into()) + }; + + Self { + storage, + position: 0, + } + } + + /// Returns `true` when no further rows remain + pub fn is_empty(&self) -> bool { + match &self.storage { + RowSelectionBacking::Mask(mask) => self.position >= mask.len(), + RowSelectionBacking::Selectors(selectors) => selectors.is_empty(), + } + } + + /// Current position within the overall selection + pub fn position(&self) -> usize { + self.position + } + + /// Return the next [`RowSelector`] when using the sparse representation + pub fn next_selector(&mut self) -> Option<RowSelector> { + match &mut self.storage { + RowSelectionBacking::Selectors(selectors) => { + let selector = selectors.pop_front()?; + self.position += selector.row_count; + Some(selector) + } + RowSelectionBacking::Mask(_) => None, + } + } + + /// Return a selector to the front, rewinding the position (sparse-only) + pub fn return_selector(&mut self, selector: RowSelector) { + match &mut self.storage { + RowSelectionBacking::Selectors(selectors) => { + self.position = self.position.saturating_sub(selector.row_count); + selectors.push_front(selector); + } + RowSelectionBacking::Mask(_) => { + unreachable!("return_selector called for mask-based RowSelectionCursor") + } + } + } + + /// Returns `true` if the cursor is backed by a boolean mask + pub fn is_mask_backed(&self) -> bool { + matches!(self.storage, RowSelectionBacking::Mask(_)) + } + + /// Advance through the mask representation, producing the next chunk summary + pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option<MaskChunk> { + let (initial_skip, 
chunk_rows, selected_rows, mask_start, end_position) = { + let mask = match &self.storage { + RowSelectionBacking::Mask(mask) => mask, + RowSelectionBacking::Selectors(_) => return None, + }; + + if self.position >= mask.len() { + return None; + } + + let start_position = self.position; + let mut cursor = start_position; + let mut initial_skip = 0; + + while cursor < mask.len() && !mask.value(cursor) { + initial_skip += 1; + cursor += 1; + } + + let mask_start = cursor; + let mut chunk_rows = 0; + let mut selected_rows = 0; + + // Advance until enough rows have been selected to satisfy the batch size, + // or until the mask is exhausted. This mirrors the behaviour of the legacy + // `RowSelector` queue-based iteration. + while cursor < mask.len() && selected_rows < batch_size { + chunk_rows += 1; + if mask.value(cursor) { + selected_rows += 1; + } + cursor += 1; + } + + (initial_skip, chunk_rows, selected_rows, mask_start, cursor) + }; + + self.position = end_position; + + Some(MaskChunk { + initial_skip, + chunk_rows, + selected_rows, + mask_start, + }) + } + + /// Materialise the boolean values for a mask-backed chunk + pub fn mask_values_for(&self, chunk: &MaskChunk) -> Option<BooleanArray> { + match &self.storage { + RowSelectionBacking::Mask(mask) => { + if chunk.mask_start.saturating_add(chunk.chunk_rows) > mask.len() { + return None; + } + Some(BooleanArray::from( + mask.slice(chunk.mask_start, chunk.chunk_rows), + )) + } + RowSelectionBacking::Selectors(_) => None, + } + } +} + +fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer { Review Comment: I think we can do even better than this (as a follow on PR) The current code still converts the result of a filter (BooleanArray) to a RowSelection, https://github.com/apache/arrow-rs/blob/cc1444a3232fa11b8485e2794a88f342bd7f97e2/parquet/src/arrow/arrow_reader/read_plan.rs#L113-L112 and then `boolean_mask_from_selectors` converts it back to a BooleanArray However I think we could apply the 
result of evaluating the filter directly to a `RowSelectionBacking::Mask` In fact, @XiangpengHao even has some (relatively crazy) techniques to combine masks quickly in https://github.com/apache/arrow-rs/pull/6624#discussion_r2127391600 ########## parquet/src/arrow/arrow_reader/read_plan.rs: ########## @@ -23,16 +23,31 @@ use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector, }; use crate::errors::{ParquetError, Result}; -use arrow_array::Array; +use arrow_array::{Array, BooleanArray}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::filter::prep_null_mask_filter; use std::collections::VecDeque; +/// Strategy for materialising [`RowSelection`] during execution. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum RowSelectionStrategy { + /// Automatically choose between mask- and selector-backed execution Review Comment: ```suggestion /// Automatically choose between mask- and selector-backed execution /// based on heuristics ``` ########## parquet/benches/row_selection_state.rs: ########## @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::hint; +use std::sync::Arc; + +use arrow_array::{ArrayRef, Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::{ + ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector, +}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const TOTAL_ROWS: usize = 1 << 20; +const BATCH_SIZE: usize = 1 << 10; +const BASE_SEED: u64 = 0xA55AA55A; +const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ + ("read_mask", RowSelectionStrategy::Mask), + ("read_selectors", RowSelectionStrategy::Selectors), +]; + +fn criterion_benchmark(c: &mut Criterion) { + let avg_selector_lengths: &[usize] = &[16, 20, 24, 28, 32, 36, 40]; + let parquet_data = build_parquet_data(TOTAL_ROWS); + + let scenarios = [ + Scenario { + name: "uniform50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Constant, + }, + Scenario { + name: "spread50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Uniform { spread: 0.9 }, + }, + Scenario { + name: "sparse20", + select_ratio: 0.2, + start_with_select: false, + distribution: RunDistribution::Bimodal { + long_factor: 6.0, + long_prob: 0.1, + }, + }, + Scenario { + name: "dense80", + select_ratio: 0.8, + start_with_select: true, + distribution: RunDistribution::Bimodal { + long_factor: 4.0, + long_prob: 0.05, + }, + }, + ]; + + for scenario in scenarios.iter() { + for (offset, &avg_len) in avg_selector_lengths.iter().enumerate() { + let selectors = + generate_selectors(avg_len, TOTAL_ROWS, scenario, BASE_SEED + offset as u64); + let stats = SelectorStats::new(&selectors); + let suffix = format!( + "{}-avg{:.1}-sel{:02}", + scenario.name, + stats.average_selector_len, + (stats.select_ratio * 100.0).round() as u32 + ); + + let bench_input = BenchInput { + parquet_data: 
parquet_data.clone(), + selection: RowSelection::from(selectors.clone()), + }; + + for (label, strategy) in READ_STRATEGIES.iter().copied() { + c.bench_with_input( + BenchmarkId::new(label, &suffix), + &bench_input, + |b, input| { + b.iter(|| { + let total = run_read(&input.parquet_data, &input.selection, strategy); + hint::black_box(total); + }); + }, + ); + } + } + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); + +struct BenchInput { + parquet_data: Bytes, + selection: RowSelection, +} + +fn run_read( + parquet_data: &Bytes, + selection: &RowSelection, + strategy: RowSelectionStrategy, +) -> usize { + let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) + .unwrap() + .with_batch_size(BATCH_SIZE) + .with_row_selection(selection.clone()) + .with_row_selection_strategy(strategy) + .build() + .unwrap(); + + let mut total_rows = 0usize; + for batch in reader { + let batch = batch.unwrap(); + total_rows += batch.num_rows(); + } + total_rows +} + +fn build_parquet_data(total_rows: usize) -> Bytes { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let values = Int32Array::from_iter_values((0..total_rows).map(|v| v as i32)); + let columns: Vec<ArrayRef> = vec![Arc::new(values) as ArrayRef]; Review Comment: I recommend we also test with some variable-length rows too -- as the selection overhead may be different for StringArray/StringViewArray than for an i32 ########## parquet/benches/row_selection_state.rs: ########## @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint; +use std::sync::Arc; + +use arrow_array::{ArrayRef, Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use bytes::Bytes; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::{ + ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionStrategy, RowSelector, +}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +const TOTAL_ROWS: usize = 1 << 20; +const BATCH_SIZE: usize = 1 << 10; +const BASE_SEED: u64 = 0xA55AA55A; +const READ_STRATEGIES: &[(&str, RowSelectionStrategy)] = &[ + ("read_mask", RowSelectionStrategy::Mask), + ("read_selectors", RowSelectionStrategy::Selectors), +]; + +fn criterion_benchmark(c: &mut Criterion) { + let avg_selector_lengths: &[usize] = &[16, 20, 24, 28, 32, 36, 40]; + let parquet_data = build_parquet_data(TOTAL_ROWS); + + let scenarios = [ + Scenario { + name: "uniform50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Constant, + }, + Scenario { + name: "spread50", + select_ratio: 0.5, + start_with_select: false, + distribution: RunDistribution::Uniform { spread: 0.9 }, + }, + Scenario { + name: "sparse20", + select_ratio: 0.2, + start_with_select: false, + distribution: RunDistribution::Bimodal { + long_factor: 6.0, + long_prob: 0.1, + }, + }, + Scenario { + name: "dense80", + select_ratio: 0.8, + start_with_select: true, + distribution: RunDistribution::Bimodal { + long_factor: 4.0, + long_prob: 0.05, + }, + }, + ]; + + for scenario in 
scenarios.iter() { + for (offset, &avg_len) in avg_selector_lengths.iter().enumerate() { + let selectors = + generate_selectors(avg_len, TOTAL_ROWS, scenario, BASE_SEED + offset as u64); + let stats = SelectorStats::new(&selectors); + let suffix = format!( + "{}-avg{:.1}-sel{:02}", + scenario.name, + stats.average_selector_len, + (stats.select_ratio * 100.0).round() as u32 + ); + + let bench_input = BenchInput { + parquet_data: parquet_data.clone(), + selection: RowSelection::from(selectors.clone()), + }; + + for (label, strategy) in READ_STRATEGIES.iter().copied() { + c.bench_with_input( + BenchmarkId::new(label, &suffix), + &bench_input, + |b, input| { + b.iter(|| { + let total = run_read(&input.parquet_data, &input.selection, strategy); + hint::black_box(total); + }); + }, + ); + } + } + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); + +struct BenchInput { + parquet_data: Bytes, + selection: RowSelection, +} + +fn run_read( + parquet_data: &Bytes, + selection: &RowSelection, + strategy: RowSelectionStrategy, +) -> usize { + let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone()) + .unwrap() + .with_batch_size(BATCH_SIZE) + .with_row_selection(selection.clone()) + .with_row_selection_strategy(strategy) + .build() + .unwrap(); + + let mut total_rows = 0usize; + for batch in reader { + let batch = batch.unwrap(); + total_rows += batch.num_rows(); + } + total_rows +} + +fn build_parquet_data(total_rows: usize) -> Bytes { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let values = Int32Array::from_iter_values((0..total_rows).map(|v| v as i32)); + let columns: Vec<ArrayRef> = vec![Arc::new(values) as ArrayRef]; + let batch = RecordBatch::try_new(schema.clone(), columns).unwrap(); + + let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap(); + writer.write(&batch).unwrap(); + let buffer = writer.into_inner().unwrap(); + Bytes::from(buffer) +} + 
+#[derive(Clone)] +struct Scenario { + name: &'static str, + select_ratio: f64, + start_with_select: bool, + distribution: RunDistribution, +} + +#[derive(Clone)] +enum RunDistribution { + Constant, + Uniform { spread: f64 }, + Bimodal { long_factor: f64, long_prob: f64 }, +} + +fn generate_selectors( + avg_selector_len: usize, + total_rows: usize, + scenario: &Scenario, + seed: u64, +) -> Vec<RowSelector> { + assert!( + (0.0..=1.0).contains(&scenario.select_ratio), + "select_ratio must be in [0, 1]" + ); + + let mut select_mean = scenario.select_ratio * 2.0 * avg_selector_len as f64; + let mut skip_mean = (1.0 - scenario.select_ratio) * 2.0 * avg_selector_len as f64; + + select_mean = select_mean.max(1.0); + skip_mean = skip_mean.max(1.0); + + let sum = select_mean + skip_mean; + // Rebalance the sampled select/skip run lengths so their sum matches the requested + // average selector length while respecting the configured selectivity ratio. + let scale = if sum == 0.0 { + 1.0 + } else { + (2.0 * avg_selector_len as f64) / sum + }; + select_mean *= scale; + skip_mean *= scale; + + let mut rng = StdRng::seed_from_u64(seed ^ (avg_selector_len as u64).wrapping_mul(0x9E3779B1)); + let mut selectors = Vec::with_capacity(total_rows / avg_selector_len.max(1)); + let mut remaining = total_rows; + let mut is_select = scenario.start_with_select; + + while remaining > 0 { + let mean = if is_select { select_mean } else { skip_mean }; + let len = sample_length(mean, &scenario.distribution, &mut rng).max(1); + let len = len.min(remaining); + selectors.push(if is_select { + RowSelector::select(len) + } else { + RowSelector::skip(len) + }); + remaining -= len; + if remaining == 0 { + break; + } + is_select = !is_select; + } + + let selection: RowSelection = selectors.into(); + selection.into() +} + +fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> usize { + match distribution { + RunDistribution::Constant => mean.round().max(1.0) as usize, + 
RunDistribution::Uniform { spread } => { + let spread = spread.clamp(0.0, 0.99); + let lower = (mean * (1.0 - spread)).max(1.0); + let upper = (mean * (1.0 + spread)).max(lower + f64::EPSILON); + if (upper - lower) < 1.0 { + lower.round().max(1.0) as usize + } else { + let low = lower.floor() as usize; + let high = upper.ceil() as usize; + rng.random_range(low..=high).max(1) + } + } + RunDistribution::Bimodal { + long_factor, + long_prob, + } => { + let long_prob = long_prob.clamp(0.0, 0.5); + let short_prob = 1.0 - long_prob; + let short_factor = if short_prob == 0.0 { + 1.0 / long_factor.max(f64::EPSILON) + } else { + (1.0 - long_prob * long_factor).max(0.0) / short_prob + }; + let use_long = rng.random_bool(long_prob); + let factor = if use_long { + *long_factor + } else { + short_factor.max(0.1) + }; + (mean * factor).round().max(1.0) as usize + } + } +} + +struct SelectorStats { Review Comment: this is a neat structure ########## parquet/src/arrow/arrow_reader/read_plan.rs: ########## @@ -248,3 +275,190 @@ impl ReadPlan { self.batch_size } } + +/// Cursor for iterating a [`RowSelection`] during execution within a [`ReadPlan`]. +/// +/// This keeps per-reader state such as the current position and delegates the +/// actual storage strategy to [`RowSelectionBacking`]. +#[derive(Debug)] +pub struct RowSelectionCursor { + /// Backing storage describing how the selection is materialised + storage: RowSelectionBacking, + /// Current absolute offset into the selection + position: usize, +} + +/// Backing storage that powers [`RowSelectionCursor`]. +/// +/// The cursor either walks a boolean mask (dense representation) or a queue +/// of [`RowSelector`] ranges (sparse representation). 
+#[derive(Debug)] +enum RowSelectionBacking { + Mask(BooleanBuffer), + Selectors(VecDeque<RowSelector>), +} + +/// Result of computing the next chunk to read when using a bitmap mask +pub struct MaskChunk { + /// Number of leading rows to skip before reaching selected rows + pub initial_skip: usize, + /// Total rows covered by this chunk (selected + skipped) + pub chunk_rows: usize, + /// Rows actually selected within the chunk + pub selected_rows: usize, + /// Starting offset within the mask where the chunk begins + pub mask_start: usize, +} + +impl RowSelectionCursor { + /// Create a cursor, choosing an efficient backing representation + fn new(selectors: Vec<RowSelector>, strategy: RowSelectionStrategy) -> Self { + if matches!(strategy, RowSelectionStrategy::Selectors) { + return Self { + storage: RowSelectionBacking::Selectors(selectors.into()), + position: 0, + }; + } + + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selector_count = selectors.len(); + const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16; Review Comment: I recommend we pull this constant somewhere that is easier to find along with a comment about what it is and how it was chosen. I suggest simply making it a constant in this module ########## parquet/src/arrow/arrow_reader/read_plan.rs: ########## @@ -248,3 +275,190 @@ impl ReadPlan { self.batch_size } } + +/// Cursor for iterating a [`RowSelection`] during execution within a [`ReadPlan`]. +/// +/// This keeps per-reader state such as the current position and delegates the +/// actual storage strategy to [`RowSelectionBacking`]. +#[derive(Debug)] +pub struct RowSelectionCursor { + /// Backing storage describing how the selection is materialised + storage: RowSelectionBacking, + /// Current absolute offset into the selection + position: usize, +} + +/// Backing storage that powers [`RowSelectionCursor`]. 
+/// +/// The cursor either walks a boolean mask (dense representation) or a queue +/// of [`RowSelector`] ranges (sparse representation). +#[derive(Debug)] +enum RowSelectionBacking { + Mask(BooleanBuffer), + Selectors(VecDeque<RowSelector>), +} + +/// Result of computing the next chunk to read when using a bitmap mask +pub struct MaskChunk { + /// Number of leading rows to skip before reaching selected rows + pub initial_skip: usize, + /// Total rows covered by this chunk (selected + skipped) + pub chunk_rows: usize, + /// Rows actually selected within the chunk + pub selected_rows: usize, + /// Starting offset within the mask where the chunk begins + pub mask_start: usize, +} + +impl RowSelectionCursor { + /// Create a cursor, choosing an efficient backing representation + fn new(selectors: Vec<RowSelector>, strategy: RowSelectionStrategy) -> Self { + if matches!(strategy, RowSelectionStrategy::Selectors) { + return Self { + storage: RowSelectionBacking::Selectors(selectors.into()), + position: 0, + }; + } + + let total_rows: usize = selectors.iter().map(|s| s.row_count).sum(); + let selector_count = selectors.len(); + const AVG_SELECTOR_LEN_MASK_THRESHOLD: usize = 16; + // Prefer a bitmap mask when the selectors are short on average, as the mask + // (re)construction cost is amortized by a simpler execution path during reads. 
+ let use_mask = match strategy { + RowSelectionStrategy::Mask => true, + RowSelectionStrategy::Auto => { + selector_count == 0 + || total_rows < selector_count.saturating_mul(AVG_SELECTOR_LEN_MASK_THRESHOLD) + } + RowSelectionStrategy::Selectors => unreachable!(), + }; + + let storage = if use_mask { + RowSelectionBacking::Mask(boolean_mask_from_selectors(&selectors)) + } else { + RowSelectionBacking::Selectors(selectors.into()) + }; + + Self { + storage, + position: 0, + } + } + + /// Returns `true` when no further rows remain + pub fn is_empty(&self) -> bool { + match &self.storage { + RowSelectionBacking::Mask(mask) => self.position >= mask.len(), + RowSelectionBacking::Selectors(selectors) => selectors.is_empty(), + } + } + + /// Current position within the overall selection + pub fn position(&self) -> usize { + self.position + } + + /// Return the next [`RowSelector`] when using the sparse representation + pub fn next_selector(&mut self) -> Option<RowSelector> { + match &mut self.storage { + RowSelectionBacking::Selectors(selectors) => { + let selector = selectors.pop_front()?; + self.position += selector.row_count; + Some(selector) + } + RowSelectionBacking::Mask(_) => None, + } + } + + /// Return a selector to the front, rewinding the position (sparse-only) + pub fn return_selector(&mut self, selector: RowSelector) { + match &mut self.storage { + RowSelectionBacking::Selectors(selectors) => { + self.position = self.position.saturating_sub(selector.row_count); + selectors.push_front(selector); + } + RowSelectionBacking::Mask(_) => { + unreachable!("return_selector called for mask-based RowSelectionCursor") + } + } + } + + /// Returns `true` if the cursor is backed by a boolean mask + pub fn is_mask_backed(&self) -> bool { + matches!(self.storage, RowSelectionBacking::Mask(_)) + } + + /// Advance through the mask representation, producing the next chunk summary + pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option<MaskChunk> { + let (initial_skip, 
chunk_rows, selected_rows, mask_start, end_position) = { + let mask = match &self.storage { + RowSelectionBacking::Mask(mask) => mask, + RowSelectionBacking::Selectors(_) => return None, + }; + + if self.position >= mask.len() { + return None; + } + + let start_position = self.position; + let mut cursor = start_position; + let mut initial_skip = 0; + + while cursor < mask.len() && !mask.value(cursor) { Review Comment: I suspect there are all sorts of bit level hacks we can do to make this faster (as a follow on PR) - for example leveraging the code to count the number of 1s a u64 at a time -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
