This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 06f7f07e8a refactor `test_cache_projection_excludes_nested_columns` to use high level APIs (#8754)
06f7f07e8a is described below
commit 06f7f07e8aa0a04016af854f06c67397c9f2880f
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Oct 31 17:24:34 2025 -0400
refactor `test_cache_projection_excludes_nested_columns` to use high level APIs (#8754)
# Which issue does this PR close?
- Related to https://github.com/apache/arrow-rs/issues/8677
- part of https://github.com/apache/arrow-rs/pull/8159
# Rationale for this change
I am reworking how the parquet decoder's state machine works in
https://github.com/apache/arrow-rs/pull/8159
One of the unit tests, `test_cache_projection_excludes_nested_columns`,
uses non-public APIs that I am changing.
Rather than rewrite it against other non-public APIs, I think it is better
to express this test in terms of public APIs.
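Roughly, the public-API shape the refactored test relies on looks like the
following sketch (illustrative only, not the exact test code;
`read_with_nested_filter` is a hypothetical helper, and `data` is assumed to
be a `Bytes` Parquet file with leaf schema `[a, b.aa, b.bb]`):
```rust
use arrow_array::{BooleanArray, RecordBatch};
use bytes::Bytes;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
};

/// Read `data` through the public reader API, projecting and filtering on the
/// first nested leaf (depth-first leaf index 1, i.e. `b.aa`).
fn read_with_nested_filter(data: Bytes) -> Vec<RecordBatch> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(data).unwrap();
    // Project a single leaf under the nested root `b`
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [1]);
    // A predicate that keeps every row, but forces the filter machinery
    // (and therefore the predicate cache logic) to run
    let always_true = ArrowPredicateFn::new(mask.clone(), |batch: RecordBatch| {
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });
    let reader = builder
        .with_projection(mask)
        .with_row_filter(RowFilter::new(vec![Box::new(always_true)]))
        .build()
        .unwrap();
    reader.map(|batch| batch.unwrap()).collect()
}
```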
# What changes are included in this PR?
1. refactor `test_cache_projection_excludes_nested_columns` to use high
level APIs
# Are these changes tested?
They are run in CI.
I also verified that this test covers the intended functionality by
commenting out the nested-column exclusion it exercises:
```diff
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -724,7 +724,9 @@ where
cache_projection.union(predicate.projection());
}
cache_projection.intersect(projection);
- self.exclude_nested_columns_from_cache(&cache_projection)
+ // TEMP don't exclude nested columns
+ //self.exclude_nested_columns_from_cache(&cache_projection)
+ Some(cache_projection)
}
/// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
```
And then running the test:
```shell
cargo test --all-features --test arrow_reader
```
The test then fails, as expected:
```
---- predicate_cache::test_cache_projection_excludes_nested_columns stdout ----
thread 'predicate_cache::test_cache_projection_excludes_nested_columns' panicked at parquet/tests/arrow_reader/predicate_cache.rs:244:9:
assertion `left == right` failed: Expected 0 records read from cache, but got 100
  left: 100
 right: 0
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

failures:
    predicate_cache::test_cache_projection_excludes_nested_columns

test result: FAILED. 88 passed; 1 failed; 1 ignored; 0 measured; 0 filtered out; finished in 0.20s
```
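For context, the exclusion the disabled call performs amounts to dropping every
selected leaf whose root column owns more than one Parquet leaf. The following
is a hypothetical sketch of that idea using only public `SchemaDescriptor` and
`ProjectionMask` APIs, not the actual `exclude_nested_columns_from_cache`
implementation:
```rust
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

/// Hypothetical sketch: keep only selected leaves whose root column is "flat"
/// (owns exactly one parquet leaf); return None if nothing survives.
fn exclude_nested_leaves(
    schema: &SchemaDescriptor,
    mask: &ProjectionMask,
) -> Option<ProjectionMask> {
    // Count how many parquet leaves each root column owns
    let mut leaves_per_root = vec![0usize; schema.root_schema().get_fields().len()];
    for leaf in 0..schema.num_columns() {
        leaves_per_root[schema.get_column_root_idx(leaf)] += 1;
    }
    // Keep a selected leaf only if its root has a single leaf (i.e. it is not nested)
    let kept: Vec<usize> = (0..schema.num_columns())
        .filter(|&leaf| {
            mask.leaf_included(leaf) && leaves_per_root[schema.get_column_root_idx(leaf)] == 1
        })
        .collect();
    if kept.is_empty() {
        None
    } else {
        Some(ProjectionMask::leaves(schema, kept))
    }
}
```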
# Are there any user-facing changes?
No, these are test-only changes.
---
parquet/src/arrow/async_reader/mod.rs | 71 ----------------------
parquet/tests/arrow_reader/predicate_cache.rs | 85 ++++++++++++++++++++++++++-
2 files changed, 84 insertions(+), 72 deletions(-)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 9b81e8e569..c5badea7f3 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -2181,77 +2181,6 @@ mod tests {
assert_eq!(requests.lock().unwrap().len(), 3);
}
- #[tokio::test]
- async fn test_cache_projection_excludes_nested_columns() {
- use arrow_array::{ArrayRef, StringArray};
-
- // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
- let a = StringArray::from_iter_values(["r1", "r2"]);
- let b = StructArray::from(vec![
- (
- Arc::new(Field::new("aa", DataType::Utf8, true)),
- Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
- ),
- (
- Arc::new(Field::new("bb", DataType::Utf8, true)),
- Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
- ),
- ]);
-
- let schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::Utf8, true),
- Field::new("b", b.data_type().clone(), true),
- ]));
-
- let mut buf = Vec::new();
- let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
- let batch = RecordBatch::try_from_iter([
- ("a", Arc::new(a) as ArrayRef),
- ("b", Arc::new(b) as ArrayRef),
- ])
- .unwrap();
- writer.write(&batch).unwrap();
- writer.close().unwrap();
-
- // Load Parquet metadata
- let data: Bytes = buf.into();
- let metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- let metadata = Arc::new(metadata);
-
- // Build a RowFilter whose predicate projects a leaf under the nested root `b`
- // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
- let parquet_schema = metadata.file_metadata().schema_descr();
- let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
-
- let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
- Ok(arrow_array::BooleanArray::from(vec![
- true;
- batch.num_rows()
- ]))
- });
- let filter = RowFilter::new(vec![Box::new(always_true)]);
-
- // Construct a ReaderFactory and compute cache projection
- let reader_factory = ReaderFactory {
- metadata: Arc::clone(&metadata),
- fields: None,
- input: TestReader::new(data),
- filter: Some(filter),
- limit: None,
- offset: None,
- metrics: ArrowReaderMetrics::disabled(),
- max_predicate_cache_size: 0,
- };
-
- // Provide an output projection that also selects the same nested leaf
- let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
-
- // Expect None since nested columns should be excluded from cache projection
- assert!(cache_projection.is_none());
- }
-
#[tokio::test]
#[allow(deprecated)]
async fn empty_offset_index_doesnt_panic_in_read_row_group() {
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index ebcec2a44a..b2ad36b421 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -23,7 +23,8 @@ use arrow::compute::and;
use arrow::compute::kernels::cmp::{gt, lt};
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
-use arrow_array::{RecordBatch, StringViewArray};
+use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
+use arrow_schema::{DataType, Field};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
@@ -80,6 +81,19 @@ async fn test_cache_disabled_with_filters() {
test.run_async(async_builder).await;
}
+#[tokio::test]
+async fn test_cache_projection_excludes_nested_columns() {
+ let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
+
+ let sync_builder = test.sync_builder();
+ let sync_builder = test.add_nested_filter(sync_builder);
+ test.run_sync(sync_builder);
+
+ let async_builder = test.async_builder().await;
+ let async_builder = test.add_nested_filter(async_builder);
+ test.run_async(async_builder).await;
+}
+
// -- Begin test infrastructure --
/// A test parquet file
@@ -104,6 +118,18 @@ impl ParquetPredicateCacheTest {
}
}
+ /// Create a new `TestParquetFile` with
+ /// 2 columns:
+ ///
+ /// * string column `a`
+ /// * nested struct column `b { aa, bb }`
+ fn new_nested() -> Self {
+ Self {
+ bytes: NESTED_TEST_FILE_DATA.clone(),
+ expected_records_read_from_cache: 0,
+ }
+ }
+
/// Set the expected number of records read from the cache
fn with_expected_records_read_from_cache(
mut self,
@@ -154,6 +180,27 @@ impl ParquetPredicateCacheTest {
.with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
}
+ /// Add a filter on the nested leaf nodes
+ fn add_nested_filter<T>(&self, builder: ArrowReaderBuilder<T>) -> ArrowReaderBuilder<T> {
+ let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+ // Build a RowFilter whose predicate projects a leaf under the nested root `b`
+ // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
+ let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
+
+ let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+ Ok(arrow_array::BooleanArray::from(vec![
+ true;
+ batch.num_rows()
+ ]))
+ });
+ let row_filter = RowFilter::new(vec![Box::new(always_true)]);
+
+ builder
+ .with_projection(nested_leaf_mask)
+ .with_row_filter(row_filter)
+ }
+
/// Build the reader from the specified builder, reading all batches from it,
/// and asserts the
fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
@@ -239,6 +286,42 @@ static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
Bytes::from(output)
});
+/// Build a ParquetFile with a
+///
+/// * string column `a`
+/// * nested struct column `b { aa, bb }`
+static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+ const NUM_ROWS: usize = 100;
+ let a: StringArray = (0..NUM_ROWS).map(|i| Some(format!("r{i}"))).collect();
+
+ let aa: StringArray = (0..NUM_ROWS).map(|i| Some(format!("v{i}"))).collect();
+ let bb: StringArray = (0..NUM_ROWS).map(|i| Some(format!("w{i}"))).collect();
+ let b = StructArray::from(vec![
+ (
+ Arc::new(Field::new("aa", DataType::Utf8, true)),
+ Arc::new(aa) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("bb", DataType::Utf8, true)),
+ Arc::new(bb) as ArrayRef,
+ ),
+ ]);
+
+ let input_batch = RecordBatch::try_from_iter([
+ ("a", Arc::new(a) as ArrayRef),
+ ("b", Arc::new(b) as ArrayRef),
+ ])
+ .unwrap();
+
+ let mut output = Vec::new();
+ let writer_options = None;
+ let mut writer =
+ ArrowWriter::try_new(&mut output, input_batch.schema(), writer_options).unwrap();
+ writer.write(&input_batch).unwrap();
+ writer.close().unwrap();
+ Bytes::from(output)
+});
+
/// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
/// TODO put this in a common place
#[derive(Clone)]