hhhizzz commented on code in PR #10135: URL: https://github.com/apache/arrow-rs/pull/10135#discussion_r3465770411
########## parquet/benches/arrow_reader_materialization_policy.rs: ########## @@ -0,0 +1,1648 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Focused benchmark for Parquet reader materialization policy decisions. +//! +//! # Background: +//! +//! As described in [Efficient Filter Pushdown in Parquet], evaluating +//! pushdown filters is a two-step process: +//! +//! 1. Build a filter mask by decoding and evaluating filter functions on +//! the filter column(s). +//! +//! 2. Decode the rows that match the filter mask from the projected columns. +//! +//! The performance depends on factors such as the number of rows selected, +//! the clustering of results (which affects the efficiency of the filter mask), +//! and whether the same column is used for both filtering and projection. +//! +//! This benchmark isolates the reader policy choice between full post-filtering +//! and row-filter pushdown with `Auto`, forced `Selectors`, and forced `Mask`. +//! +//! [Efficient Filter Pushdown in Parquet]: https://datafusion.apache.org/blog/2025/03/21/parquet-pushdown/ +//! +//! The benchmark creates an in-memory Parquet file with 500K rows and four root +//! columns: +//! - `int64`: random integers with an injected point-lookup value. +//! - `float64`: random floating-point values used for sparse and dense filters. +//! - `utf8View`: ClickBench-like string values with sparse sentinel values. +//! - `ts`: sequential timestamps used for clustered filters. +//! +//! The benchmark cases are organized by reader-level axes: selection density +//! and clustering, predicate/output overlap, deferred output payload, predicate +//! cost and order, count/filter-only outputs, and small-file behavior. +//! +//! Full TPC-DS runs can show query-level movement that does not reproduce in +//! isolated reader probes. Keep these cases focused on stable reader-level +//! risks: moderate projected predicates with cheap deferred output can favor +//! post-filtering, while clustered selections, variable-width deferred output, +//! complex OR predicates, and sparse scalar prefixes should not be swept into +//! that shortcut without their own evidence. + +use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, TimestampMillisecondArray}; +use arrow::compute::kernels::cmp::{eq, gt, lt, lt_eq, neq}; +use arrow::compute::{and, or}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use arrow::record_batch::RecordBatch; +use arrow_array::StringViewArray; +use arrow_array::builder::{ArrayBuilder, StringViewBuilder}; +use bytes::Bytes; +use criterion::{ + BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, measurement::WallTime, +}; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::{ + ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelectionPolicy, +}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::basic::Compression; +use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; +use parquet::file::properties::WriterProperties; +use rand::{Rng, SeedableRng, rngs::StdRng}; +use std::ops::Range; +use std::sync::Arc; + +const COLUMN_NAMES: [&str; 4] = ["int64", "float64", "utf8View", "ts"]; +const UTF8_VIEW_MISSING_VALUE: &str = "__arrow_rs_missing__"; + +/// Generates a random string. Has a 50% chance to generate a short string (3–11 characters) +/// or a long string (13–20 characters). +fn random_string(rng: &mut StdRng) -> String { + let charset = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + let is_long = rng.random_bool(0.5); + let len = if is_long { + rng.random_range(13..21) + } else { + rng.random_range(3..12) + }; + (0..len) + .map(|_| charset[rng.random_range(0..charset.len())] as char) + .collect() +} + +/// Creates an int64 array of a given size with random integers in [0, 100). +/// Then, it overwrites a single random index with 9999 to serve as the unique value for point lookup. +fn create_int64_array(size: usize) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(42); + let mut values: Vec<i64> = (0..size).map(|_| rng.random_range(0..100)).collect(); + let unique_index = rng.random_range(0..size); + values[unique_index] = 9999; // Unique value for point lookup + Arc::new(Int64Array::from(values)) as ArrayRef +} + +/// Creates a float64 array of a given size with random floats in [0.0, 100.0). +fn create_float64_array(size: usize) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(43); + let values: Vec<f64> = (0..size).map(|_| rng.random_range(0.0..100.0)).collect(); + Arc::new(Float64Array::from(values)) as ArrayRef +} + +fn append_utf8_view_value(builder: &mut StringViewBuilder, value: &str) { + if builder.len() % 1_000 == 0 { + builder.append_value(UTF8_VIEW_MISSING_VALUE); + } else { + builder.append_value(value); + } +} + +/// Creates a utf8View array of a given size with random strings. +/// +/// This is modeled after the "SearchPhrase" column in the ClickBench benchmark. +/// +/// See <https://github.com/apache/arrow-rs/issues/7460> for calculations. +/// +/// The important ClickBench data properties are: +/// * Selectivity is: 13172392 / 99997497 = 0.132 +/// * Number of RowSelections = 14054784 +/// * Average run length of each RowSelection: 99997497 / 14054784 = 7.114 +/// +/// A 100K-row reference generated by this shape has: +/// * Selectivity is: 15144 / 100000 = 0.15144 +/// * Number of RowSelections = 12904 +/// * Average run length of each RowSelection: 100000 / 12904 = 7.75 +fn create_utf8_view_array(size: usize) -> ArrayRef { + const AVG_RUN_LENGTH: usize = 4; // average number of empty/non-empty strings in a row + const EMPTY_DENSITY: u32 = 85; // percent chance that each run is an empty string + + let mut builder = StringViewBuilder::with_capacity(size); + let mut rng = StdRng::seed_from_u64(44); + while builder.len() < size { + let mut run_length = rng.random_range(1..AVG_RUN_LENGTH); + if builder.len() + run_length > size { + // cap to size rows + run_length = size - builder.len(); + } + + let choice = rng.random_range(0..100); + if choice < EMPTY_DENSITY { + for _ in 0..run_length { + append_utf8_view_value(&mut builder, ""); + } + } else { + for _ in 0..run_length { + append_utf8_view_value(&mut builder, &random_string(&mut rng)); + } + } + } + Arc::new(builder.finish()) as ArrayRef +} + +/// Creates a ts (timestamp) array of a given size. Each value is computed as i % 10_000, +/// which simulates repeating blocks (each block of 10,000) to model clustered patterns. +fn create_ts_array(size: usize) -> ArrayRef { + let values: Vec<i64> = (0..size).map(|i| (i % 10_000) as i64).collect(); + Arc::new(TimestampMillisecondArray::from(values)) as ArrayRef +} + +/// Creates a RecordBatch with `size` rows and 4 columns: int64, float64, +/// utf8View, and ts. +pub(crate) fn create_record_batch(size: usize) -> RecordBatch { + let fields = vec![ + Field::new("int64", DataType::Int64, false), + Field::new("float64", DataType::Float64, false), + Field::new("utf8View", DataType::Utf8View, true), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + ]; + let schema = Arc::new(Schema::new(fields)); + + let int64_array = create_int64_array(size); + let float64_array = create_float64_array(size); + let utf8_array = create_utf8_view_array(size); + let ts_array = create_ts_array(size); + + let arrays: Vec<ArrayRef> = vec![int64_array, float64_array, utf8_array, ts_array]; + RecordBatch::try_new(schema, arrays).unwrap() +} + +/// Total number of rows. +const TOTAL_ROWS: usize = 500_000; + +/// Maximum rows per row group. +const ROW_GROUP_SIZE: usize = 100_000; + +/// Writes the RecordBatch to an in memory buffer, returning the buffer +fn write_parquet_file() -> Vec<u8> { + write_parquet_file_with_rows(TOTAL_ROWS, ROW_GROUP_SIZE) +} + +/// Writes a RecordBatch with a configurable shape to an in memory buffer, +/// returning the buffer. +fn write_parquet_file_with_rows(total_rows: usize, row_group_size: usize) -> Vec<u8> { + let batch = create_record_batch(total_rows); + write_record_batch_to_parquet(&batch, row_group_size) +} + +fn write_record_batch_to_parquet(batch: &RecordBatch, row_group_size: usize) -> Vec<u8> { + let schema = batch.schema(); + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_row_count(Some(row_group_size)) + .build(); + let mut buffer = vec![]; + { + let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + } + buffer +} + +/// ProjectionCase defines the projection mode for the benchmark: +/// either projecting all columns or excluding the column that is used for filtering. +#[derive(Clone, Copy)] +enum ProjectionCase { + AllColumns, + ExcludeFilterColumn, + FilterColumnsOnly, + CountOnly, + FixedColumns, + Float64AndTs, + Float64Only, + Int64AndFloat64, + Int64AndUtf8, + TsAndUtf8, + Utf8Only, +} + +impl std::fmt::Display for ProjectionCase { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ProjectionCase::AllColumns => write!(f, "all_columns"), + ProjectionCase::ExcludeFilterColumn => write!(f, "exclude_filter_column"), + ProjectionCase::FilterColumnsOnly => write!(f, "filter_columns_only"), + ProjectionCase::CountOnly => write!(f, "count_only"), + ProjectionCase::FixedColumns => write!(f, "fixed_columns"), + ProjectionCase::Float64AndTs => write!(f, "float64_and_ts"), + ProjectionCase::Float64Only => write!(f, "float64_only"), + ProjectionCase::Int64AndFloat64 => write!(f, "int64_and_float64"), + ProjectionCase::Int64AndUtf8 => write!(f, "int64_and_utf8"), + ProjectionCase::TsAndUtf8 => write!(f, "ts_and_utf8"), + ProjectionCase::Utf8Only => write!(f, "utf8_only"), + } + } +} + +#[derive(Clone, Copy)] +enum AsyncStrategy { + FullPostFilter, + PushdownAuto, + PushdownSelectors, + PushdownMask, +} + +impl std::fmt::Display for AsyncStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AsyncStrategy::FullPostFilter => write!(f, "full_post_filter"), + AsyncStrategy::PushdownAuto => write!(f, "pushdown_auto"), + AsyncStrategy::PushdownSelectors => write!(f, "pushdown_selectors"), + AsyncStrategy::PushdownMask => write!(f, "pushdown_mask"), + } + } +} + +/// FilterType encapsulates the different filter comparisons. +/// The variants correspond to the different filter patterns. +#[derive(Clone, Copy, Debug)] +pub(crate) enum FilterType { Review Comment: Done. I extracted the shared synthetic reader fixture into `parquet/benches/arrow_reader_common/mod.rs` and wired both `arrow_reader_row_filter.rs` and `arrow_reader_materialization_policy.rs` through it. I used a bench-local module directory instead of a flat `row_filter_fixture.rs` so the shared helpers stay scoped to these reader benches. I can rename it if you prefer the flatter file name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
