This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 84a7e3554e [Parquet] Avoid fetching multiple pages when the predicate cache is disabled (#8554)
84a7e3554e is described below

commit 84a7e3554e8780caaf6dc50221eae7ba0deebb7e
Author: Nuno Faria <[email protected]>
AuthorDate: Tue Oct 7 19:25:06 2025 +0100

    [Parquet] Avoid fetching multiple pages when the predicate cache is disabled (#8554)
    
    # Which issue does this PR close?
    
    - Closes #8542.
    
    # Rationale for this change
    
    When `max_predicate_cache_size` is set to 0, there is no need to
    select multiple data pages until `batch_size` rows are reached.
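
    For reference, a minimal sketch of how an application opts out of the
    cache with the builder API exercised below (the `tokio::fs::File`
    input and the helper name are illustrative assumptions, not part of
    this change):

    ```rust
    use futures::TryStreamExt;
    use parquet::arrow::arrow_reader::RowFilter;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    // Hypothetical helper: read a filtered file with the predicate cache
    // disabled by setting its size limit to 0.
    async fn read_filtered_without_cache(
        file: tokio::fs::File,
        filter: RowFilter,
    ) -> parquet::errors::Result<Vec<arrow_array::RecordBatch>> {
        let stream = ParquetRecordBatchStreamBuilder::new(file)
            .await?
            .with_batch_size(1000)
            .with_row_filter(filter)
            .with_max_predicate_cache_size(0) // 0 disables the predicate cache
            .build()?;
        stream.try_collect().await
    }
    ```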
    
    # What changes are included in this PR?
    
    - Make `ReaderFactory::compute_cache_projection` return `None` if the
    cache is disabled, so that multiple pages are no longer retrieved
    unnecessarily.
    - Added a unit test to confirm the new behavior.
    
    # Are these changes tested?
    
    Yes.
    
    # Are there any user-facing changes?
    No.
---
 parquet/src/arrow/async_reader/mod.rs | 97 +++++++++++++++++++++++++++++++++++
 1 file changed, 97 insertions(+)

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 8d95c9a205..4b16116369 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -715,6 +715,11 @@ where
 
     /// Compute which columns are used in filters and the final (output) projection
     fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
+        // Do not compute the projection mask if the predicate cache is disabled
+        if self.max_predicate_cache_size == 0 {
+            return None;
+        }
+
         let filters = self.filter.as_ref()?;
         let mut cache_projection = filters.predicates.first()?.projection().clone();
         for predicate in filters.predicates.iter() {
@@ -2611,4 +2616,96 @@ mod tests {
         // error we want to reproduce.
         let _result: Vec<_> = stream.try_collect().await.unwrap();
     }
+
+    #[tokio::test]
+    async fn test_predicate_cache_disabled() {
+        let k = Int32Array::from_iter_values(0..10);
+        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
+
+        let mut buf = Vec::new();
+        // both the page row limit and batch size are set to 1 to create one page per row
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(1)
+            .set_write_batch_size(1)
+            .set_max_row_group_size(10)
+            .set_write_page_header_statistics(true)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
+        writer.write(&data).unwrap();
+        writer.close().unwrap();
+
+        let data = Bytes::from(buf);
+        let metadata = ParquetMetaDataReader::new()
+            .with_page_index_policy(PageIndexPolicy::Required)
+            .parse_and_finish(&data)
+            .unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+        // the filter is not cloneable, so we use a closure to build it for each reader
+        let build_filter = || {
+            let scalar = Int32Array::from_iter_values([5]);
+            let predicate = ArrowPredicateFn::new(
+                ProjectionMask::leaves(&parquet_schema, vec![0]),
+                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
+            );
+            RowFilter::new(vec![Box::new(predicate)])
+        };
+
+        // select only one of the pages
+        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
+
+        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
+        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
+
+        // using the predicate cache (default)
+        let reader_with_cache = TestReader::new(data.clone());
+        let requests_with_cache = reader_with_cache.requests.clone();
+        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+            reader_with_cache,
+            reader_metadata.clone(),
+        )
+        .with_batch_size(1000)
+        .with_row_selection(selection.clone())
+        .with_row_filter(build_filter())
+        .build()
+        .unwrap();
+        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
+
+        // disabling the predicate cache
+        let reader_without_cache = TestReader::new(data);
+        let requests_without_cache = reader_without_cache.requests.clone();
+        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+            reader_without_cache,
+            reader_metadata,
+        )
+        .with_batch_size(1000)
+        .with_row_selection(selection)
+        .with_row_filter(build_filter())
+        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
+        .build()
+        .unwrap();
+        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
+
+        assert_eq!(batches_with_cache, batches_without_cache);
+
+        let requests_with_cache = requests_with_cache.lock().unwrap();
+        let requests_without_cache = requests_without_cache.lock().unwrap();
+
+        // fewer requests will be made without the predicate cache
+        assert_eq!(requests_with_cache.len(), 11);
+        assert_eq!(requests_without_cache.len(), 2);
+
+        // fewer bytes will be retrieved without the predicate cache
+        assert_eq!(
+            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
+            433
+        );
+        assert_eq!(
+            requests_without_cache
+                .iter()
+                .map(|r| r.len())
+                .sum::<usize>(),
+            92
+        );
+    }
 }
