hhhizzz commented on code in PR #10135:
URL: https://github.com/apache/arrow-rs/pull/10135#discussion_r3465770411


##########
parquet/benches/arrow_reader_materialization_policy.rs:
##########
@@ -0,0 +1,1648 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Focused benchmark for Parquet reader materialization policy decisions.
+//!
+//! # Background:
+//!
+//! As described in [Efficient Filter Pushdown in Parquet], evaluating
+//! pushdown filters is a two-step process:
+//!
+//! 1. Build a filter mask by decoding and evaluating filter functions on
+//!    the filter column(s).
+//!
+//! 2. Decode the rows that match the filter mask from the projected columns.
+//!
+//! The performance depends on factors such as the number of rows selected,
+//! the clustering of results (which affects the efficiency of the filter 
mask),
+//! and whether the same column is used for both filtering and projection.
+//!
+//! This benchmark isolates the reader policy choice between full 
post-filtering
+//! and row-filter pushdown with `Auto`, forced `Selectors`, and forced `Mask`.
+//!
+//! [Efficient Filter Pushdown in Parquet]: 
https://datafusion.apache.org/blog/2025/03/21/parquet-pushdown/
+//!
+//! The benchmark creates an in-memory Parquet file with 500K rows and four 
root
+//! columns:
+//! - `int64`: random integers with an injected point-lookup value.
+//! - `float64`: random floating-point values used for sparse and dense 
filters.
+//! - `utf8View`: ClickBench-like string values with sparse sentinel values.
+//! - `ts`: timestamps that count up within repeating 10,000-row blocks,
+//!   used for clustered filters.
+//!
+//! The benchmark cases are organized by reader-level axes: selection density
+//! and clustering, predicate/output overlap, deferred output payload, 
predicate
+//! cost and order, count/filter-only outputs, and small-file behavior.
+//!
+//! Full TPC-DS runs can show query-level movement that does not reproduce in
+//! isolated reader probes. Keep these cases focused on stable reader-level
+//! risks: moderate projected predicates with cheap deferred output can favor
+//! post-filtering, while clustered selections, variable-width deferred output,
+//! complex OR predicates, and sparse scalar prefixes should not be swept into
+//! that shortcut without their own evidence.
+
+use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, 
TimestampMillisecondArray};
+use arrow::compute::kernels::cmp::{eq, gt, lt, lt_eq, neq};
+use arrow::compute::{and, or};
+use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use arrow::record_batch::RecordBatch;
+use arrow_array::StringViewArray;
+use arrow_array::builder::{ArrayBuilder, StringViewBuilder};
+use bytes::Bytes;
+use criterion::{
+    BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, 
measurement::WallTime,
+};
+use futures::future::BoxFuture;
+use futures::{FutureExt, StreamExt};
+use parquet::arrow::arrow_reader::{
+    ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelectionPolicy,
+};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, 
ProjectionMask};
+use parquet::basic::Compression;
+use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, 
ParquetMetaDataReader};
+use parquet::file::properties::WriterProperties;
+use rand::{Rng, SeedableRng, rngs::StdRng};
+use std::ops::Range;
+use std::sync::Arc;
+
+const COLUMN_NAMES: [&str; 4] = ["int64", "float64", "utf8View", "ts"];
+const UTF8_VIEW_MISSING_VALUE: &str = "__arrow_rs_missing__";
+
+/// Generates a random string. Has a 50% chance to generate a short string 
(3–11 characters)
+/// or a long string (13–20 characters).
+fn random_string(rng: &mut StdRng) -> String {
+    let charset = 
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+    let is_long = rng.random_bool(0.5);
+    let len = if is_long {
+        rng.random_range(13..21)
+    } else {
+        rng.random_range(3..12)
+    };
+    (0..len)
+        .map(|_| charset[rng.random_range(0..charset.len())] as char)
+        .collect()
+}
+
+/// Creates an int64 array of a given size with random integers in [0, 100).
+/// Then, it overwrites a single random index with 9999 to serve as the unique 
value for point lookup.
+fn create_int64_array(size: usize) -> ArrayRef {
+    let mut rng = StdRng::seed_from_u64(42);
+    let mut values: Vec<i64> = (0..size).map(|_| 
rng.random_range(0..100)).collect();
+    let unique_index = rng.random_range(0..size);
+    values[unique_index] = 9999; // Unique value for point lookup
+    Arc::new(Int64Array::from(values)) as ArrayRef
+}
+
+/// Creates a float64 array of a given size with random floats in [0.0, 100.0).
+fn create_float64_array(size: usize) -> ArrayRef {
+    let mut rng = StdRng::seed_from_u64(43);
+    let values: Vec<f64> = (0..size).map(|_| 
rng.random_range(0.0..100.0)).collect();
+    Arc::new(Float64Array::from(values)) as ArrayRef
+}
+
+fn append_utf8_view_value(builder: &mut StringViewBuilder, value: &str) {
+    if builder.len() % 1_000 == 0 {
+        builder.append_value(UTF8_VIEW_MISSING_VALUE);
+    } else {
+        builder.append_value(value);
+    }
+}
+
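+/// Creates a utf8View array of a given size from short runs of empty or
+/// random strings; every 1,000th row (starting at row 0) instead holds
+/// `UTF8_VIEW_MISSING_VALUE`.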
+///
+/// This is modeled after the "SearchPhrase" column in the ClickBench 
benchmark.
+///
+/// See <https://github.com/apache/arrow-rs/issues/7460> for calculations.
+///
+/// The important ClickBench data properties are:
+/// * Selectivity is: 13172392 / 99997497 = 0.132
+/// * Number of RowSelections = 14054784
+/// * Average run length of each RowSelection: 99997497 / 14054784 = 7.114
+///
+/// A 100K-row reference generated by this shape has:
+/// * Selectivity is: 15144 / 100000 = 0.15144
+/// * Number of RowSelections = 12904
+/// * Average run length of each RowSelection: 100000 / 12904 = 7.75
+fn create_utf8_view_array(size: usize) -> ArrayRef {
+    const AVG_RUN_LENGTH: usize = 4; // exclusive upper bound on run length: 
each run of empty/non-empty strings is 1 to 3 rows long
+    const EMPTY_DENSITY: u32 = 85; // percent chance that each run is an empty 
string
+
+    let mut builder = StringViewBuilder::with_capacity(size);
+    let mut rng = StdRng::seed_from_u64(44);
+    while builder.len() < size {
+        let mut run_length = rng.random_range(1..AVG_RUN_LENGTH);
+        if builder.len() + run_length > size {
+            // cap to size rows
+            run_length = size - builder.len();
+        }
+
+        let choice = rng.random_range(0..100);
+        if choice < EMPTY_DENSITY {
+            for _ in 0..run_length {
+                append_utf8_view_value(&mut builder, "");
+            }
+        } else {
+            for _ in 0..run_length {
+                append_utf8_view_value(&mut builder, &random_string(&mut rng));
+            }
+        }
+    }
+    Arc::new(builder.finish()) as ArrayRef
+}
+
+/// Creates a ts (timestamp) array of a given size. Each value is computed as 
i % 10_000,
+/// which simulates repeating blocks (each block of 10,000) to model clustered 
patterns.
+fn create_ts_array(size: usize) -> ArrayRef {
+    let values: Vec<i64> = (0..size).map(|i| (i % 10_000) as i64).collect();
+    Arc::new(TimestampMillisecondArray::from(values)) as ArrayRef
+}
+
+/// Creates a RecordBatch with `size` rows and 4 columns: int64, float64,
+/// utf8View, and ts.
+pub(crate) fn create_record_batch(size: usize) -> RecordBatch {
+    let fields = vec![
+        Field::new("int64", DataType::Int64, false),
+        Field::new("float64", DataType::Float64, false),
+        Field::new("utf8View", DataType::Utf8View, true),
+        Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            false,
+        ),
+    ];
+    let schema = Arc::new(Schema::new(fields));
+
+    let int64_array = create_int64_array(size);
+    let float64_array = create_float64_array(size);
+    let utf8_array = create_utf8_view_array(size);
+    let ts_array = create_ts_array(size);
+
+    let arrays: Vec<ArrayRef> = vec![int64_array, float64_array, utf8_array, 
ts_array];
+    RecordBatch::try_new(schema, arrays).unwrap()
+}
+
+/// Total number of rows.
+const TOTAL_ROWS: usize = 500_000;
+
+/// Maximum rows per row group.
+const ROW_GROUP_SIZE: usize = 100_000;
+
+/// Writes the default benchmark file (`TOTAL_ROWS` rows, `ROW_GROUP_SIZE` rows
+/// per row group) to an in-memory buffer, returning the buffer.
+fn write_parquet_file() -> Vec<u8> {
+    write_parquet_file_with_rows(TOTAL_ROWS, ROW_GROUP_SIZE)
+}
+
+/// Writes a RecordBatch with a configurable shape to an in-memory buffer,
+/// returning the buffer.
+fn write_parquet_file_with_rows(total_rows: usize, row_group_size: usize) -> 
Vec<u8> {
+    let batch = create_record_batch(total_rows);
+    write_record_batch_to_parquet(&batch, row_group_size)
+}
+
+fn write_record_batch_to_parquet(batch: &RecordBatch, row_group_size: usize) 
-> Vec<u8> {
+    let schema = batch.schema();
+    let props = WriterProperties::builder()
+        .set_compression(Compression::SNAPPY)
+        .set_max_row_group_row_count(Some(row_group_size))
+        .build();
+    let mut buffer = vec![];
+    {
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), 
Some(props)).unwrap();
+        writer.write(batch).unwrap();
+        writer.close().unwrap();
+    }
+    buffer
+}
+
+/// ProjectionCase defines the projection mode for the benchmark, for example
+/// projecting all columns, excluding the column that is used for filtering,
+/// or projecting only a specific subset of columns.
+#[derive(Clone, Copy)]
+enum ProjectionCase {
+    AllColumns,
+    ExcludeFilterColumn,
+    FilterColumnsOnly,
+    CountOnly,
+    FixedColumns,
+    Float64AndTs,
+    Float64Only,
+    Int64AndFloat64,
+    Int64AndUtf8,
+    TsAndUtf8,
+    Utf8Only,
+}
+
+impl std::fmt::Display for ProjectionCase {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ProjectionCase::AllColumns => write!(f, "all_columns"),
+            ProjectionCase::ExcludeFilterColumn => write!(f, 
"exclude_filter_column"),
+            ProjectionCase::FilterColumnsOnly => write!(f, 
"filter_columns_only"),
+            ProjectionCase::CountOnly => write!(f, "count_only"),
+            ProjectionCase::FixedColumns => write!(f, "fixed_columns"),
+            ProjectionCase::Float64AndTs => write!(f, "float64_and_ts"),
+            ProjectionCase::Float64Only => write!(f, "float64_only"),
+            ProjectionCase::Int64AndFloat64 => write!(f, "int64_and_float64"),
+            ProjectionCase::Int64AndUtf8 => write!(f, "int64_and_utf8"),
+            ProjectionCase::TsAndUtf8 => write!(f, "ts_and_utf8"),
+            ProjectionCase::Utf8Only => write!(f, "utf8_only"),
+        }
+    }
+}
+
+#[derive(Clone, Copy)]
+enum AsyncStrategy {
+    FullPostFilter,
+    PushdownAuto,
+    PushdownSelectors,
+    PushdownMask,
+}
+
+impl std::fmt::Display for AsyncStrategy {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            AsyncStrategy::FullPostFilter => write!(f, "full_post_filter"),
+            AsyncStrategy::PushdownAuto => write!(f, "pushdown_auto"),
+            AsyncStrategy::PushdownSelectors => write!(f, 
"pushdown_selectors"),
+            AsyncStrategy::PushdownMask => write!(f, "pushdown_mask"),
+        }
+    }
+}
+
+/// FilterType encapsulates the different filter comparisons.
+/// The variants correspond to the different filter patterns.
+#[derive(Clone, Copy, Debug)]
+pub(crate) enum FilterType {

Review Comment:
   Done. I extracted the shared synthetic reader fixture into 
`parquet/benches/arrow_reader_common/mod.rs` and wired both 
`arrow_reader_row_filter.rs` and `arrow_reader_materialization_policy.rs` 
through it.
   
   I used a bench-local module directory instead of a flat 
`row_filter_fixture.rs` so the shared helpers stay scoped to these reader 
benches. I can rename it if you prefer the flatter file name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to