This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 84a7e3554e [Parquet] Avoid fetching multiple pages when the predicate cache is disabled (#8554)
84a7e3554e is described below
commit 84a7e3554e8780caaf6dc50221eae7ba0deebb7e
Author: Nuno Faria <[email protected]>
AuthorDate: Tue Oct 7 19:25:06 2025 +0100
[Parquet] Avoid fetching multiple pages when the predicate cache is disabled (#8554)
# Which issue does this PR close?
- Closes #8542.
# Rationale for this change
When `max_predicate_cache_size` is set to 0, there is no need to select multiple data pages until `batch_size` is reached.
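For illustration only (not part of this commit): a minimal sketch of how a user might disable the predicate cache on the async reader, assuming a Tokio runtime, the crate's `async` feature, and an on-disk file. The helper name and path are invented; `with_max_predicate_cache_size` is the builder option exercised by the test below.

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

// Hypothetical helper: scan a Parquet file with the predicate cache turned off.
async fn scan_without_predicate_cache(path: &str) -> parquet::errors::Result<()> {
    let file = File::open(path).await?;
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        // A cache size of 0 disables the predicate cache entirely
        .with_max_predicate_cache_size(0)
        .build()?;
    // Collect the record batches as usual; only the pages needed by the
    // projection/selection should be fetched
    let _batches: Vec<_> = stream.try_collect().await?;
    Ok(())
}
```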
# What changes are included in this PR?
- Make `ReaderFactory::compute_cache_projection` return `None` if the cache is disabled, which avoids unnecessarily retrieving multiple pages.
- Added a unit test to confirm the new behavior.
# Are these changes tested?
Yes.
# Are there any user-facing changes?
No.
---
parquet/src/arrow/async_reader/mod.rs | 97 +++++++++++++++++++++++++++++++++++
1 file changed, 97 insertions(+)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 8d95c9a205..4b16116369 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -715,6 +715,11 @@ where
     /// Compute which columns are used in filters and the final (output) projection
     fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
+        // Do not compute the projection mask if the predicate cache is disabled
+        if self.max_predicate_cache_size == 0 {
+            return None;
+        }
+
         let filters = self.filter.as_ref()?;
         let mut cache_projection = filters.predicates.first()?.projection().clone();
         for predicate in filters.predicates.iter() {
@@ -2611,4 +2616,96 @@ mod tests {
         // error we want to reproduce.
         let _result: Vec<_> = stream.try_collect().await.unwrap();
     }
+
+    #[tokio::test]
+    async fn test_predicate_cache_disabled() {
+        let k = Int32Array::from_iter_values(0..10);
+        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
+
+        let mut buf = Vec::new();
+        // both the page row limit and batch size are set to 1 to create one page per row
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(1)
+            .set_write_batch_size(1)
+            .set_max_row_group_size(10)
+            .set_write_page_header_statistics(true)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
+        writer.write(&data).unwrap();
+        writer.close().unwrap();
+
+        let data = Bytes::from(buf);
+        let metadata = ParquetMetaDataReader::new()
+            .with_page_index_policy(PageIndexPolicy::Required)
+            .parse_and_finish(&data)
+            .unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+        // the filter is not clone-able, so we use a lambda to simplify
+        let build_filter = || {
+            let scalar = Int32Array::from_iter_values([5]);
+            let predicate = ArrowPredicateFn::new(
+                ProjectionMask::leaves(&parquet_schema, vec![0]),
+                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
+            );
+            RowFilter::new(vec![Box::new(predicate)])
+        };
+
+        // select only one of the pages
+        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
+
+        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
+        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
+
+        // using the predicate cache (default)
+        let reader_with_cache = TestReader::new(data.clone());
+        let requests_with_cache = reader_with_cache.requests.clone();
+        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+            reader_with_cache,
+            reader_metadata.clone(),
+        )
+        .with_batch_size(1000)
+        .with_row_selection(selection.clone())
+        .with_row_filter(build_filter())
+        .build()
+        .unwrap();
+        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
+
+        // disabling the predicate cache
+        let reader_without_cache = TestReader::new(data);
+        let requests_without_cache = reader_without_cache.requests.clone();
+        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+            reader_without_cache,
+            reader_metadata,
+        )
+        .with_batch_size(1000)
+        .with_row_selection(selection)
+        .with_row_filter(build_filter())
+        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
+        .build()
+        .unwrap();
+        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
+
+        assert_eq!(batches_with_cache, batches_without_cache);
+
+        let requests_with_cache = requests_with_cache.lock().unwrap();
+        let requests_without_cache = requests_without_cache.lock().unwrap();
+
+        // fewer requests will be made without the predicate cache
+        assert_eq!(requests_with_cache.len(), 11);
+        assert_eq!(requests_without_cache.len(), 2);
+
+        // fewer bytes will be retrieved without the predicate cache
+        assert_eq!(
+            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
+            433
+        );
+        assert_eq!(
+            requests_without_cache
+                .iter()
+                .map(|r| r.len())
+                .sum::<usize>(),
+            92
+        );
+    }
 }