This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 06f7f07e8a refactor `test_cache_projection_excludes_nested_columns` to use high level APIs (#8754)
06f7f07e8a is described below

commit 06f7f07e8aa0a04016af854f06c67397c9f2880f
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Oct 31 17:24:34 2025 -0400

    refactor `test_cache_projection_excludes_nested_columns` to use high level APIs (#8754)
    
    # Which issue does this PR close?
    
    - Related to https://github.com/apache/arrow-rs/issues/8677
    - part of https://github.com/apache/arrow-rs/pull/8159
    
    
    # Rationale for this change
    
    I am reworking how the parquet decoder's state machine works in
    https://github.com/apache/arrow-rs/pull/8159
    
    One of the unit tests, `test_cache_projection_excludes_nested_columns`,
    uses non-public APIs that I am changing.
    
    Rather than rewriting it against other non-public APIs, I think it is
    better to express this test in terms of public APIs.
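    
    Concretely, here is a minimal sketch of the shape the refactored test
    takes, using only public APIs (the helper name and error plumbing are
    illustrative; the real code is in `predicate_cache.rs` in the diff below):
    
    ```rust
    use arrow_array::{BooleanArray, RecordBatch};
    use bytes::Bytes;
    use parquet::arrow::arrow_reader::{
        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
    };
    use parquet::arrow::ProjectionMask;
    
    /// Illustrative helper: drive the nested-column cache path entirely
    /// through the public reader API. `data` is assumed to hold a parquet
    /// file with leaves [a, b.aa, b.bb].
    fn read_nested_leaf(data: Bytes) -> Result<(), Box<dyn std::error::Error>> {
        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
        // Leaf indices are depth-first, so leaf 1 is `b.aa`
        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
        // An "always true" predicate forces the filter machinery (and thus
        // the predicate cache decision) to run over the nested leaf
        let always_true =
            ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
                Ok(BooleanArray::from(vec![true; batch.num_rows()]))
            });
        let reader = builder
            .with_projection(nested_leaf_mask)
            .with_row_filter(RowFilter::new(vec![Box::new(always_true)]))
            .build()?;
        for batch in reader {
            batch?; // decode everything; cache metrics are asserted elsewhere
        }
        Ok(())
    }
    ```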
    
    # What changes are included in this PR?
    1. refactor `test_cache_projection_excludes_nested_columns` to use high level APIs
    
    # Are these changes tested?
    
    They are run in CI
    
    I also verified that this test covers the intended functionality by
    commenting out the nested-column exclusion:
    
    ```diff
    --- a/parquet/src/arrow/async_reader/mod.rs
    +++ b/parquet/src/arrow/async_reader/mod.rs
    @@ -724,7 +724,9 @@ where
                 cache_projection.union(predicate.projection());
             }
             cache_projection.intersect(projection);
    -        self.exclude_nested_columns_from_cache(&cache_projection)
    +        // TEMP don't exclude nested columns
    +        //self.exclude_nested_columns_from_cache(&cache_projection)
    +        Some(cache_projection)
         }
    
         /// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
    ```
    
    And then running the test:
    ```shell
    cargo test --all-features --test arrow_reader
    ```
    
    And the test fails (as expected):
    ```
    ---- predicate_cache::test_cache_projection_excludes_nested_columns stdout ----
    
    thread 'predicate_cache::test_cache_projection_excludes_nested_columns' panicked at parquet/tests/arrow_reader/predicate_cache.rs:244:9:
    assertion `left == right` failed: Expected 0 records read from cache, but got 100
      left: 100
     right: 0
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
    
    
    failures:
        predicate_cache::test_cache_projection_excludes_nested_columns
    
    test result: FAILED. 88 passed; 1 failed; 1 ignored; 0 measured; 0 filtered out; finished in 0.20s
    ```
    # Are there any user-facing changes?
    
    No, this PR only contains test changes
---
 parquet/src/arrow/async_reader/mod.rs         | 71 ----------------------
 parquet/tests/arrow_reader/predicate_cache.rs | 85 ++++++++++++++++++++++++++-
 2 files changed, 84 insertions(+), 72 deletions(-)

diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 9b81e8e569..c5badea7f3 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -2181,77 +2181,6 @@ mod tests {
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
 
-    #[tokio::test]
-    async fn test_cache_projection_excludes_nested_columns() {
-        use arrow_array::{ArrayRef, StringArray};
-
-        // Build a simple RecordBatch with a primitive column `a` and a nested struct column `b { aa, bb }`
-        let a = StringArray::from_iter_values(["r1", "r2"]);
-        let b = StructArray::from(vec![
-            (
-                Arc::new(Field::new("aa", DataType::Utf8, true)),
-                Arc::new(StringArray::from_iter_values(["v1", "v2"])) as ArrayRef,
-            ),
-            (
-                Arc::new(Field::new("bb", DataType::Utf8, true)),
-                Arc::new(StringArray::from_iter_values(["w1", "w2"])) as ArrayRef,
-            ),
-        ]);
-
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Utf8, true),
-            Field::new("b", b.data_type().clone(), true),
-        ]));
-
-        let mut buf = Vec::new();
-        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
-        let batch = RecordBatch::try_from_iter([
-            ("a", Arc::new(a) as ArrayRef),
-            ("b", Arc::new(b) as ArrayRef),
-        ])
-        .unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        // Load Parquet metadata
-        let data: Bytes = buf.into();
-        let metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        let metadata = Arc::new(metadata);
-
-        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
-        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
-        let parquet_schema = metadata.file_metadata().schema_descr();
-        let nested_leaf_mask = ProjectionMask::leaves(parquet_schema, vec![1]);
-
-        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
-            Ok(arrow_array::BooleanArray::from(vec![
-                true;
-                batch.num_rows()
-            ]))
-        });
-        let filter = RowFilter::new(vec![Box::new(always_true)]);
-
-        // Construct a ReaderFactory and compute cache projection
-        let reader_factory = ReaderFactory {
-            metadata: Arc::clone(&metadata),
-            fields: None,
-            input: TestReader::new(data),
-            filter: Some(filter),
-            limit: None,
-            offset: None,
-            metrics: ArrowReaderMetrics::disabled(),
-            max_predicate_cache_size: 0,
-        };
-
-        // Provide an output projection that also selects the same nested leaf
-        let cache_projection = reader_factory.compute_cache_projection(&nested_leaf_mask);
-
-        // Expect None since nested columns should be excluded from cache projection
-        assert!(cache_projection.is_none());
-    }
-
     #[tokio::test]
     #[allow(deprecated)]
     async fn empty_offset_index_doesnt_panic_in_read_row_group() {
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index ebcec2a44a..b2ad36b421 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -23,7 +23,8 @@ use arrow::compute::and;
 use arrow::compute::kernels::cmp::{gt, lt};
 use arrow_array::cast::AsArray;
 use arrow_array::types::Int64Type;
-use arrow_array::{RecordBatch, StringViewArray};
+use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray};
+use arrow_schema::{DataType, Field};
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt};
@@ -80,6 +81,19 @@ async fn test_cache_disabled_with_filters() {
     test.run_async(async_builder).await;
 }
 
+#[tokio::test]
+async fn test_cache_projection_excludes_nested_columns() {
+    let test = ParquetPredicateCacheTest::new_nested().with_expected_records_read_from_cache(0);
+
+    let sync_builder = test.sync_builder();
+    let sync_builder = test.add_nested_filter(sync_builder);
+    test.run_sync(sync_builder);
+
+    let async_builder = test.async_builder().await;
+    let async_builder = test.add_nested_filter(async_builder);
+    test.run_async(async_builder).await;
+}
+
 // --  Begin test infrastructure --
 
 /// A test parquet file
@@ -104,6 +118,18 @@ impl ParquetPredicateCacheTest {
         }
     }
 
+    /// Create a new `TestParquetFile` with
+    /// 2 columns:
+    ///
+    /// * string column `a`
+    /// * nested struct column `b { aa, bb }`
+    fn new_nested() -> Self {
+        Self {
+            bytes: NESTED_TEST_FILE_DATA.clone(),
+            expected_records_read_from_cache: 0,
+        }
+    }
+
     /// Set the expected number of records read from the cache
     fn with_expected_records_read_from_cache(
         mut self,
@@ -154,6 +180,27 @@ impl ParquetPredicateCacheTest {
             .with_row_filter(RowFilter::new(vec![Box::new(row_filter)]))
     }
 
+    /// Add a filter on the nested leaf nodes
+    fn add_nested_filter<T>(&self, builder: ArrowReaderBuilder<T>) -> ArrowReaderBuilder<T> {
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // Build a RowFilter whose predicate projects a leaf under the nested root `b`
+        // Leaf indices are depth-first; with schema [a, b.aa, b.bb] we pick index 1 (b.aa)
+        let nested_leaf_mask = ProjectionMask::leaves(&schema_descr, vec![1]);
+
+        let always_true = ArrowPredicateFn::new(nested_leaf_mask.clone(), |batch: RecordBatch| {
+            Ok(arrow_array::BooleanArray::from(vec![
+                true;
+                batch.num_rows()
+            ]))
+        });
+        let row_filter = RowFilter::new(vec![Box::new(always_true)]);
+
+        builder
+            .with_projection(nested_leaf_mask)
+            .with_row_filter(row_filter)
+    }
+
     /// Build the reader from the specified builder, reading all batches from it,
     /// and asserts the
     fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder<Bytes>) {
@@ -239,6 +286,42 @@ static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
     Bytes::from(output)
 });
 
+/// Build a ParquetFile with a
+///
+/// * string column `a`
+/// * nested struct column `b { aa, bb }`
+static NESTED_TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+    const NUM_ROWS: usize = 100;
+    let a: StringArray = (0..NUM_ROWS).map(|i| Some(format!("r{i}"))).collect();
+
+    let aa: StringArray = (0..NUM_ROWS).map(|i| Some(format!("v{i}"))).collect();
+    let bb: StringArray = (0..NUM_ROWS).map(|i| Some(format!("w{i}"))).collect();
+    let b = StructArray::from(vec![
+        (
+            Arc::new(Field::new("aa", DataType::Utf8, true)),
+            Arc::new(aa) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("bb", DataType::Utf8, true)),
+            Arc::new(bb) as ArrayRef,
+        ),
+    ]);
+
+    let input_batch = RecordBatch::try_from_iter([
+        ("a", Arc::new(a) as ArrayRef),
+        ("b", Arc::new(b) as ArrayRef),
+    ])
+    .unwrap();
+
+    let mut output = Vec::new();
+    let writer_options = None;
+    let mut writer =
+        ArrowWriter::try_new(&mut output, input_batch.schema(), writer_options).unwrap();
+    writer.write(&input_batch).unwrap();
+    writer.close().unwrap();
+    Bytes::from(output)
+});
+
 /// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮
 /// TODO put this in a common place
 #[derive(Clone)]
