This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new df63590310 [Minor] Use per-predicate projection masks in 
arrow_reader_clickbench benchmark (#9413)
df63590310 is described below

commit df635903108418d95f7d0fc2101091684d8504fd
Author: Daniël Heres <[email protected]>
AuthorDate: Sun Feb 15 15:04:00 2026 +0100

    [Minor] Use per-predicate projection masks in arrow_reader_clickbench 
benchmark (#9413)
    
    # Which issue does this PR close?
    
    - Closes #NNN.
    
    # Rationale for this change
    
    As suggested by Claude - currently it uses a projection mask for all
    columns, significantly slowing down queries that have multiple
    predicates.
    This makes it more in line with consumer side (e.g. DataFusion) (so we
    can more accurately benchmark improvements).
    
    It shows the perf difference in a number of (multi-filter) queries:
    
    ```
    group                                             clickbench-optimizations  
             main
    arrow_reader_clickbench/async_object_store/Q22    1.00    151.8±6.46ms      
  ? ?/sec    1.52    230.5±1.68ms        ? ?/sec
    arrow_reader_clickbench/async_object_store/Q36    1.00     26.3±0.24ms      
  ? ?/sec    4.30    113.1±0.67ms        ? ?/sec
    arrow_reader_clickbench/async_object_store/Q37    1.00      9.3±0.06ms      
  ? ?/sec    9.64     89.7±1.20ms        ? ?/sec
    arrow_reader_clickbench/async_object_store/Q38    1.00     22.4±0.26ms      
  ? ?/sec    1.44     32.3±0.29ms        ? ?/sec
    arrow_reader_clickbench/async_object_store/Q39    1.00     38.1±0.66ms      
  ? ?/sec    1.09     41.5±0.35ms        ? ?/sec
    arrow_reader_clickbench/async_object_store/Q40    1.00     13.0±0.15ms      
  ? ?/sec    2.96     38.6±0.45ms        ? ?/sec
    arrow_reader_clickbench/async_object_store/Q41    1.00     10.1±0.11ms      
  ? ?/sec    2.83     28.5±0.73ms        ? ?/sec
    arrow_reader_clickbench/async_object_store/Q42    1.00      5.6±0.05ms      
  ? ?/sec    1.87     10.5±0.12ms        ? ?/sec
    ```
    
    # What changes are included in this PR?
    
    # Are these changes tested?
    
    
    # Are there any user-facing changes?
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 parquet/benches/arrow_reader_clickbench.rs | 97 +++++-------------------------
 1 file changed, 16 insertions(+), 81 deletions(-)

diff --git a/parquet/benches/arrow_reader_clickbench.rs 
b/parquet/benches/arrow_reader_clickbench.rs
index 8635a59557..5a6fb36d58 100644
--- a/parquet/benches/arrow_reader_clickbench.rs
+++ b/parquet/benches/arrow_reader_clickbench.rs
@@ -638,66 +638,6 @@ fn find_file_if_exists(mut current_dir: PathBuf, 
file_name: &str) -> Option<Path
     None
 }
 
-/// Represents a mapping from each column selected in the `ProjectionMask`
-/// created from `filter_columns`, to the corresponding index in the list of
-/// `filter_columns`?
-///
-/// # Example
-///
-/// If:
-/// * the file schema has columns `[A, B, C]`
-/// * `filter_columns` is `[C, A]`
-/// * ==> `ProjectionMask` will be `[true, false, true]` = `[A, C]`
-///
-/// `FilterIndices` will be `[1, 0]`, because column `C` (index 0 in
-/// filter_columns) is selected at index 1 of the `ProjectionMask` and column
-/// `A` (index 1 in `filter_columns`) is selected at index 0 of the
-/// `ProjectionMask`.
-struct FilterIndices {
-    /// * index is offset in Query::filter_columns
-    /// * value is offset in column selected by filter ProjectionMask
-    inner: Vec<usize>,
-}
-
-impl FilterIndices {
-    /// Create a new `FilterIndices` from a list of column indices
-    ///
-    /// Parameters:
-    /// * `schema_descriptor`: The schema of the file
-    /// * `filter_schema_indices`: a list of column indices in the schema
-    fn new(schema_descriptor: &SchemaDescriptor, filter_schema_indices: 
Vec<usize>) -> Self {
-        for &filter_index in &filter_schema_indices {
-            assert!(filter_index < schema_descriptor.num_columns());
-        }
-        // When the columns are selected using a ProjectionMask, they are
-        // returned in the order of the schema (not the order they were 
specified)
-        //
-        // So if the original schema indices are 5, 1, 3 (select the sixth and
-        // second and fourth column),  the RecordBatch returned will select 
them
-        // in order 1, 3, 5,
-        //
-        // Thus we need a map to convert back to the original selection order
-        // `[1, 2, 0]`
-        let mut reordered: Vec<_> = 
filter_schema_indices.iter().enumerate().collect();
-        reordered.sort_by_key(|(_projection_idx, original_schema_idx)| 
**original_schema_idx);
-        let mut inner = vec![0; reordered.len()];
-        for (output_idx, (projection_idx, _original_schema_idx)) in
-            reordered.into_iter().enumerate()
-        {
-            inner[projection_idx] = output_idx;
-        }
-        Self { inner }
-    }
-
-    /// Given the index of a column in `filter_columns`, return the index of 
the
-    /// column in the columns selected from `ProjectionMask`
-    fn map_column(&self, filter_columns_index: usize) -> usize {
-        // The selection index is the index in the filter mask
-        // The inner index is the index in the filter columns
-        self.inner[filter_columns_index]
-    }
-}
-
 /// Encapsulates the test parameters for a single benchmark
 struct ReadTest {
     /// Human identifiable name
@@ -706,10 +646,8 @@ struct ReadTest {
     arrow_reader_metadata: ArrowReaderMetadata,
     /// Which columns in the file should be projected (decoded after filter)?
     projection_mask: ProjectionMask,
-    /// Which columns in the file should be passed to the filter?
-    filter_mask: ProjectionMask,
-    /// Mapping from column selected in filter mask to `Query::filter_columns`
-    filter_indices: FilterIndices,
+    /// Schema indices for each filter column (in filter_columns order)
+    filter_schema_indices: Vec<usize>,
     /// Predicates to apply
     predicates: Vec<ClickBenchPredicate>,
     /// How many rows are expected to pass the predicate?
@@ -744,16 +682,12 @@ impl ReadTest {
         };
 
         let filter_schema_indices = column_indices(schema_descr, 
&filter_columns);
-        let filter_mask =
-            ProjectionMask::leaves(schema_descr, 
filter_schema_indices.iter().cloned());
-        let filter_indices = FilterIndices::new(schema_descr, 
filter_schema_indices);
 
         Self {
             name,
             arrow_reader_metadata,
             projection_mask,
-            filter_mask,
-            filter_indices,
+            filter_schema_indices,
             predicates,
             expected_row_count,
         }
@@ -851,25 +785,26 @@ impl ReadTest {
 
     /// Return a `RowFilter` to apply to the reader.
     ///
-    /// Note that since `RowFilter` does not implement Clone, we need to create
-    /// the filter for each row
+    /// Each predicate gets a ProjectionMask containing only the single column
+    /// it needs, rather than all filter columns. This avoids decoding 
expensive
+    /// columns (e.g. strings) when evaluating cheap predicates (e.g. integer 
equality).
     fn row_filter(&self) -> RowFilter {
-        // Note: The predicates are in terms columns in the filter mask
-        // but the record batch passed back has columns in the order of the 
file
-        // schema
+        let schema_descr = self
+            .arrow_reader_metadata
+            .metadata()
+            .file_metadata()
+            .schema_descr();
 
-        // Convert the predicates to ArrowPredicateFn to conform to the 
RowFilter API
         let arrow_predicates: Vec<_> = self
             .predicates
             .iter()
             .map(|pred| {
-                let orig_column_index = pred.column_index();
-                let column_index = 
self.filter_indices.map_column(orig_column_index);
+                let schema_index = 
self.filter_schema_indices[pred.column_index()];
+                let predicate_mask = ProjectionMask::leaves(schema_descr, 
[schema_index]);
                 let mut predicate_fn = pred.predicate_fn();
-                Box::new(ArrowPredicateFn::new(
-                    self.filter_mask.clone(),
-                    move |batch| (predicate_fn)(batch.column(column_index)),
-                )) as Box<dyn ArrowPredicate>
+                Box::new(ArrowPredicateFn::new(predicate_mask, move |batch| {
+                    (predicate_fn)(batch.column(0))
+                })) as Box<dyn ArrowPredicate>
             })
             .collect();
 

Reply via email to