This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4d6b93a123 Clean up predicate_cache tests (#8755)
4d6b93a123 is described below
commit 4d6b93a1232ec1c7aeecf727b48bc49838c7602f
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Nov 10 14:44:46 2025 -0500
Clean up predicate_cache tests (#8755)
# Which issue does this PR close?
- Follow-on to https://github.com/apache/arrow-rs/pull/8754
# Rationale for this change
While working on https://github.com/apache/arrow-rs/pull/8754 I found
the current formulation a bit awkward, so let's fix that.
# What changes are included in this PR?
Move the builder configuration to a trait so the tests read more
fluently.
This is totally unnecessary; it just makes me feel better about the tests.
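The pattern is the standard extension-trait trick: declare a trait with the helper
methods and implement it for `ArrowReaderBuilder<T>`, so the helpers chain like
built-in builder methods. A minimal sketch of the idea (the `Builder` and
`with_filter_b` names here are illustrative stand-ins, not the types in this PR):

```rust
// Minimal sketch of the extension-trait pattern; `Builder` and `with_filter_b`
// are illustrative stand-ins, not the real parquet types.
struct Builder {
    filters: Vec<String>,
}

trait BuilderExt {
    /// Adds a canned filter so call sites read as one fluent chain.
    fn with_filter_b(self) -> Self;
}

impl BuilderExt for Builder {
    fn with_filter_b(mut self) -> Self {
        self.filters.push("575 < b AND b < 625".to_string());
        self
    }
}

fn main() {
    // Before: `let b = builder(); let b = add_filter(b);`
    // After: a single fluent chain, as in the updated tests below.
    let builder = Builder { filters: vec![] }.with_filter_b();
    assert_eq!(builder.filters.len(), 1);
}
```

The actual diff below does the same thing for `add_project_ab_and_filter_b` and
`add_nested_filter`.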
# Are these changes tested?
Yes, by CI
# Are there any user-facing changes?
No, this is test-only
---
parquet/tests/arrow_reader/predicate_cache.rs | 122 +++++++++++++-------------
1 file changed, 63 insertions(+), 59 deletions(-)
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index b2ad36b421..b419c37158 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -52,8 +52,7 @@ async fn test_default_read() {
#[tokio::test]
async fn test_async_cache_with_filters() {
let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49);
- let async_builder = test.async_builder().await;
- let async_builder = test.add_project_ab_and_filter_b(async_builder);
+ let async_builder = test.async_builder().await.add_project_ab_and_filter_b();
test.run_async(async_builder).await;
}
@@ -63,8 +62,7 @@ async fn test_sync_cache_with_filters() {
// The sync reader does not use the cache. See https://github.com/apache/arrow-rs/issues/8000
.with_expected_records_read_from_cache(0);
- let sync_builder = test.sync_builder();
- let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+ let sync_builder = test.sync_builder().add_project_ab_and_filter_b();
test.run_sync(sync_builder);
}
@@ -72,12 +70,17 @@ async fn test_sync_cache_with_filters() {
async fn test_cache_disabled_with_filters() {
// expect no records to be read from cache, because the cache is disabled
let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
- let sync_builder = test.sync_builder().with_max_predicate_cache_size(0);
- let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+ let sync_builder = test
+ .sync_builder()
+ .with_max_predicate_cache_size(0)
+ .add_project_ab_and_filter_b();
test.run_sync(sync_builder);
- let async_builder = test.async_builder().await.with_max_predicate_cache_size(0);
- let async_builder = test.add_project_ab_and_filter_b(async_builder);
+ let async_builder = test
+ .async_builder()
+ .await
+ .with_max_predicate_cache_size(0)
+ .add_project_ab_and_filter_b();
test.run_async(async_builder).await;
}
@@ -85,12 +88,10 @@ async fn test_cache_disabled_with_filters() {
async fn test_cache_projection_excludes_nested_columns() {
let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
- let sync_builder = test.sync_builder();
- let sync_builder = test.add_nested_filter(sync_builder);
+ let sync_builder = test.sync_builder().add_nested_filter();
test.run_sync(sync_builder);
- let async_builder = test.async_builder().await;
- let async_builder = test.add_nested_filter(async_builder);
+ let async_builder = test.async_builder().await.add_nested_filter();
test.run_async(async_builder).await;
}
@@ -154,53 +155,6 @@ impl ParquetPredicateCacheTest {
.unwrap()
}
- /// Return a [`ParquetRecordBatchReaderBuilder`] for reading the file with
- ///
- /// 1. a projection selecting the "a" and "b" column
- /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
- fn add_project_ab_and_filter_b<T>(
- &self,
- builder: ArrowReaderBuilder<T>,
- ) -> ArrowReaderBuilder<T> {
- let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
-
- // "b" > 575 and "b" < 625
- let row_filter = ArrowPredicateFn::new(
- ProjectionMask::columns(&schema_descr, ["b"]),
- |batch: RecordBatch| {
- let scalar_575 = Int64Array::new_scalar(575);
- let scalar_625 = Int64Array::new_scalar(625);
- let column = batch.column(0).as_primitive::<Int64Type>();
- and(>(column, &scalar_575)?, <(column, &scalar_625)?)
- },
- );
-
- builder
- .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
- .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
- }
-
- /// Add a filter on the nested leaf nodes
- fn add_nested_filter<T>(&self, builder: ArrowReaderBuilder<T>) -> ArrowReaderBuilder<T> {
- let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
-
- // Build a RowFilter whose predicate projects a leaf under the nested root `b`
- // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
- let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
-
- let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
- Ok(arrow_array::BooleanArray::from(vec![
- true;
- batch.num_rows()
- ]))
- });
- let row_filter = RowFilter::new(vec![Box::new(always_true)]);
-
- builder
- .with_projection(nested_leaf_mask)
- .with_row_filter(row_filter)
- }
-
/// Build the reader from the specified builder, reading all batches from it,
/// and asserts the
fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
@@ -322,6 +276,56 @@ static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
Bytes::from(output)
});
+trait ArrowReaderBuilderExt {
+ /// Applies the following:
+ /// 1. a projection selecting the "a" and "b" column
+ /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
+ fn add_project_ab_and_filter_b(self) -> Self;
+
+ /// Adds a row filter that projects the nested leaf column "b.aa" and
+ /// returns true for all rows.
+ fn add_nested_filter(self) -> Self;
+}
+
+impl<T> ArrowReaderBuilderExt for ArrowReaderBuilder<T> {
+ fn add_project_ab_and_filter_b(self) -> Self {
+ let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
+
+ // "b" > 575 and "b" < 625
+ let row_filter = ArrowPredicateFn::new(
+ ProjectionMask::columns(&schema_descr, ["b"]),
+ |batch: RecordBatch| {
+ let scalar_575 = Int64Array::new_scalar(575);
+ let scalar_625 = Int64Array::new_scalar(625);
+ let column = batch.column(0).as_primitive::<Int64Type>();
+ and(>(column, &scalar_575)?, <(column, &scalar_625)?)
+ },
+ );
+
+ self.with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+ .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
+ }
+
+ fn add_nested_filter(self) -> Self {
+ let schema_descr = self.metadata().file_metadata().schema_descr_ptr();
+
+ // Build a RowFilter whose predicate projects a leaf under the nested root `b`
+ // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
+ let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
+
+ let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+ Ok(arrow_array::BooleanArray::from(vec![
+ true;
+ batch.num_rows()
+ ]))
+ });
+ let row_filter = RowFilter::new(vec![Box::new(always_true)]);
+
+ self.with_projection(nested_leaf_mask)
+ .with_row_filter(row_filter)
+ }
+}
+
/// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
/// TODO put this in a common place
#[derive(Clone)]