This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new df63590310 [Minor] Use per-predicate projection masks in
arrow_reader_clickbench benchmark (#9413)
df63590310 is described below
commit df635903108418d95f7d0fc2101091684d8504fd
Author: Daniël Heres <[email protected]>
AuthorDate: Sun Feb 15 15:04:00 2026 +0100
[Minor] Use per-predicate projection masks in arrow_reader_clickbench
benchmark (#9413)
# Which issue does this PR close?
- Closes #NNN.
# Rationale for this change
As suggested by Claude: the benchmark currently uses a single projection
mask covering all filter columns for every predicate, which significantly
slows down queries that have multiple predicates.
This change brings the benchmark more in line with how consumers (e.g.
DataFusion) use the reader, so that we can benchmark improvements more
accurately.
The change yields a performance difference in a number of multi-filter queries:
```
group                                           clickbench-optimizations      main
arrow_reader_clickbench/async_object_store/Q22  1.00  151.8±6.46ms ? ?/sec    1.52  230.5±1.68ms ? ?/sec
arrow_reader_clickbench/async_object_store/Q36  1.00   26.3±0.24ms ? ?/sec    4.30  113.1±0.67ms ? ?/sec
arrow_reader_clickbench/async_object_store/Q37  1.00    9.3±0.06ms ? ?/sec    9.64   89.7±1.20ms ? ?/sec
arrow_reader_clickbench/async_object_store/Q38  1.00   22.4±0.26ms ? ?/sec    1.44   32.3±0.29ms ? ?/sec
arrow_reader_clickbench/async_object_store/Q39  1.00   38.1±0.66ms ? ?/sec    1.09   41.5±0.35ms ? ?/sec
arrow_reader_clickbench/async_object_store/Q40  1.00   13.0±0.15ms ? ?/sec    2.96   38.6±0.45ms ? ?/sec
arrow_reader_clickbench/async_object_store/Q41  1.00   10.1±0.11ms ? ?/sec    2.83   28.5±0.73ms ? ?/sec
arrow_reader_clickbench/async_object_store/Q42  1.00    5.6±0.05ms ? ?/sec    1.87   10.5±0.12ms ? ?/sec
```
# What changes are included in this PR?
# Are these changes tested?
# Are there any user-facing changes?
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
parquet/benches/arrow_reader_clickbench.rs | 97 +++++-------------------------
1 file changed, 16 insertions(+), 81 deletions(-)
diff --git a/parquet/benches/arrow_reader_clickbench.rs
b/parquet/benches/arrow_reader_clickbench.rs
index 8635a59557..5a6fb36d58 100644
--- a/parquet/benches/arrow_reader_clickbench.rs
+++ b/parquet/benches/arrow_reader_clickbench.rs
@@ -638,66 +638,6 @@ fn find_file_if_exists(mut current_dir: PathBuf,
file_name: &str) -> Option<Path
None
}
-/// Represents a mapping from each column selected in the `ProjectionMask`
-/// created from `filter_columns`, to the corresponding index in the list of
-/// `filter_columns`?
-///
-/// # Example
-///
-/// If:
-/// * the file schema has columns `[A, B, C]`
-/// * `filter_columns` is `[C, A]`
-/// * ==> `ProjectionMask` will be `[true, false, true]` = `[A, C]`
-///
-/// `FilterIndices` will be `[1, 0]`, because column `C` (index 0 in
-/// filter_columns) is selected at index 1 of the `ProjectionMask` and column
-/// `A` (index 1 in `filter_columns`) is selected at index 0 of the
-/// `ProjectionMask`.
-struct FilterIndices {
- /// * index is offset in Query::filter_columns
- /// * value is offset in column selected by filter ProjectionMask
- inner: Vec<usize>,
-}
-
-impl FilterIndices {
- /// Create a new `FilterIndices` from a list of column indices
- ///
- /// Parameters:
- /// * `schema_descriptor`: The schema of the file
- /// * `filter_schema_indices`: a list of column indices in the schema
- fn new(schema_descriptor: &SchemaDescriptor, filter_schema_indices:
Vec<usize>) -> Self {
- for &filter_index in &filter_schema_indices {
- assert!(filter_index < schema_descriptor.num_columns());
- }
- // When the columns are selected using a ProjectionMask, they are
- // returned in the order of the schema (not the order they were
specified)
- //
- // So if the original schema indices are 5, 1, 3 (select the sixth and
- // second and fourth column), the RecordBatch returned will select
them
- // in order 1, 3, 5,
- //
- // Thus we need a map to convert back to the original selection order
- // `[1, 2, 0]`
- let mut reordered: Vec<_> =
filter_schema_indices.iter().enumerate().collect();
- reordered.sort_by_key(|(_projection_idx, original_schema_idx)|
**original_schema_idx);
- let mut inner = vec![0; reordered.len()];
- for (output_idx, (projection_idx, _original_schema_idx)) in
- reordered.into_iter().enumerate()
- {
- inner[projection_idx] = output_idx;
- }
- Self { inner }
- }
-
- /// Given the index of a column in `filter_columns`, return the index of
the
- /// column in the columns selected from `ProjectionMask`
- fn map_column(&self, filter_columns_index: usize) -> usize {
- // The selection index is the index in the filter mask
- // The inner index is the index in the filter columns
- self.inner[filter_columns_index]
- }
-}
-
/// Encapsulates the test parameters for a single benchmark
struct ReadTest {
/// Human identifiable name
@@ -706,10 +646,8 @@ struct ReadTest {
arrow_reader_metadata: ArrowReaderMetadata,
/// Which columns in the file should be projected (decoded after filter)?
projection_mask: ProjectionMask,
- /// Which columns in the file should be passed to the filter?
- filter_mask: ProjectionMask,
- /// Mapping from column selected in filter mask to `Query::filter_columns`
- filter_indices: FilterIndices,
+ /// Schema indices for each filter column (in filter_columns order)
+ filter_schema_indices: Vec<usize>,
/// Predicates to apply
predicates: Vec<ClickBenchPredicate>,
/// How many rows are expected to pass the predicate?
@@ -744,16 +682,12 @@ impl ReadTest {
};
let filter_schema_indices = column_indices(schema_descr,
&filter_columns);
- let filter_mask =
- ProjectionMask::leaves(schema_descr,
filter_schema_indices.iter().cloned());
- let filter_indices = FilterIndices::new(schema_descr,
filter_schema_indices);
Self {
name,
arrow_reader_metadata,
projection_mask,
- filter_mask,
- filter_indices,
+ filter_schema_indices,
predicates,
expected_row_count,
}
@@ -851,25 +785,26 @@ impl ReadTest {
/// Return a `RowFilter` to apply to the reader.
///
- /// Note that since `RowFilter` does not implement Clone, we need to create
- /// the filter for each row
+ /// Each predicate gets a ProjectionMask containing only the single column
+ /// it needs, rather than all filter columns. This avoids decoding
expensive
+ /// columns (e.g. strings) when evaluating cheap predicates (e.g. integer
equality).
fn row_filter(&self) -> RowFilter {
- // Note: The predicates are in terms columns in the filter mask
- // but the record batch passed back has columns in the order of the
file
- // schema
+ let schema_descr = self
+ .arrow_reader_metadata
+ .metadata()
+ .file_metadata()
+ .schema_descr();
- // Convert the predicates to ArrowPredicateFn to conform to the
RowFilter API
let arrow_predicates: Vec<_> = self
.predicates
.iter()
.map(|pred| {
- let orig_column_index = pred.column_index();
- let column_index =
self.filter_indices.map_column(orig_column_index);
+ let schema_index =
self.filter_schema_indices[pred.column_index()];
+ let predicate_mask = ProjectionMask::leaves(schema_descr,
[schema_index]);
let mut predicate_fn = pred.predicate_fn();
- Box::new(ArrowPredicateFn::new(
- self.filter_mask.clone(),
- move |batch| (predicate_fn)(batch.column(column_index)),
- )) as Box<dyn ArrowPredicate>
+ Box::new(ArrowPredicateFn::new(predicate_mask, move |batch| {
+ (predicate_fn)(batch.column(0))
+ })) as Box<dyn ArrowPredicate>
})
.collect();