This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 911331aafa [Parquet] Adaptive Parquet Predicate Pushdown (#8733)
911331aafa is described below
commit 911331aafa13f5e230440cf5d02feb245985c64e
Author: Huang Qiwei <[email protected]>
AuthorDate: Sat Nov 15 00:05:22 2025 +0800
[Parquet] Adaptive Parquet Predicate Pushdown (#8733)
# Which issue does this PR close?
- Closes #8565
- Closes #5523
# Rationale for this change
Predicate pushdown and row filters often yield very short alternating
select/skip runs; materialising those runs through a
`VecDeque<RowSelector>` is both allocation-heavy and prone to panics
when entire pages are skipped and never decoded. This PR introduces a
mask-backed execution path for short selectors, adds the heuristics and
guards needed to decide between masks and selectors,
and provides tooling to measure the trade‑offs so we can tune the
threshold confidently.
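To make the cost concrete, here is a minimal sketch (illustrative only, not code from this PR) of how quickly the selector representation grows for a dense every-other-row selection, using the existing public `RowSelection` API:
```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Select every other row of a ~1M row file (rows 0, 2, 4, ...)
    let total_rows: usize = 1 << 20;
    let ranges = (0..total_rows).step_by(2).map(|i| i..i + 1);
    let selection = RowSelection::from_consecutive_ranges(ranges, total_rows);

    // The selector form alternates select(1)/skip(1), i.e. roughly one
    // `RowSelector` entry per row ...
    let selectors: Vec<RowSelector> = selection.into();
    assert!(selectors.len() > total_rows / 2);
    // ... whereas the equivalent boolean mask is one bit per row (~128 KiB).
}
```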
# What changes are included in this PR?
- Introduced `RowSelectionStrategy` plus a `RowSelectionCursor` that can
iterate either a boolean mask or the legacy selector queue, along with a
public guard/override so tests and benchmarks can
tweak the average-selector-length heuristic.
- `ReadPlanBuilder` now trims selections, estimates their density, and
chooses the faster mask strategy when safe (see the sketch after this
list); `ParquetRecordBatchReader` streams mask chunks, skips contiguous
gaps, filters the projected batch with Arrow’s boolean kernel, and still
falls back to selectors when needed.
- `RowSelection` can now inspect offset indexes to detect when a
selection would skip entire pages; both the synchronous and asynchronous
readers consult that signal so row filters no longer
panic when predicate pruning drops whole pages.
- Added the `row_selection_cursor` Criterion benchmark plus
`dev/row_selection_analysis.py` to run the bench, export CSV summaries,
and render comparative plots across selector lengths, column
widths, and Utf8View payload sizes; wired the bench into
`parquet/Cargo.toml`.
- Expanded unit/async regression coverage around mask iteration
(`test_row_selection_interleaved_skip`,
`test_row_selection_mask_sparse_rows`,
`test_row_filter_full_page_skip_is_handled` and its
async twin) and updated the push-decoder size assertion to reflect the
new state.
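For reference, a simplified, self-contained sketch of the density heuristic behind the `Auto` policy (the real logic lives in `ReadPlanBuilder`; `Run` and `prefer_mask` are hypothetical stand-ins used only for illustration):
```rust
// Hypothetical stand-ins: `Run` plays the role of RowSelector, and
// `prefer_mask` mirrors the average-run-length check described above.
struct Run {
    rows: usize,
}

fn prefer_mask(runs: &[Run], threshold: usize) -> bool {
    if runs.is_empty() {
        return true; // an empty selection is trivially cheap either way
    }
    let total_rows: usize = runs.iter().map(|r| r.rows).sum();
    // average run length < threshold  <=>  total_rows < runs.len() * threshold
    total_rows < runs.len().saturating_mul(threshold)
}

fn main() {
    // Short alternating runs (average length 4 < 32): mask wins
    let short: Vec<Run> = (0..16).map(|_| Run { rows: 4 }).collect();
    assert!(prefer_mask(&short, 32));

    // A few large contiguous runs (average length 1000 >= 32): selectors win
    let long: Vec<Run> = (0..4).map(|_| Run { rows: 1000 }).collect();
    assert!(!prefer_mask(&long, 32));
}
```
Masks win on short runs because each selector carries fixed per-run overhead, while a mask costs one bit per row regardless of run length.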
# Are these changes tested?
- Added the synchronous and async row-filter regression tests above plus
new `ReadPlanBuilder` threshold tests; the Criterion bench + Python
tooling provide manual validation for performance
tuning. Full parquet/arrow test suites will still run in CI.
# Are there any user-facing changes?
- Row-selection heavy scans should see lower CPU usage because dense
masks are streamed directly, while the new guards prevent
predicate-pruned queries from panicking.
- Advanced users now have access to `RowSelectionPolicy`,
`RowSelectionCursor`, and `ArrowReaderBuilder::with_row_selection_policy`
for experimentation (see the usage example below), and developers gain
the new benchmarking/plotting workflow. No breaking API changes were
introduced.
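A minimal usage sketch of the new policy knob, modelled on the `run_read` helper in the new benchmark (the batch size and example selection are placeholders):
```rust
use bytes::Bytes;
use parquet::arrow::arrow_reader::{
    ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionPolicy, RowSelector,
};

/// Count the rows produced for `selection` under the given policy.
fn read_with_policy(parquet_data: Bytes, selection: RowSelection) -> usize {
    let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data)
        .unwrap()
        .with_batch_size(1024)
        .with_row_selection(selection)
        // `Auto` prefers a mask when the average selector run is shorter than `threshold`
        .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
        .build()
        .unwrap();

    reader.map(|batch| batch.unwrap().num_rows()).sum()
}

fn example(parquet_data: Bytes) -> usize {
    // Short alternating runs like these are exactly the case the mask path targets
    let selection = RowSelection::from(vec![
        RowSelector::select(4),
        RowSelector::skip(4),
        RowSelector::select(4),
    ]);
    read_with_policy(parquet_data, selection)
}
```
`RowSelectionPolicy::Mask` and `RowSelectionPolicy::Selectors` force one strategy unconditionally, while the page-skip guard described above can still downgrade an `Auto` mask plan to selectors when the offset index shows whole pages would be skipped.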
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/Cargo.toml | 5 +
parquet/benches/row_selection_cursor.rs | 501 +++++++++++++++++++++
parquet/src/arrow/arrow_reader/mod.rs | 319 +++++++++++--
parquet/src/arrow/arrow_reader/read_plan.rs | 124 ++++-
parquet/src/arrow/arrow_reader/selection.rs | 243 +++++++++-
parquet/src/arrow/async_reader/mod.rs | 83 +++-
parquet/src/arrow/in_memory_row_group.rs | 4 +-
parquet/src/arrow/push_decoder/mod.rs | 2 +
.../src/arrow/push_decoder/reader_builder/mod.rs | 86 +++-
9 files changed, 1318 insertions(+), 49 deletions(-)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index ba9892d329..504ba312c1 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -260,5 +260,10 @@ name = "row_selector"
harness = false
required-features = ["arrow"]
+[[bench]]
+name = "row_selection_cursor"
+harness = false
+required-features = ["arrow"]
+
[lib]
bench = false
diff --git a/parquet/benches/row_selection_cursor.rs b/parquet/benches/row_selection_cursor.rs
new file mode 100644
index 0000000000..49c9e6d68a
--- /dev/null
+++ b/parquet/benches/row_selection_cursor.rs
@@ -0,0 +1,501 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hint;
+use std::sync::Arc;
+
+use arrow_array::builder::StringViewBuilder;
+use arrow_array::{ArrayRef, Float64Array, Int32Array, RecordBatch, StringViewArray};
+use arrow_schema::{DataType, Field, Schema};
+use bytes::Bytes;
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use parquet::arrow::ArrowWriter;
+use parquet::arrow::arrow_reader::{
+    ParquetRecordBatchReaderBuilder, RowSelection, RowSelectionPolicy, RowSelector,
+};
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+
+const TOTAL_ROWS: usize = 1 << 20;
+const BATCH_SIZE: usize = 1 << 10;
+const BASE_SEED: u64 = 0xA55AA55A;
+const AVG_SELECTOR_LENGTHS: &[usize] = &[4, 8, 12, 16, 20, 24, 28, 32, 36, 40];
+const COLUMN_WIDTHS: &[usize] = &[2, 4, 8, 16, 32];
+const UTF8VIEW_LENS: &[usize] = &[4, 8, 16, 32, 64, 128, 256];
+const BENCH_MODES: &[BenchMode] = &[BenchMode::ReadSelector, BenchMode::ReadMask];
+
+struct DataProfile {
+ name: &'static str,
+ build_batch: fn(usize) -> RecordBatch,
+}
+
+const DATA_PROFILES: &[DataProfile] = &[
+ DataProfile {
+ name: "int32",
+ build_batch: build_int32_batch,
+ },
+ DataProfile {
+ name: "float64",
+ build_batch: build_float64_batch,
+ },
+ DataProfile {
+ name: "utf8view",
+ build_batch: build_utf8view_batch,
+ },
+];
+
+fn criterion_benchmark(c: &mut Criterion) {
+ let scenarios = [
+ /* uniform50 (50% selected, constant run lengths, starts with skip)
+ ```text
+ ┌───────────────┐
+ │ │ skip
+ │ │
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
+ │ │ skip
+ │ │
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
+ │ ... │
+ └───────────────┘
+ ``` */
+ Scenario {
+ name: "uniform50",
+ select_ratio: 0.5,
+ start_with_select: false,
+ distribution: RunDistribution::Constant,
+ },
+        /* spread50 (50% selected, large jitter in run lengths, starts with skip)
+ ```text
+ ┌───────────────┐
+ │ │ skip (long)
+ │ │
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (short)
+ │ │ skip (short)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
+ │ │ skip (medium)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (medium)
+ │ ... │
+ └───────────────┘
+ ``` */
+ Scenario {
+ name: "spread50",
+ select_ratio: 0.5,
+ start_with_select: false,
+ distribution: RunDistribution::Uniform { spread: 0.9 },
+ },
+        /* sparse20 (20% selected, bimodal: occasional long runs, starts with skip)
+ ```text
+ ┌───────────────┐
+ │ │ skip (long)
+ │ │
+ │ │
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (short)
+ │ │ skip (long)
+ │ │
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (occasional long)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
+ │ ... │
+ └───────────────┘
+ ``` */
+ Scenario {
+ name: "sparse20",
+ select_ratio: 0.2,
+ start_with_select: false,
+ distribution: RunDistribution::Bimodal {
+ long_factor: 6.0,
+ long_prob: 0.1,
+ },
+ },
+        /* dense80 (80% selected, bimodal: occasional long runs, starts with select)
+ ```text
+ ┌───────────────┐
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
+ │ │ skip (short)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
+ │ │ skip (very short)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ select (long)
+ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
+ │ ... │
+ └───────────────┘
+ ``` */
+ Scenario {
+ name: "dense80",
+ select_ratio: 0.8,
+ start_with_select: true,
+ distribution: RunDistribution::Bimodal {
+ long_factor: 4.0,
+ long_prob: 0.05,
+ },
+ },
+ ];
+
+ let base_parquet = build_parquet_data(TOTAL_ROWS, build_int32_batch);
+ let base_scenario = &scenarios[0];
+
+ for (idx, scenario) in scenarios.iter().enumerate() {
+        // The first scenario is a special case for backwards compatibility with
+ // existing benchmark result formats.
+ let suite = if idx == 0 { "len" } else { "scenario" };
+ bench_over_lengths(
+ c,
+ suite,
+ scenario.name,
+ &base_parquet,
+ scenario,
+ BASE_SEED ^ ((idx as u64) << 16),
+ );
+ }
+
+ for (profile_idx, profile) in DATA_PROFILES.iter().enumerate() {
+ let parquet_data = build_parquet_data(TOTAL_ROWS, profile.build_batch);
+ bench_over_lengths(
+ c,
+ "dtype",
+ profile.name,
+ &parquet_data,
+ base_scenario,
+ BASE_SEED ^ ((profile_idx as u64) << 24),
+ );
+ }
+
+ for (offset, &column_count) in COLUMN_WIDTHS.iter().enumerate() {
+        let parquet_data = write_parquet_batch(build_int32_columns_batch(TOTAL_ROWS, column_count));
+ let variant_label = format!("C{:02}", column_count);
+ bench_over_lengths(
+ c,
+ "columns",
+ &variant_label,
+ &parquet_data,
+ base_scenario,
+ BASE_SEED ^ ((offset as u64) << 32),
+ );
+ }
+
+ for (offset, &len) in UTF8VIEW_LENS.iter().enumerate() {
+ let batch = build_utf8view_batch_with_len(TOTAL_ROWS, len);
+ let parquet_data = write_parquet_batch(batch);
+ let variant_label = format!("utf8view-L{:03}", len);
+ bench_over_lengths(
+ c,
+ "utf8view-len",
+ &variant_label,
+ &parquet_data,
+ base_scenario,
+ BASE_SEED ^ ((offset as u64) << 40),
+ );
+ }
+}
+
+fn bench_over_lengths(
+ c: &mut Criterion,
+ suite: &str,
+ variant: &str,
+ parquet_data: &Bytes,
+ scenario: &Scenario,
+ seed_base: u64,
+) {
+ for (offset, &avg_len) in AVG_SELECTOR_LENGTHS.iter().enumerate() {
+ let selectors =
+            generate_selectors(avg_len, TOTAL_ROWS, scenario, seed_base + offset as u64);
+ let stats = SelectorStats::new(&selectors);
+ let selection = RowSelection::from(selectors);
+ let suffix = format!(
+ "{}-{}-{}-L{:02}-avg{:.1}-sel{:02}",
+ suite,
+ scenario.name,
+ variant,
+ avg_len,
+ stats.average_selector_len,
+ (stats.select_ratio * 100.0).round() as u32
+ );
+
+ let bench_input = BenchInput {
+ parquet_data: parquet_data.clone(),
+ selection,
+ };
+
+ for &mode in BENCH_MODES {
+ c.bench_with_input(
+ BenchmarkId::new(mode.label(), &suffix),
+ &bench_input,
+ |b, input| {
+ b.iter(|| {
+ let total = run_read(&input.parquet_data,
&input.selection, mode.policy());
+ hint::black_box(total);
+ });
+ },
+ );
+ }
+ }
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
+
+struct BenchInput {
+ parquet_data: Bytes,
+ selection: RowSelection,
+}
+
+fn run_read(parquet_data: &Bytes, selection: &RowSelection, policy: RowSelectionPolicy) -> usize {
+ let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_data.clone())
+ .unwrap()
+ .with_batch_size(BATCH_SIZE)
+ .with_row_selection(selection.clone())
+ .with_row_selection_policy(policy)
+ .build()
+ .unwrap();
+
+ let mut total_rows = 0usize;
+ for batch in reader {
+ let batch = batch.unwrap();
+ total_rows += batch.num_rows();
+ }
+ total_rows
+}
+
+fn build_parquet_data(total_rows: usize, build_batch: fn(usize) -> RecordBatch) -> Bytes {
+ let batch = build_batch(total_rows);
+ write_parquet_batch(batch)
+}
+
+fn build_single_column_batch(data_type: DataType, array: ArrayRef) -> RecordBatch {
+    let schema = Arc::new(Schema::new(vec![Field::new("value", data_type, false)]));
+ RecordBatch::try_new(schema, vec![array]).unwrap()
+}
+
+fn build_int32_batch(total_rows: usize) -> RecordBatch {
+    let values = Int32Array::from_iter_values((0..total_rows).map(|v| v as i32));
+ build_single_column_batch(DataType::Int32, Arc::new(values) as ArrayRef)
+}
+
+fn build_float64_batch(total_rows: usize) -> RecordBatch {
+    let values = Float64Array::from_iter_values((0..total_rows).map(|v| v as f64));
+ build_single_column_batch(DataType::Float64, Arc::new(values) as ArrayRef)
+}
+
+fn build_utf8view_batch(total_rows: usize) -> RecordBatch {
+ let mut builder = StringViewBuilder::new();
+ // Mix short and long values.
+ for i in 0..total_rows {
+ match i % 5 {
+ 0 => builder.append_value("alpha"),
+ 1 => builder.append_value("beta"),
+ 2 => builder.append_value("gamma"),
+ 3 => builder.append_value("delta"),
+            _ => builder.append_value("a longer utf8 string payload to test view storage"),
+ }
+ }
+ let values: StringViewArray = builder.finish();
+ build_single_column_batch(DataType::Utf8View, Arc::new(values) as ArrayRef)
+}
+
+fn build_utf8view_batch_with_len(total_rows: usize, len: usize) -> RecordBatch {
+ let mut builder = StringViewBuilder::new();
+ let value: String = "a".repeat(len);
+ for _ in 0..total_rows {
+ builder.append_value(&value);
+ }
+ let values: StringViewArray = builder.finish();
+ build_single_column_batch(DataType::Utf8View, Arc::new(values) as ArrayRef)
+}
+
+fn build_int32_columns_batch(total_rows: usize, num_columns: usize) -> RecordBatch {
+ let base_values: ArrayRef = Arc::new(Int32Array::from_iter_values(
+ (0..total_rows).map(|v| v as i32),
+ ));
+ let mut fields = Vec::with_capacity(num_columns);
+ let mut columns = Vec::with_capacity(num_columns);
+ for idx in 0..num_columns {
+        fields.push(Field::new(format!("value{}", idx), DataType::Int32, false));
+ columns.push(base_values.clone());
+ }
+ let schema = Arc::new(Schema::new(fields));
+ RecordBatch::try_new(schema, columns).unwrap()
+}
+
+fn write_parquet_batch(batch: RecordBatch) -> Bytes {
+ let schema = batch.schema();
+    let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), None).unwrap();
+ writer.write(&batch).unwrap();
+ let buffer = writer.into_inner().unwrap();
+ Bytes::from(buffer)
+}
+
+#[derive(Clone)]
+struct Scenario {
+ name: &'static str,
+ select_ratio: f64,
+ start_with_select: bool,
+ distribution: RunDistribution,
+}
+
+#[derive(Clone)]
+enum RunDistribution {
+ Constant,
+ Uniform { spread: f64 },
+ Bimodal { long_factor: f64, long_prob: f64 },
+}
+
+fn generate_selectors(
+ avg_selector_len: usize,
+ total_rows: usize,
+ scenario: &Scenario,
+ seed: u64,
+) -> Vec<RowSelector> {
+ assert!(
+ (0.0..=1.0).contains(&scenario.select_ratio),
+ "select_ratio must be in [0, 1]"
+ );
+
+    let mut select_mean = scenario.select_ratio * 2.0 * avg_selector_len as f64;
+    let mut skip_mean = (1.0 - scenario.select_ratio) * 2.0 * avg_selector_len as f64;
+
+ select_mean = select_mean.max(1.0);
+ skip_mean = skip_mean.max(1.0);
+
+ let sum = select_mean + skip_mean;
+    // Rebalance the sampled select/skip run lengths so their sum matches the requested
+    // average selector length while respecting the configured selectivity ratio.
+ let scale = if sum == 0.0 {
+ 1.0
+ } else {
+ (2.0 * avg_selector_len as f64) / sum
+ };
+ select_mean *= scale;
+ skip_mean *= scale;
+
+    let mut rng = StdRng::seed_from_u64(seed ^ (avg_selector_len as u64).wrapping_mul(0x9E3779B1));
+    let mut selectors = Vec::with_capacity(total_rows / avg_selector_len.max(1));
+ let mut remaining = total_rows;
+ let mut is_select = scenario.start_with_select;
+
+ while remaining > 0 {
+ let mean = if is_select { select_mean } else { skip_mean };
+ let len = sample_length(mean, &scenario.distribution, &mut rng).max(1);
+ let len = len.min(remaining);
+ selectors.push(if is_select {
+ RowSelector::select(len)
+ } else {
+ RowSelector::skip(len)
+ });
+ remaining -= len;
+ if remaining == 0 {
+ break;
+ }
+ is_select = !is_select;
+ }
+
+ let selection: RowSelection = selectors.into();
+ selection.into()
+}
+
+fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> usize {
+ match distribution {
+ RunDistribution::Constant => mean.round().max(1.0) as usize,
+ RunDistribution::Uniform { spread } => {
+ let spread = spread.clamp(0.0, 0.99);
+ let lower = (mean * (1.0 - spread)).max(1.0);
+ let upper = (mean * (1.0 + spread)).max(lower + f64::EPSILON);
+ if (upper - lower) < 1.0 {
+ lower.round().max(1.0) as usize
+ } else {
+ let low = lower.floor() as usize;
+ let high = upper.ceil() as usize;
+ rng.random_range(low..=high).max(1)
+ }
+ }
+ RunDistribution::Bimodal {
+ long_factor,
+ long_prob,
+ } => {
+ let long_prob = long_prob.clamp(0.0, 0.5);
+ let short_prob = 1.0 - long_prob;
+ let short_factor = if short_prob == 0.0 {
+ 1.0 / long_factor.max(f64::EPSILON)
+ } else {
+ (1.0 - long_prob * long_factor).max(0.0) / short_prob
+ };
+ let use_long = rng.random_bool(long_prob);
+ let factor = if use_long {
+ *long_factor
+ } else {
+ short_factor.max(0.1)
+ };
+ (mean * factor).round().max(1.0) as usize
+ }
+ }
+}
+
+#[derive(Clone, Copy)]
+enum BenchMode {
+ ReadSelector,
+ ReadMask,
+}
+
+impl BenchMode {
+ fn label(self) -> &'static str {
+ match self {
+ BenchMode::ReadSelector => "read_selector",
+ BenchMode::ReadMask => "read_mask",
+ }
+ }
+
+ fn policy(self) -> RowSelectionPolicy {
+ match self {
+ BenchMode::ReadSelector => RowSelectionPolicy::Selectors,
+ BenchMode::ReadMask => RowSelectionPolicy::Mask,
+ }
+ }
+}
+
+struct SelectorStats {
+ average_selector_len: f64,
+ select_ratio: f64,
+}
+
+impl SelectorStats {
+ fn new(selectors: &[RowSelector]) -> Self {
+ if selectors.is_empty() {
+ return Self {
+ average_selector_len: 0.0,
+ select_ratio: 0.0,
+ };
+ }
+
+ let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+ let selected_rows: usize = selectors
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ Self {
+ average_selector_len: total_rows as f64 / selectors.len() as f64,
+ select_ratio: if total_rows == 0 {
+ 0.0
+ } else {
+ selected_rows as f64 / total_rows as f64
+ },
+ }
+ }
+}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 6122b861f4..94a3cf9e34 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -17,12 +17,12 @@
//! Contains reader which reads parquet data into arrow [`RecordBatch`]
-use arrow_array::Array;
use arrow_array::cast::AsArray;
-use arrow_array::{RecordBatch, RecordBatchReader};
+use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
+use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
-pub use selection::{RowSelection, RowSelector};
+pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
@@ -45,12 +45,13 @@ use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+// Exposed so integration tests and benchmarks can temporarily override the threshold.
pub use read_plan::{ReadPlan, ReadPlanBuilder};
mod filter;
pub mod metrics;
mod read_plan;
-mod selection;
+pub(crate) mod selection;
pub mod statistics;
/// Builder for constructing Parquet readers that decode into [Apache Arrow]
@@ -126,6 +127,8 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) selection: Option<RowSelection>,
+ pub(crate) row_selection_policy: RowSelectionPolicy,
+
pub(crate) limit: Option<usize>,
pub(crate) offset: Option<usize>,
@@ -147,6 +150,7 @@ impl<T: Debug> Debug for ArrowReaderBuilder<T> {
.field("projection", &self.projection)
.field("filter", &self.filter)
.field("selection", &self.selection)
+ .field("row_selection_policy", &self.row_selection_policy)
.field("limit", &self.limit)
.field("offset", &self.offset)
.field("metrics", &self.metrics)
@@ -166,6 +170,7 @@ impl<T> ArrowReaderBuilder<T> {
projection: ProjectionMask::all(),
filter: None,
selection: None,
+ row_selection_policy: RowSelectionPolicy::default(),
limit: None,
offset: None,
metrics: ArrowReaderMetrics::Disabled,
@@ -214,6 +219,16 @@ impl<T> ArrowReaderBuilder<T> {
}
}
+ /// Configure how row selections should be materialised during execution
+ ///
+ /// See [`RowSelectionPolicy`] for more details
+    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
+ Self {
+ row_selection_policy: policy,
+ ..self
+ }
+ }
+
/// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
/// data into memory.
///
@@ -907,11 +922,12 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
metadata,
schema: _,
fields,
- batch_size: _,
+ batch_size,
row_groups,
projection,
mut filter,
selection,
+ row_selection_policy,
limit,
offset,
metrics,
@@ -920,9 +936,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
} = self;
// Try to avoid allocate large buffer
- let batch_size = self
- .batch_size
- .min(metadata.file_metadata().num_rows() as usize);
+        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);
        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
@@ -932,7 +946,9 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
row_groups,
};
-        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
+ let mut plan_builder = ReadPlanBuilder::new(batch_size)
+ .with_selection(selection)
+ .with_row_selection_policy(row_selection_policy);
// Update selection based on any filters
if let Some(filter) = filter.as_mut() {
@@ -1083,10 +1099,83 @@ impl ParquetRecordBatchReader {
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
let mut read_records = 0;
let batch_size = self.batch_size();
- match self.read_plan.selection_mut() {
- Some(selection) => {
- while read_records < batch_size && !selection.is_empty() {
- let front = selection.pop_front().unwrap();
+ if batch_size == 0 {
+ return Ok(None);
+ }
+ match self.read_plan.row_selection_cursor_mut() {
+ RowSelectionCursor::Mask(mask_cursor) => {
+                // Stream the record batch reader using contiguous segments of the selection
+                // mask, avoiding the need to materialize intermediate `RowSelector` ranges.
+ while !mask_cursor.is_empty() {
+                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
+ return Ok(None);
+ };
+
+ if mask_chunk.initial_skip > 0 {
+                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
+ if skipped != mask_chunk.initial_skip {
+ return Err(general_err!(
+ "failed to skip rows, expected {}, got {}",
+ mask_chunk.initial_skip,
+ skipped
+ ));
+ }
+ }
+
+ if mask_chunk.chunk_rows == 0 {
+                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
+ return Ok(None);
+ }
+ continue;
+ }
+
+ let mask = mask_cursor.mask_values_for(&mask_chunk)?;
+
+                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
+ if read == 0 {
+ return Err(general_err!(
+ "reached end of column while expecting {} rows",
+ mask_chunk.chunk_rows
+ ));
+ }
+ if read != mask_chunk.chunk_rows {
+ return Err(general_err!(
+                            "insufficient rows read from array reader - expected {}, got {}",
+ mask_chunk.chunk_rows,
+ read
+ ));
+ }
+
+ let array = self.array_reader.consume_batch()?;
+                    // The column reader exposes the projection as a struct array; convert this
+                    // into a record batch before applying the boolean filter mask.
+ let struct_array = array.as_struct_opt().ok_or_else(|| {
+ ArrowError::ParquetError(
+                            "Struct array reader should return struct array".to_string(),
+ )
+ })?;
+
+ let filtered_batch =
+                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;
+
+ if filtered_batch.num_rows() != mask_chunk.selected_rows {
+ return Err(general_err!(
+                            "filtered rows mismatch selection - expected {}, got {}",
+ mask_chunk.selected_rows,
+ filtered_batch.num_rows()
+ ));
+ }
+
+ if filtered_batch.num_rows() == 0 {
+ continue;
+ }
+
+ return Ok(Some(filtered_batch));
+ }
+ }
+ RowSelectionCursor::Selectors(selectors_cursor) => {
+                while read_records < batch_size && !selectors_cursor.is_empty() {
+ let front = selectors_cursor.next_selector();
if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;
@@ -1112,7 +1201,7 @@ impl ParquetRecordBatchReader {
Some(remaining) if remaining != 0 => {
// if page row count less than batch_size we must
set batch size to page row count.
// add check avoid dead loop
-                            selection.push_front(RowSelector::select(remaining));
+                            selectors_cursor.return_selector(RowSelector::select(remaining));
need_read
}
_ => front.row_count,
@@ -1123,7 +1212,7 @@ impl ParquetRecordBatchReader {
};
}
}
- None => {
+ RowSelectionCursor::All => {
self.array_reader.read_records(batch_size)?;
}
};
@@ -1219,6 +1308,27 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;
+ use crate::arrow::arrow_reader::{
+        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
+        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelectionPolicy, RowSelector,
+ };
+ use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
+ use crate::arrow::{ArrowWriter, ProjectionMask};
+    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
+ use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
+ use crate::data_type::{
+        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
+ FloatType, Int32Type, Int64Type, Int96, Int96Type,
+ };
+ use crate::errors::Result;
+ use crate::file::metadata::ParquetMetaData;
+    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
+ use crate::file::writer::SerializedFileWriter;
+ use crate::schema::parser::parse_message_type;
+ use crate::schema::types::{Type, TypePtr};
+ use crate::util::test_common::rand_gen::RandGen;
+ use arrow::compute::kernels::cmp::eq;
+ use arrow::compute::or;
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::{
@@ -1239,26 +1349,6 @@ mod tests {
use rand::{Rng, RngCore, rng};
use tempfile::tempfile;
- use crate::arrow::arrow_reader::{
-        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
- ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
- };
- use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
- use crate::arrow::{ArrowWriter, ProjectionMask};
-    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
- use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
- use crate::data_type::{
- BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
FixedLenByteArrayType,
- FloatType, Int32Type, Int64Type, Int96, Int96Type,
- };
- use crate::errors::Result;
- use crate::file::metadata::ParquetMetaData;
-    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
- use crate::file::writer::SerializedFileWriter;
- use crate::schema::parser::parse_message_type;
- use crate::schema::types::{Type, TypePtr};
- use crate::util::test_common::rand_gen::RandGen;
-
#[test]
fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
@@ -4654,6 +4744,93 @@ mod tests {
assert_eq!(out, batch.slice(2, 1));
}
+ #[test]
+ fn test_row_selection_interleaved_skip() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "v",
+ ArrowDataType::Int32,
+ false,
+ )]));
+
+ let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
+        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();
+
+ let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
+ writer.write(&batch)?;
+ writer.close()?;
+
+ let selection = RowSelection::from(vec![
+ RowSelector::select(1),
+ RowSelector::skip(2),
+ RowSelector::select(2),
+ ]);
+
+        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
+ .with_batch_size(4)
+ .with_row_selection(selection)
+ .build()?;
+
+ let out = reader.next().unwrap()?;
+ assert_eq!(out.num_rows(), 3);
+ let values = out
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>()
+ .values();
+ assert_eq!(values, &[0, 3, 4]);
+ assert!(reader.next().is_none());
+ Ok(())
+ }
+
+ #[test]
+ fn test_row_selection_mask_sparse_rows() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "v",
+ ArrowDataType::Int32,
+ false,
+ )]));
+
+ let values = Int32Array::from((0..30).collect::<Vec<i32>>());
+        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;
+
+ let mut buffer = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
+ writer.write(&batch)?;
+ writer.close()?;
+
+ let total_rows = batch.num_rows();
+ let ranges = (1..total_rows)
+ .step_by(2)
+ .map(|i| i..i + 1)
+ .collect::<Vec<_>>();
+        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);
+
+ let selectors: Vec<RowSelector> = selection.clone().into();
+ assert!(total_rows < selectors.len() * 8);
+
+ let bytes = Bytes::from(buffer);
+
+ let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
+ .with_batch_size(7)
+ .with_row_selection(selection)
+ .build()?;
+
+ let mut collected = Vec::new();
+ for batch in reader {
+ let batch = batch?;
+ collected.extend_from_slice(
+ batch
+ .column(0)
+ .as_primitive::<arrow_array::types::Int32Type>()
+ .values(),
+ );
+ }
+
+        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
+ assert_eq!(collected, expected);
+ Ok(())
+ }
+
fn test_decimal32_roundtrip() {
let d = |values: Vec<i32>, p: u8| {
let iter = values.into_iter();
@@ -5036,6 +5213,78 @@ mod tests {
c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
}
+ #[test]
+ fn test_row_filter_full_page_skip_is_handled() {
+ let first_value: i64 = 1111;
+ let last_value: i64 = 9999;
+ let num_rows: usize = 12;
+
+ // build data with row selection average length 4
+        // The result would be (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999)
+ // The Row Selection would be [1111, (skip 10), 9999]
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("key", arrow_schema::DataType::Int64, false),
+ Field::new("value", arrow_schema::DataType::Int64, false),
+ ]));
+
+ let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
+ int_values[0] = first_value;
+ int_values[num_rows - 1] = last_value;
+ let keys = Int64Array::from(int_values.clone());
+ let values = Int64Array::from(int_values.clone());
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+ )
+ .unwrap();
+
+ let props = WriterProperties::builder()
+ .set_write_batch_size(2)
+ .set_data_page_row_count_limit(2)
+ .build();
+
+ let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+ let data = Bytes::from(buffer);
+
+ let options = ArrowReaderOptions::new().with_page_index(true);
+        let builder =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
+ let schema = builder.parquet_schema().clone();
+ let filter_mask = ProjectionMask::leaves(&schema, [0]);
+
+ let make_predicate = |mask: ProjectionMask| {
+ ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
+ let column = batch.column(0);
+                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
+                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
+ or(&match_first, &match_second)
+ })
+ };
+
+ let options = ArrowReaderOptions::new().with_page_index(true);
+ let predicate = make_predicate(filter_mask.clone());
+
+        // The batch size is set to 12 to read all rows in one go after filtering.
+        // If the reader chose a mask to handle the filter, it could panic because the middle 4 pages may never be decoded.
+        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
+            .unwrap()
+            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
+            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
+ .with_batch_size(12)
+ .build()
+ .unwrap();
+
+        // Predicate pruning used to panic once mask-backed plans removed whole pages.
+        // Collecting into batches validates the plan now downgrades to selectors instead.
+ let schema = reader.schema().clone();
+ let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+ let result = concat_batches(&schema, &batches).unwrap();
+ assert_eq!(result.num_rows(), 2);
+ }
+
#[test]
fn test_get_row_group_column_bloom_filter_with_length() {
// convert to new parquet file with bloom_filter_length
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs
index 2210f47df2..3c17a358f0 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -19,8 +19,10 @@
//! from a Parquet file
use crate::arrow::array_reader::ArrayReader;
+use crate::arrow::arrow_reader::selection::RowSelectionPolicy;
+use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
- ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
+    ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector,
};
use crate::errors::{ParquetError, Result};
use arrow_array::Array;
@@ -31,8 +33,10 @@ use std::collections::VecDeque;
#[derive(Clone, Debug)]
pub struct ReadPlanBuilder {
batch_size: usize,
- /// Current to apply, includes all filters
+ /// Which rows to select. Includes the result of all filters applied so far
selection: Option<RowSelection>,
+ /// Policy to use when materializing the row selection
+ row_selection_policy: RowSelectionPolicy,
}
impl ReadPlanBuilder {
@@ -41,6 +45,7 @@ impl ReadPlanBuilder {
Self {
batch_size,
selection: None,
+ row_selection_policy: RowSelectionPolicy::default(),
}
}
@@ -50,6 +55,19 @@ impl ReadPlanBuilder {
self
}
+ /// Configure the policy to use when materialising the [`RowSelection`]
+ ///
+ /// Defaults to [`RowSelectionPolicy::Auto`]
+    pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
+ self.row_selection_policy = policy;
+ self
+ }
+
+ /// Returns the current row selection policy
+ pub fn row_selection_policy(&self) -> &RowSelectionPolicy {
+ &self.row_selection_policy
+ }
+
/// Returns the current selection, if any
pub fn selection(&self) -> Option<&RowSelection> {
self.selection.as_ref()
@@ -79,6 +97,40 @@ impl ReadPlanBuilder {
self.selection.as_ref().map(|s| s.row_count())
}
+ /// Returns the [`RowSelectionStrategy`] for this plan.
+ ///
+ /// Guarantees to return either `Selectors` or `Mask`, never `Auto`.
+ pub(crate) fn resolve_selection_strategy(&self) -> RowSelectionStrategy {
+ match self.row_selection_policy {
+ RowSelectionPolicy::Selectors => RowSelectionStrategy::Selectors,
+ RowSelectionPolicy::Mask => RowSelectionStrategy::Mask,
+ RowSelectionPolicy::Auto { threshold, .. } => {
+ let selection = match self.selection.as_ref() {
+ Some(selection) => selection,
+ None => return RowSelectionStrategy::Selectors,
+ };
+
+ let trimmed = selection.clone().trim();
+ let selectors: Vec<RowSelector> = trimmed.into();
+ if selectors.is_empty() {
+ return RowSelectionStrategy::Mask;
+ }
+
+                let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+ let selector_count = selectors.len();
+ if selector_count == 0 {
+ return RowSelectionStrategy::Mask;
+ }
+
+ if total_rows < selector_count.saturating_mul(threshold) {
+ RowSelectionStrategy::Mask
+ } else {
+ RowSelectionStrategy::Selectors
+ }
+ }
+ }
+ }
+
/// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
///
/// If the current `selection` is `Some`, the resulting [`RowSelection`]
@@ -126,16 +178,34 @@ impl ReadPlanBuilder {
if !self.selects_any() {
self.selection = Some(RowSelection::from(vec![]));
}
+
+ // Preferred strategy must not be Auto
+ let selection_strategy = self.resolve_selection_strategy();
+
let Self {
batch_size,
selection,
+ row_selection_policy: _,
} = self;
- let selection = selection.map(|s| s.trim().into());
+ let selection = selection.map(|s| s.trim());
+
+ let row_selection_cursor = selection
+ .map(|s| {
+ let trimmed = s.trim();
+ let selectors: Vec<RowSelector> = trimmed.into();
+ match selection_strategy {
+ RowSelectionStrategy::Mask => {
+ RowSelectionCursor::new_mask_from_selectors(selectors)
+ }
+                RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors),
+ }
+ })
+ .unwrap_or(RowSelectionCursor::new_all());
ReadPlan {
batch_size,
- selection,
+ row_selection_cursor,
}
}
}
@@ -233,13 +303,23 @@ pub struct ReadPlan {
/// The number of rows to read in each batch
batch_size: usize,
/// Row ranges to be selected from the data source
- selection: Option<VecDeque<RowSelector>>,
+ row_selection_cursor: RowSelectionCursor,
}
impl ReadPlan {
- /// Returns a mutable reference to the selection, if any
+ /// Returns a mutable reference to the selection selectors, if any
+    #[deprecated(since = "57.1.0", note = "Use `row_selection_cursor_mut` instead")]
pub fn selection_mut(&mut self) -> Option<&mut VecDeque<RowSelector>> {
- self.selection.as_mut()
+        if let RowSelectionCursor::Selectors(selectors_cursor) = &mut self.row_selection_cursor {
+ Some(selectors_cursor.selectors_mut())
+ } else {
+ None
+ }
+ }
+
+ /// Returns a mutable reference to the row selection cursor
+ pub fn row_selection_cursor_mut(&mut self) -> &mut RowSelectionCursor {
+ &mut self.row_selection_cursor
}
/// Return the number of rows to read in each output batch
@@ -248,3 +328,33 @@ impl ReadPlan {
self.batch_size
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn builder_with_selection(selection: RowSelection) -> ReadPlanBuilder {
+ ReadPlanBuilder::new(1024).with_selection(Some(selection))
+ }
+
+ #[test]
+ fn preferred_selection_strategy_prefers_mask_by_default() {
+ let selection = RowSelection::from(vec![RowSelector::select(8)]);
+ let builder = builder_with_selection(selection);
+ assert_eq!(
+ builder.resolve_selection_strategy(),
+ RowSelectionStrategy::Mask
+ );
+ }
+
+ #[test]
+ fn preferred_selection_strategy_prefers_selectors_when_threshold_small() {
+ let selection = RowSelection::from(vec![RowSelector::select(8)]);
+ let builder = builder_with_selection(selection)
+            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 1 });
+ assert_eq!(
+ builder.resolve_selection_strategy(),
+ RowSelectionStrategy::Selectors
+ );
+ }
+}
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 1eb7c85d1d..2ddf812f9c 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -15,13 +15,50 @@
// specific language governing permissions and limitations
// under the License.
+use crate::arrow::ProjectionMask;
+use crate::errors::ParquetError;
+use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
use arrow_array::{Array, BooleanArray};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
use arrow_select::filter::SlicesIterator;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;
-use crate::file::page_index::offset_index::PageLocation;
+/// Policy for picking a strategy to materialise [`RowSelection`] during execution.
+///
+/// Note that this is a user-provided preference, and the actual strategy used
+/// may differ based on safety considerations (e.g. page skipping).
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum RowSelectionPolicy {
+ /// Use a queue of [`RowSelector`] values
+ Selectors,
+ /// Use a boolean mask to materialise the selection
+ Mask,
+    /// Choose between [`Self::Mask`] and [`Self::Selectors`] based on selector density
+ Auto {
+ /// Average selector length below which masks are preferred
+ threshold: usize,
+ },
+}
+
+impl Default for RowSelectionPolicy {
+ fn default() -> Self {
+ Self::Auto { threshold: 32 }
+ }
+}
+
+/// Fully resolved strategy for materializing [`RowSelection`] during execution.
+///
+/// This is determined from a combination of user preference (via [`RowSelectionPolicy`])
+/// and safety considerations (e.g. page skipping).
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub(crate) enum RowSelectionStrategy {
+ /// Use a queue of [`RowSelector`] values
+ Selectors,
+ /// Use a boolean mask to materialise the selection
+ Mask,
+}
/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
/// scanning a parquet file
@@ -213,6 +250,39 @@ impl RowSelection {
ranges
}
+    /// Returns true if this selection would skip any data pages within the provided columns
+ fn selection_skips_any_page(
+ &self,
+ projection: &ProjectionMask,
+ columns: &[OffsetIndexMetaData],
+ ) -> bool {
+ columns.iter().enumerate().any(|(leaf_idx, column)| {
+ if !projection.leaf_included(leaf_idx) {
+ return false;
+ }
+
+ let locations = column.page_locations();
+ if locations.is_empty() {
+ return false;
+ }
+
+ let ranges = self.scan_ranges(locations);
+ !ranges.is_empty() && ranges.len() < locations.len()
+ })
+ }
+
+    /// Returns true if selectors should be forced, preventing mask materialisation
+ pub(crate) fn should_force_selectors(
+ &self,
+ projection: &ProjectionMask,
+ offset_index: Option<&[OffsetIndexMetaData]>,
+ ) -> bool {
+ match offset_index {
+            Some(columns) => self.selection_skips_any_page(projection, columns),
+ None => false,
+ }
+ }
+
/// Splits off the first `row_count` from this [`RowSelection`]
pub fn split_off(&mut self, row_count: usize) -> Self {
let mut total_count = 0;
@@ -691,6 +761,177 @@ fn union_row_selections(left: &[RowSelector], right: &[RowSelector]) -> RowSelec
iter.collect()
}
+/// Cursor for iterating a mask-backed [`RowSelection`]
+///
+/// This is best for dense selections where there are many small skips
+/// or selections. For example, selecting every other row.
+#[derive(Debug)]
+pub struct MaskCursor {
+ mask: BooleanBuffer,
+ /// Current absolute offset into the selection
+ position: usize,
+}
+
+impl MaskCursor {
+ /// Returns `true` when no further rows remain
+ pub fn is_empty(&self) -> bool {
+ self.position >= self.mask.len()
+ }
+
+    /// Advance through the mask representation, producing the next chunk summary
+ pub fn next_mask_chunk(&mut self, batch_size: usize) -> Option<MaskChunk> {
+        let (initial_skip, chunk_rows, selected_rows, mask_start, end_position) = {
+ let mask = &self.mask;
+
+ if self.position >= mask.len() {
+ return None;
+ }
+
+ let start_position = self.position;
+ let mut cursor = start_position;
+ let mut initial_skip = 0;
+
+ while cursor < mask.len() && !mask.value(cursor) {
+ initial_skip += 1;
+ cursor += 1;
+ }
+
+ let mask_start = cursor;
+ let mut chunk_rows = 0;
+ let mut selected_rows = 0;
+
+            // Advance until enough rows have been selected to satisfy the batch size,
+            // or until the mask is exhausted. This mirrors the behaviour of the legacy
+ // `RowSelector` queue-based iteration.
+ while cursor < mask.len() && selected_rows < batch_size {
+ chunk_rows += 1;
+ if mask.value(cursor) {
+ selected_rows += 1;
+ }
+ cursor += 1;
+ }
+
+ (initial_skip, chunk_rows, selected_rows, mask_start, cursor)
+ };
+
+ self.position = end_position;
+
+ Some(MaskChunk {
+ initial_skip,
+ chunk_rows,
+ selected_rows,
+ mask_start,
+ })
+ }
+
+ /// Materialise the boolean values for a mask-backed chunk
+    pub fn mask_values_for(&self, chunk: &MaskChunk) -> Result<BooleanArray, ParquetError> {
+        if chunk.mask_start.saturating_add(chunk.chunk_rows) > self.mask.len() {
+ return Err(ParquetError::General(
+ "Internal Error: MaskChunk exceeds mask length".to_string(),
+ ));
+ }
+ Ok(BooleanArray::from(
+ self.mask.slice(chunk.mask_start, chunk.chunk_rows),
+ ))
+ }
+}
+
+/// Cursor for iterating a selector-backed [`RowSelection`]
+///
+/// This is best for sparse selections where large contiguous
+/// blocks of rows are selected or skipped.
+#[derive(Debug)]
+pub struct SelectorsCursor {
+ selectors: VecDeque<RowSelector>,
+ /// Current absolute offset into the selection
+ position: usize,
+}
+
+impl SelectorsCursor {
+ /// Returns `true` when no further rows remain
+ pub fn is_empty(&self) -> bool {
+ self.selectors.is_empty()
+ }
+
+ pub(crate) fn selectors_mut(&mut self) -> &mut VecDeque<RowSelector> {
+ &mut self.selectors
+ }
+
+ /// Return the next [`RowSelector`]
+ pub(crate) fn next_selector(&mut self) -> RowSelector {
+ let selector = self.selectors.pop_front().unwrap();
+ self.position += selector.row_count;
+ selector
+ }
+
+ /// Return a selector to the front, rewinding the position
+ pub(crate) fn return_selector(&mut self, selector: RowSelector) {
+ self.position = self.position.saturating_sub(selector.row_count);
+ self.selectors.push_front(selector);
+ }
+}
+
+/// Result of computing the next chunk to read when using a [`MaskCursor`]
+#[derive(Debug)]
+pub struct MaskChunk {
+ /// Number of leading rows to skip before reaching selected rows
+ pub initial_skip: usize,
+ /// Total rows covered by this chunk (selected + skipped)
+ pub chunk_rows: usize,
+ /// Rows actually selected within the chunk
+ pub selected_rows: usize,
+ /// Starting offset within the mask where the chunk begins
+ pub mask_start: usize,
+}
+
+/// Cursor for iterating a [`RowSelection`] during execution within a
+/// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan).
+///
+/// This keeps per-reader state such as the current position and delegates the
+/// actual storage strategy to the internal `RowSelectionBacking`.
+#[derive(Debug)]
+pub enum RowSelectionCursor {
+ /// Reading all rows
+ All,
+ /// Use a bitmask to back the selection (dense selections)
+ Mask(MaskCursor),
+ /// Use a queue of selectors to back the selection (sparse selections)
+ Selectors(SelectorsCursor),
+}
+
+impl RowSelectionCursor {
+ /// Create a [`MaskCursor`] cursor backed by a bitmask, from an existing
set of selectors
+ pub(crate) fn new_mask_from_selectors(selectors: Vec<RowSelector>) -> Self
{
+ Self::Mask(MaskCursor {
+ mask: boolean_mask_from_selectors(&selectors),
+ position: 0,
+ })
+ }
+
+ /// Create a [`RowSelectionCursor::Selectors`] from the provided selectors
+ pub(crate) fn new_selectors(selectors: Vec<RowSelector>) -> Self {
+ Self::Selectors(SelectorsCursor {
+ selectors: selectors.into(),
+ position: 0,
+ })
+ }
+
+ /// Create a cursor that selects all rows
+ pub(crate) fn new_all() -> Self {
+ Self::All
+ }
+}
+
+fn boolean_mask_from_selectors(selectors: &[RowSelector]) -> BooleanBuffer {
+ let total_rows: usize = selectors.iter().map(|s| s.row_count).sum();
+ let mut builder = BooleanBufferBuilder::new(total_rows);
+ for selector in selectors {
+ builder.append_n(selector.row_count, !selector.skip);
+ }
+ builder.finish()
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index becc698abe..07edf4be2a 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -489,6 +489,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
projection,
filter,
selection,
+ row_selection_policy: selection_strategy,
limit,
offset,
metrics,
@@ -510,6 +511,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
projection,
filter,
selection,
+ row_selection_policy: selection_strategy,
batch_size,
row_groups,
limit,
@@ -764,6 +766,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
+ use crate::arrow::arrow_reader::RowSelectionPolicy;
use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
};
@@ -772,15 +775,17 @@ mod tests {
use crate::file::metadata::ParquetMetaDataReader;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
+ use arrow::compute::or;
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{
-        Array, ArrayRef, Int8Array, Int32Array, RecordBatchReader, Scalar, StringArray,
+        Array, ArrayRef, Int8Array, Int32Array, Int64Array, RecordBatchReader, Scalar, StringArray,
StructArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
+ use arrow_select::concat::concat_batches;
use futures::{StreamExt, TryStreamExt};
use rand::{Rng, rng};
use std::collections::HashMap;
@@ -1206,6 +1211,82 @@ mod tests {
assert_eq!(actual_rows, expected_rows);
}
+ #[tokio::test]
+ async fn test_row_filter_full_page_skip_is_handled_async() {
+ let first_value: i64 = 1111;
+ let last_value: i64 = 9999;
+ let num_rows: usize = 12;
+
+ // build data with row selection average length 4
+        // The result would be (1111 XXXX) ... (4 pages in the middle) ... (XXXX 9999)
+ // The Row Selection would be [1111, (skip 10), 9999]
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("key", DataType::Int64, false),
+ Field::new("value", DataType::Int64, false),
+ ]));
+
+ let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
+ int_values[0] = first_value;
+ int_values[num_rows - 1] = last_value;
+ let keys = Int64Array::from(int_values.clone());
+ let values = Int64Array::from(int_values.clone());
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
+ )
+ .unwrap();
+
+ let props = WriterProperties::builder()
+ .set_write_batch_size(2)
+ .set_data_page_row_count_limit(2)
+ .build();
+
+ let mut buffer = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+ let data = Bytes::from(buffer);
+
+ let builder = ParquetRecordBatchStreamBuilder::new_with_options(
+ TestReader::new(data.clone()),
+ ArrowReaderOptions::new().with_page_index(true),
+ )
+ .await
+ .unwrap();
+ let schema = builder.parquet_schema().clone();
+ let filter_mask = ProjectionMask::leaves(&schema, [0]);
+
+ let make_predicate = |mask: ProjectionMask| {
+ ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
+ let column = batch.column(0);
+                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
+                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
+ or(&match_first, &match_second)
+ })
+ };
+
+ let predicate = make_predicate(filter_mask.clone());
+
+        // The batch size is set to 12 to read all rows in one go after filtering.
+        // If the reader chose a mask to handle the filter, it could panic because the middle 4 pages may never be decoded.
+ let stream = ParquetRecordBatchStreamBuilder::new_with_options(
+ TestReader::new(data.clone()),
+ ArrowReaderOptions::new().with_page_index(true),
+ )
+ .await
+ .unwrap()
+ .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
+ .with_batch_size(12)
+ .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
+ .build()
+ .unwrap();
+
+ let schema = stream.schema().clone();
+ let batches: Vec<_> = stream.try_collect().await.unwrap();
+ let result = concat_batches(&schema, &batches).unwrap();
+ assert_eq!(result.num_rows(), 2);
+ }
+
#[tokio::test]
async fn test_row_filter() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs
index 34e46cd34e..7969f6f323 100644
--- a/parquet/src/arrow/in_memory_row_group.rs
+++ b/parquet/src/arrow/in_memory_row_group.rs
@@ -258,7 +258,9 @@ impl ColumnChunkData {
.map(|idx| data[idx].1.clone())
.map_err(|_| {
ParquetError::General(format!(
- "Invalid offset in sparse column chunk data: {start}"
+                        "Invalid offset in sparse column chunk data: {start}, no matching page found.\
+                        If you are using a `SelectionStrategyPolicy::Mask`, ensure that the OffsetIndex is provided when \
+ creating the InMemoryRowGroup."
))
}),
ColumnChunkData::Dense { offset, data } => {
diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 3e0b051c55..50451aee12 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -174,6 +174,7 @@ impl ParquetPushDecoderBuilder {
limit,
offset,
metrics,
+ row_selection_policy,
max_predicate_cache_size,
} = self;
@@ -195,6 +196,7 @@ impl ParquetPushDecoderBuilder {
metrics,
max_predicate_cache_size,
buffers,
+ row_selection_policy,
);
// Initialize the decoder with the configured options
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index a0ced8aa85..312b8af845 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -22,8 +22,9 @@ use crate::DecodeResult;
use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::selection::RowSelectionStrategy;
use crate::arrow::arrow_reader::{
- ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+    ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection, RowSelectionPolicy,
};
use crate::arrow::in_memory_row_group::ColumnChunkData;
use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
@@ -31,6 +32,7 @@ use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
use crate::arrow::schema::ParquetField;
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::util::push_buffers::PushBuffers;
use bytes::Bytes;
use data::DataRequest;
@@ -155,6 +157,9 @@ pub(crate) struct RowGroupReaderBuilder {
/// The metrics collector
metrics: ArrowReaderMetrics,
+ /// Strategy for materialising row selections
+ row_selection_policy: RowSelectionPolicy,
+
/// Current state of the decoder.
///
/// It is taken when processing, and must be put back before returning
@@ -179,6 +184,7 @@ impl RowGroupReaderBuilder {
metrics: ArrowReaderMetrics,
max_predicate_cache_size: usize,
buffers: PushBuffers,
+ row_selection_policy: RowSelectionPolicy,
) -> Self {
Self {
batch_size,
@@ -190,6 +196,7 @@ impl RowGroupReaderBuilder {
offset,
metrics,
max_predicate_cache_size,
+ row_selection_policy,
state: Some(RowGroupDecoderState::Finished),
buffers,
}
@@ -233,7 +240,9 @@ impl RowGroupReaderBuilder {
"Internal Error: next_row_group called while still reading a
row group. Expected Finished state, got {state:?}"
)));
}
-        let plan_builder = ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+ let plan_builder = ReadPlanBuilder::new(self.batch_size)
+ .with_selection(selection)
+ .with_row_selection_policy(self.row_selection_policy);
let row_group_info = RowGroupInfo {
row_group_idx,
@@ -484,7 +493,7 @@ impl RowGroupReaderBuilder {
}
// Apply any limit and offset
- let plan_builder = plan_builder
+ let mut plan_builder = plan_builder
.limited(row_count)
.with_offset(self.offset)
.with_limit(self.limit)
@@ -524,6 +533,14 @@ impl RowGroupReaderBuilder {
// so don't call with_cache_projection here
.build();
+        plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
+
+ plan_builder = override_selector_strategy_if_needed(
+ plan_builder,
+ &self.projection,
+ self.row_group_offset_index(row_group_idx),
+ );
+
let row_group_info = RowGroupInfo {
row_group_idx,
row_count,
@@ -650,14 +667,75 @@ impl RowGroupReaderBuilder {
Some(ProjectionMask::leaves(schema, included_leaves))
}
}
+
+ /// Get the offset index for the specified row group, if any
+    fn row_group_offset_index(&self, row_group_idx: usize) -> Option<&[OffsetIndexMetaData]> {
+ self.metadata
+ .offset_index()
+ .filter(|index| !index.is_empty())
+ .and_then(|index| index.get(row_group_idx))
+ .map(|columns| columns.as_slice())
+ }
+}
+
+/// Override the selection strategy if needed.
+///
+/// Some pages can be skipped during row-group construction if they are not read
+/// by the selections. This means that the data pages for those rows are never
+/// loaded and definition/repetition levels are never read. When using
+/// `RowSelections` selection works because `skip_records()` handles this
+/// case and skips the page accordingly.
+///
+/// However, with the current mask design, all values must be read and decoded
+/// and then a mask filter is applied. Thus if any pages are skipped during
+/// row-group construction, the data pages are missing and cannot be decoded.
+///
+/// A simple example:
+/// * the page size is 2, the mask is 100001, row selection should be read(1) skip(4) read(1)
+/// * the `ColumnChunkData` would be page1(10), page2(skipped), page3(01)
+///
+/// Using the row selection to skip(4), page2 won't be read at all, so in this
+/// case we can't decode all the rows and apply a mask. To correctly apply the
+/// bit mask, we need all 6 values to be read, but page2 is not in memory.
+fn override_selector_strategy_if_needed(
+ plan_builder: ReadPlanBuilder,
+ projection_mask: &ProjectionMask,
+ offset_index: Option<&[OffsetIndexMetaData]>,
+) -> ReadPlanBuilder {
+    // The override only applies to the Auto policy; if the policy is already Mask or Selectors, respect that
+    let RowSelectionPolicy::Auto { .. } = plan_builder.row_selection_policy() else {
+ return plan_builder;
+ };
+
+ let preferred_strategy = plan_builder.resolve_selection_strategy();
+
+    let force_selectors = matches!(preferred_strategy, RowSelectionStrategy::Mask)
+ && plan_builder.selection().is_some_and(|selection| {
+ selection.should_force_selectors(projection_mask, offset_index)
+ });
+
+ let resolved_strategy = if force_selectors {
+ RowSelectionStrategy::Selectors
+ } else {
+ preferred_strategy
+ };
+
+ // override the plan builder strategy with the resolved one
+ let new_policy = match resolved_strategy {
+ RowSelectionStrategy::Mask => RowSelectionPolicy::Mask,
+ RowSelectionStrategy::Selectors => RowSelectionPolicy::Selectors,
+ };
+
+ plan_builder.with_row_selection_policy(new_policy)
}
#[cfg(test)]
mod tests {
use super::*;
+
#[test]
// Verify that the size of RowGroupDecoderState does not grow too large
fn test_structure_size() {
- assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 184);
+ assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200);
}
}