This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c679ef8a7 fix(parquet): exclude single-leaf struct roots from 
predicate cache (#9983)
1c679ef8a7 is described below

commit 1c679ef8a7d0481596a29736a0bdea10f7eb4fed
Author: Sergei <[email protected]>
AuthorDate: Mon May 25 18:00:30 2026 +0700

    fix(parquet): exclude single-leaf struct roots from predicate cache (#9983)
    
    # Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    - Closes #9982 .
    
    # Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    # What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    ## Root cause
    
    `ProjectionMask::without_nested_types` (`parquet/src/arrow/mod.rs:427`)
    decides which leaves the predicate cache may cover. The check before
    this fix was:
    
    ```rust
    if root_leaf_counts[root_idx] == 1 && !root.is_list() {
        included_leaves.push(leaf_idx);
    }
    ```
    
    PR #8866 added `!root.is_list()` to exclude lists, but a **struct** root
    with a single leaf still satisfies the condition and gets cached.
    
    ## Fix (1 line)
    
    `parquet/src/arrow/mod.rs:455`:
    
    ```diff
    -                if root_leaf_counts[root_idx] == 1 && !root.is_list() {
    +                if root_leaf_counts[root_idx] == 1 && root.is_primitive() {
                         included_leaves.push(leaf_idx);
                     }
    ```
    
    # Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    
    If this PR claims a performance improvement, please include evidence
    such as benchmark results.
    -->
    ## Tests added on the branch
    
    ### 1. Reproducer integration test
    **File:** `parquet/tests/arrow_reader/predicate_cache.rs`
    **Name:** `test_async_predicate_on_single_leaf_nullable_struct`
    
    Builds an in-memory Parquet file with `OPTIONAL group b { REQUIRED
    BYTE_ARRAY aa (UTF8); }`, writes two rows (parent NULL, parent
    non-NULL), then runs the same `IS NULL` row filter through the async
    reader twice: once with the default cache, once with
    `with_max_predicate_cache_size(0)`. It asserts that
    
    - the uncached control yields exactly 1 row (`address` NULL row
    matches);
    - the cached run yields the same row count as the uncached one.
    
    **Pre-fix:** panic at `struct_array.rs:142`.
    **Post-fix:** passes (1 row in both cases).
    
    ### 2. Unit test
    **File:** `parquet/src/arrow/mod.rs` (test module)
    **Name:** `test_projection_mask_without_nested_single_leaf_struct`
    
    Directly checks `ProjectionMask::without_nested_types` against a schema
    with `OPTIONAL group address { REQUIRED BYTE_ARRAY street; } REQUIRED
    INT32 id`, for three input masks (single nested leaf, mixed, all
    leaves). All three expected outputs reflect that the struct's leaf is
    now considered nested.
    
    **Pre-fix:** would return `Some([street_leaf])` for the single-leaf-only
    mask.
    **Post-fix:** returns `None` for the single-leaf-only mask; returns
    `Some([id])` for mixed.
    
    ## Verification matrix
    
    | Test | Pre-fix | Post-fix |
    |---|---|---|
    | `test_projection_mask_without_nested_single_leaf_struct` (new unit) |
    would FAIL | PASS |
    | `test_async_predicate_on_single_leaf_nullable_struct` (new
    integration) | PANIC | PASS |
    | `predicate_cache::test_default_read` | PASS | PASS |
    | `predicate_cache::test_async_cache_with_filters` | PASS | PASS |
    | `predicate_cache::test_sync_cache_with_filters` | PASS | PASS |
    | `predicate_cache::test_cache_disabled_with_filters` | PASS | PASS |
    | `predicate_cache::test_cache_projection_excludes_nested_columns` |
    PASS | PASS |
    | `test_projection_mask_without_nested_*` (5 existing) | PASS | PASS |
    
    # Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    
    If there are any breaking changes to public APIs, please call them out.
    -->
---
 parquet/src/arrow/mod.rs                      | 49 ++++++++++++-----
 parquet/tests/arrow_reader/predicate_cache.rs | 78 +++++++++++++++++++++++++++
 2 files changed, 114 insertions(+), 13 deletions(-)

diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 5215298816..14f7b9b6b4 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -435,24 +435,14 @@ impl ProjectionMask {
             root_leaf_counts[root_idx] += 1;
         }
 
-        // Keep only leaves whose root has exactly one leaf (non-nested) and 
is not a
-        // LIST. LIST is encoded as a wrapped logical type with a single leaf, 
e.g.
-        // 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-        //
-        // ```text
-        // // List<String> (list non-null, elements nullable)
-        // required group my_list (LIST) {
-        //   repeated group list {
-        //     optional binary element (STRING);
-        //   }
-        // }
-        // ```
+        // Cache only top-level primitive columns.
+        // Even a one-leaf group is nested; caching it drops parent def levels.
         let mut included_leaves = Vec::new();
         for leaf_idx in 0..num_leaves {
             if self.leaf_included(leaf_idx) {
                 let root = schema.get_column_root(leaf_idx);
                 let root_idx = schema.get_column_root_idx(leaf_idx);
-                if root_leaf_counts[root_idx] == 1 && !root.is_list() {
+                if root_leaf_counts[root_idx] == 1 && root.is_primitive() {
                     included_leaves.push(leaf_idx);
                 }
             }
@@ -1042,6 +1032,39 @@ mod test {
         );
     }
 
+    #[test]
+    fn test_projection_mask_without_nested_single_leaf_struct() {
+        // Regression: a single-leaf struct is still nested.
+        let schema = parse_schema(
+            "
+            message test_schema {
+                OPTIONAL group address {
+                    REQUIRED BYTE_ARRAY street (UTF8);
+                }
+                REQUIRED INT32 id;
+            }
+            ",
+        );
+
+        // street -> empty; root is a struct
+        let mask = ProjectionMask::leaves(&schema, [0]);
+        assert_eq!(None, mask.without_nested_types(&schema));
+
+        // street, id --> id only
+        let mask = ProjectionMask::leaves(&schema, [0, 1]);
+        assert_eq!(
+            Some(ProjectionMask::leaves(&schema, [1])),
+            mask.without_nested_types(&schema)
+        );
+
+        // all --> id only
+        let mask = ProjectionMask::all();
+        assert_eq!(
+            Some(ProjectionMask::leaves(&schema, [1])),
+            mask.without_nested_types(&schema)
+        );
+    }
+
     /// Converts a schema string into a `SchemaDescriptor`
     fn parse_schema(schema: &str) -> SchemaDescriptor {
         let parquet_group_type = parse_message_type(schema).unwrap();
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs 
b/parquet/tests/arrow_reader/predicate_cache.rs
index 85dba68c9c..4029b4e19e 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -18,6 +18,7 @@
 //! Test for predicate cache in Parquet Arrow reader
 
 use super::io::TestReader;
+use arrow::array::Array;
 use arrow::array::ArrayRef;
 use arrow::array::Int64Array;
 use arrow::compute::and;
@@ -92,6 +93,83 @@ async fn test_cache_projection_excludes_nested_columns() {
     test.run_async(async_builder).await;
 }
 
+/// Regression: cache must match no-cache for a nullable single-leaf struct.
+#[tokio::test]
+async fn test_async_predicate_on_single_leaf_nullable_struct() {
+    // Rows: b = NULL, then b.aa = "hello".
+    let aa: StringArray = StringArray::from(vec!["padding", "hello"]);
+    let nulls = arrow_buffer::NullBuffer::from(vec![false, true]);
+    let b = StructArray::new(
+        vec![Arc::new(Field::new("aa", DataType::Utf8, false))].into(),
+        vec![Arc::new(aa) as ArrayRef],
+        Some(nulls),
+    );
+    let input_batch = RecordBatch::try_from_iter([("b", Arc::new(b) as 
ArrayRef)]).unwrap();
+
+    let mut output = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), 
None).unwrap();
+    writer.write(&input_batch).unwrap();
+    writer.close().unwrap();
+    let bytes = Bytes::from(output);
+
+    // Since `aa` is required, `b.aa IS NULL` means `b` is NULL.
+    let build_is_null_filter = |schema_descr: 
&parquet::schema::types::SchemaDescPtr| -> RowFilter {
+        let mask = ProjectionMask::leaves(schema_descr, vec![0]);
+        let predicate = ArrowPredicateFn::new(mask.clone(), |batch: 
RecordBatch| {
+            let struct_arr = batch.column(0).as_struct();
+            let leaf = struct_arr.column(0);
+            Ok((0..batch.num_rows())
+                .map(|i| struct_arr.is_null(i) || leaf.is_null(i))
+                .collect::<arrow_array::BooleanArray>())
+        });
+        RowFilter::new(vec![Box::new(predicate)])
+    };
+
+    // Default cache.
+    let reader = TestReader::new(bytes.clone());
+    let async_builder =
+        ParquetRecordBatchStreamBuilder::new_with_options(reader, 
ArrowReaderOptions::default())
+            .await
+            .unwrap();
+    let schema_descr = 
async_builder.metadata().file_metadata().schema_descr_ptr();
+    let async_builder = async_builder
+        .with_projection(ProjectionMask::leaves(&schema_descr, vec![0]))
+        .with_row_filter(build_is_null_filter(&schema_descr));
+    let mut stream = async_builder.build().unwrap();
+    let mut row_count_cached = 0;
+    while let Some(batch) = stream.next().await {
+        row_count_cached += batch.unwrap().num_rows();
+    }
+
+    // Cache disabled.
+    let reader = TestReader::new(bytes.clone());
+    let async_builder =
+        ParquetRecordBatchStreamBuilder::new_with_options(reader, 
ArrowReaderOptions::default())
+            .await
+            .unwrap();
+    let schema_descr = 
async_builder.metadata().file_metadata().schema_descr_ptr();
+    let async_builder = async_builder
+        .with_projection(ProjectionMask::leaves(&schema_descr, vec![0]))
+        .with_row_filter(build_is_null_filter(&schema_descr))
+        .with_max_predicate_cache_size(0);
+    let mut stream = async_builder.build().unwrap();
+    let mut row_count_uncached = 0;
+    while let Some(batch) = stream.next().await {
+        row_count_uncached += batch.unwrap().num_rows();
+    }
+
+    assert_eq!(
+        row_count_uncached, 1,
+        "control: with cache disabled the predicate must match exactly one row 
(parent NULL)"
+    );
+    assert_eq!(
+        row_count_cached, row_count_uncached,
+        "cached reader must match uncached reader; \
+         got {row_count_cached} cached vs {row_count_uncached} uncached. \
+         single-leaf struct roots must stay out of the cache."
+    );
+}
+
 // --  Begin test infrastructure --
 
 /// A test parquet file

Reply via email to