This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 1c679ef8a7 fix(parquet): exclude single-leaf struct roots from predicate cache (#9983)
1c679ef8a7 is described below
commit 1c679ef8a7d0481596a29736a0bdea10f7eb4fed
Author: Sergei <[email protected]>
AuthorDate: Mon May 25 18:00:30 2026 +0700
fix(parquet): exclude single-leaf struct roots from predicate cache (#9983)
# Which issue does this PR close?
- Closes #9982.
# Rationale for this change
# What changes are included in this PR?
## Root cause
`ProjectionMask::without_nested_types` (`parquet/src/arrow/mod.rs:427`)
decides which leaves the predicate cache may cover. The check before
this fix was:
```rust
if root_leaf_counts[root_idx] == 1 && !root.is_list() {
    included_leaves.push(leaf_idx);
}
```
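PR #8866 added `!root.is_list()` to exclude lists, but a **struct** root
with a single leaf still satisfies the condition, so its leaf gets cached.
Caching that leaf drops the parent's definition levels, so a nullable
struct loses its own nulls.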
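To make the root cause concrete, here is a simplified, std-only model of the pre-fix selection rule. `RootKind`, `Leaf`, `example_leaves`, `cacheable_leaves_pre_fix`, and the `tags` LIST column are hypothetical illustrations, not parquet crate types; the model only mirrors the per-root leaf counting and the `== 1 && !is_list()` check quoted above.

```rust
// Hypothetical, simplified model of the pre-fix rule in
// ProjectionMask::without_nested_types. Not the parquet crate's types.
#[derive(Clone, Copy, PartialEq, Debug)]
enum RootKind {
    Primitive,
    Struct,
    List,
}

/// One leaf column: the index of its top-level root and that root's kind.
struct Leaf {
    root_idx: usize,
    root_kind: RootKind,
}

fn example_leaves() -> Vec<Leaf> {
    vec![
        // OPTIONAL group address { REQUIRED BYTE_ARRAY street; }   (root 0)
        Leaf { root_idx: 0, root_kind: RootKind::Struct },
        // REQUIRED INT32 id                                       (root 1)
        Leaf { root_idx: 1, root_kind: RootKind::Primitive },
        // hypothetical: REQUIRED group tags (LIST) { repeated group list {
        //     optional binary element; } }                         (root 2)
        Leaf { root_idx: 2, root_kind: RootKind::List },
    ]
}

/// Pre-fix rule: keep a selected leaf if its root owns exactly one leaf
/// and the root is not a LIST.
fn cacheable_leaves_pre_fix(leaves: &[Leaf], selected: &[usize]) -> Vec<usize> {
    // Count how many leaves hang off each top-level root.
    let num_roots = leaves.iter().map(|l| l.root_idx + 1).max().unwrap_or(0);
    let mut root_leaf_counts = vec![0usize; num_roots];
    for leaf in leaves {
        root_leaf_counts[leaf.root_idx] += 1;
    }
    leaves
        .iter()
        .enumerate()
        .filter(|(leaf_idx, leaf)| {
            selected.contains(leaf_idx)
                && root_leaf_counts[leaf.root_idx] == 1
                && leaf.root_kind != RootKind::List
        })
        .map(|(leaf_idx, _)| leaf_idx)
        .collect()
}

fn main() {
    let leaves = example_leaves();
    // Only `street` selected: kept even though its root is a struct.
    println!("mask [0]       -> {:?}", cacheable_leaves_pre_fix(&leaves, &[0]));
    // All leaves selected: the LIST leaf is dropped, the struct leaf is not.
    println!("mask [0, 1, 2] -> {:?}", cacheable_leaves_pre_fix(&leaves, &[0, 1, 2]));
}
```

In this model the single-leaf struct passes the same check as the primitive `id`, which is exactly the gap that `!root.is_list()` leaves open.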
## Fix (1 line)
`parquet/src/arrow/mod.rs:455`:
```diff
-if root_leaf_counts[root_idx] == 1 && !root.is_list() {
+if root_leaf_counts[root_idx] == 1 && root.is_primitive() {
     included_leaves.push(leaf_idx);
 }
```
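The one-line change can be read as a truth table over root kinds. This sketch restates both conditions as plain functions over a hypothetical `RootKind` enum (not a parquet crate type), where `RootKind::Primitive` stands in for `root.is_primitive()` and `RootKind::List` for `root.is_list()`.

```rust
// Hypothetical model of the per-root condition, before and after the fix.
#[derive(Clone, Copy, PartialEq, Debug)]
enum RootKind {
    Primitive,
    Struct,
    List,
}

/// Pre-fix: `root_leaf_counts[root_idx] == 1 && !root.is_list()`.
fn keep_pre_fix(root_leaf_count: usize, kind: RootKind) -> bool {
    root_leaf_count == 1 && kind != RootKind::List
}

/// Post-fix: `root_leaf_counts[root_idx] == 1 && root.is_primitive()`.
fn keep_post_fix(root_leaf_count: usize, kind: RootKind) -> bool {
    root_leaf_count == 1 && kind == RootKind::Primitive
}

fn main() {
    let cases = [
        (1, RootKind::Primitive),
        (1, RootKind::Struct), // the case this PR fixes
        (1, RootKind::List),
        (2, RootKind::Struct),
    ];
    for (count, kind) in cases {
        println!(
            "{:?} root, {} leaf(s): pre-fix={}, post-fix={}",
            kind,
            count,
            keep_pre_fix(count, kind),
            keep_post_fix(count, kind)
        );
    }
}
```

Only the single-leaf struct row changes. A top-level primitive column always owns exactly one leaf, so after the fix the leaf-count check no longer changes the outcome; the sketch keeps it only to mirror the diff.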
# Are these changes tested?
## Tests added on the branch
### 1. Reproducer integration test
**File:** `parquet/tests/arrow_reader/predicate_cache.rs`
**Name:** `test_async_predicate_on_single_leaf_nullable_struct`
Builds an in-memory Parquet file with `OPTIONAL group b { REQUIRED
BYTE_ARRAY aa (UTF8); }`, writes two rows (parent NULL, parent
non-NULL), then runs the same `IS NULL` row filter through the async
reader twice: once with the default cache, once with
`with_max_predicate_cache_size(0)`. It asserts that
- the uncached control yields exactly 1 row (the row where `b` is NULL);
- the cached run yields the same row count as the uncached one.
**Pre-fix:** panic at `struct_array.rs:142`.
**Post-fix:** passes (1 row in both cases).
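Why does caching the leaf break `IS NULL`? A rough, std-only model, assuming only what the code comment in the diff states (caching a one-leaf group drops the parent's definition levels). For `OPTIONAL group b { REQUIRED BYTE_ARRAY aa (UTF8); }` the leaf's maximum definition level is 1, so level 0 marks a NULL `b`. `parent_validity` and `count_is_null` are hypothetical helpers and do not reproduce the arrow-rs reader; in the real pre-fix code the mismatch surfaced as the panic at `struct_array.rs:142` rather than as a wrong count.

```rust
// Hypothetical model of reading `b.aa` from
//   OPTIONAL group b { REQUIRED BYTE_ARRAY aa (UTF8); }
// Max definition level of the leaf is 1:
//   level 0 -> `b` is NULL, level 1 -> `b` (and therefore `aa`) is present.
const MAX_DEF_LEVEL: i16 = 1;

/// Parent (`b`) validity reconstructed from the leaf's definition levels.
fn parent_validity(def_levels: &[i16]) -> Vec<bool> {
    def_levels.iter().map(|&d| d >= MAX_DEF_LEVEL).collect()
}

/// Counts rows matching `b IS NULL || b.aa IS NULL`. Because `aa` is
/// REQUIRED, only the parent's validity can make a row match here.
fn count_is_null(parent_valid: &[bool]) -> usize {
    parent_valid.iter().filter(|&&valid| !valid).count()
}

fn main() {
    // Row 0: b = NULL; row 1: b.aa = "hello".
    let def_levels = [0i16, 1];

    // Parent definition levels kept: row 0 is recognised as NULL.
    let with_parent = parent_validity(&def_levels);
    // Parent definition levels dropped: every row looks valid.
    let dropped = vec![true; def_levels.len()];

    println!("with parent levels:    {} row(s)", count_is_null(&with_parent));
    println!("parent levels dropped: {} row(s)", count_is_null(&dropped));
}
```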
### 2. Unit test
**File:** `parquet/src/arrow/mod.rs` (test module)
**Name:** `test_projection_mask_without_nested_single_leaf_struct`
Directly checks `ProjectionMask::without_nested_types` against a schema
with `OPTIONAL group address { REQUIRED BYTE_ARRAY street; } REQUIRED
INT32 id`, for three input masks (single nested leaf, mixed, all
leaves). All three expected outputs reflect that the struct's leaf is
now considered nested.
**Pre-fix:** would return `Some([street_leaf])` for the single-leaf-only
mask.
**Post-fix:** returns `None` for the single-leaf-only mask and
`Some([id])` for both the mixed and the all-leaves masks.
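The unit test's expected outputs can be reproduced with a small std-only model of the post-fix rule. The function name mirrors `ProjectionMask::without_nested_types`, but the signature is a hypothetical simplification: leaf-to-root indices, per-root kinds, and an `Option<&[usize]>` mask where `None` means "all leaves", in the spirit of `ProjectionMask::all()`. Returning `None` for an empty result follows the unit test's expectation.

```rust
// Hypothetical model of ProjectionMask::without_nested_types after the fix.
#[derive(Clone, Copy, PartialEq)]
enum RootKind {
    Primitive,
    Group,
}

/// Returns the cacheable leaves, or `None` when no selected leaf qualifies.
fn without_nested_types(
    leaf_roots: &[usize],    // root index of each leaf
    root_kinds: &[RootKind], // kind of each top-level root
    mask: Option<&[usize]>,  // selected leaves; `None` = all leaves
) -> Option<Vec<usize>> {
    // Count how many leaves hang off each root.
    let mut root_leaf_counts = vec![0usize; root_kinds.len()];
    for &root_idx in leaf_roots {
        root_leaf_counts[root_idx] += 1;
    }
    let included: Vec<usize> = (0..leaf_roots.len())
        .filter(|leaf_idx| mask.map_or(true, |m| m.contains(leaf_idx)))
        .filter(|&leaf_idx| {
            let root_idx = leaf_roots[leaf_idx];
            // Post-fix: only single-leaf *primitive* roots are cacheable.
            root_leaf_counts[root_idx] == 1 && root_kinds[root_idx] == RootKind::Primitive
        })
        .collect();
    if included.is_empty() {
        None
    } else {
        Some(included)
    }
}

fn main() {
    // OPTIONAL group address { REQUIRED BYTE_ARRAY street; } -> leaf 0, root 0
    // REQUIRED INT32 id                                     -> leaf 1, root 1
    let leaf_roots = [0, 1];
    let root_kinds = [RootKind::Group, RootKind::Primitive];
    println!("{:?}", without_nested_types(&leaf_roots, &root_kinds, Some(&[0][..])));
    println!("{:?}", without_nested_types(&leaf_roots, &root_kinds, Some(&[0, 1][..])));
    println!("{:?}", without_nested_types(&leaf_roots, &root_kinds, None));
}
```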
## Verification matrix
| Test | Pre-fix | Post-fix |
|---|---|---|
| `test_projection_mask_without_nested_single_leaf_struct` (new unit) | would FAIL | PASS |
| `test_async_predicate_on_single_leaf_nullable_struct` (new integration) | PANIC | PASS |
| `predicate_cache::test_default_read` | PASS | PASS |
| `predicate_cache::test_async_cache_with_filters` | PASS | PASS |
| `predicate_cache::test_sync_cache_with_filters` | PASS | PASS |
| `predicate_cache::test_cache_disabled_with_filters` | PASS | PASS |
| `predicate_cache::test_cache_projection_excludes_nested_columns` | PASS | PASS |
| `test_projection_mask_without_nested_*` (5 existing) | PASS | PASS |
# Are there any user-facing changes?
---
parquet/src/arrow/mod.rs | 49 ++++++++++++-----
parquet/tests/arrow_reader/predicate_cache.rs | 78 +++++++++++++++++++++++++++
2 files changed, 114 insertions(+), 13 deletions(-)
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 5215298816..14f7b9b6b4 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -435,24 +435,14 @@ impl ProjectionMask {
             root_leaf_counts[root_idx] += 1;
         }
 
-        // Keep only leaves whose root has exactly one leaf (non-nested) and is not a
-        // LIST. LIST is encoded as a wrapped logical type with a single leaf, e.g.
-        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
-        //
-        // ```text
-        // // List<String> (list non-null, elements nullable)
-        // required group my_list (LIST) {
-        //   repeated group list {
-        //     optional binary element (STRING);
-        //   }
-        // }
-        // ```
+        // Cache only top-level primitive columns.
+        // Even a one-leaf group is nested; caching it drops parent def levels.
         let mut included_leaves = Vec::new();
         for leaf_idx in 0..num_leaves {
             if self.leaf_included(leaf_idx) {
                 let root = schema.get_column_root(leaf_idx);
                 let root_idx = schema.get_column_root_idx(leaf_idx);
-                if root_leaf_counts[root_idx] == 1 && !root.is_list() {
+                if root_leaf_counts[root_idx] == 1 && root.is_primitive() {
                     included_leaves.push(leaf_idx);
                 }
             }
@@ -1042,6 +1032,39 @@ mod test {
         );
     }
 
+    #[test]
+    fn test_projection_mask_without_nested_single_leaf_struct() {
+        // Regression: a single-leaf struct is still nested.
+        let schema = parse_schema(
+            "
+            message test_schema {
+                OPTIONAL group address {
+                    REQUIRED BYTE_ARRAY street (UTF8);
+                }
+                REQUIRED INT32 id;
+            }
+            ",
+        );
+
+        // street -> empty; root is a struct
+        let mask = ProjectionMask::leaves(&schema, [0]);
+        assert_eq!(None, mask.without_nested_types(&schema));
+
+        // street, id --> id only
+        let mask = ProjectionMask::leaves(&schema, [0, 1]);
+        assert_eq!(
+            Some(ProjectionMask::leaves(&schema, [1])),
+            mask.without_nested_types(&schema)
+        );
+
+        // all --> id only
+        let mask = ProjectionMask::all();
+        assert_eq!(
+            Some(ProjectionMask::leaves(&schema, [1])),
+            mask.without_nested_types(&schema)
+        );
+    }
+
     /// Converts a schema string into a `SchemaDescriptor`
     fn parse_schema(schema: &str) -> SchemaDescriptor {
         let parquet_group_type = parse_message_type(schema).unwrap();
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index 85dba68c9c..4029b4e19e 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -18,6 +18,7 @@
 //! Test for predicate cache in Parquet Arrow reader
 
 use super::io::TestReader;
+use arrow::array::Array;
 use arrow::array::ArrayRef;
 use arrow::array::Int64Array;
 use arrow::compute::and;
@@ -92,6 +93,83 @@ async fn test_cache_projection_excludes_nested_columns() {
     test.run_async(async_builder).await;
 }
 
+/// Regression: cache must match no-cache for a nullable single-leaf struct.
+#[tokio::test]
+async fn test_async_predicate_on_single_leaf_nullable_struct() {
+    // Rows: b = NULL, then b.aa = "hello".
+    let aa: StringArray = StringArray::from(vec!["padding", "hello"]);
+    let nulls = arrow_buffer::NullBuffer::from(vec![false, true]);
+    let b = StructArray::new(
+        vec![Arc::new(Field::new("aa", DataType::Utf8, false))].into(),
+        vec![Arc::new(aa) as ArrayRef],
+        Some(nulls),
+    );
+    let input_batch = RecordBatch::try_from_iter([("b", Arc::new(b) as ArrayRef)]).unwrap();
+
+    let mut output = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut output, input_batch.schema(), None).unwrap();
+    writer.write(&input_batch).unwrap();
+    writer.close().unwrap();
+    let bytes = Bytes::from(output);
+
+    // Since `aa` is required, `b.aa IS NULL` means `b` is NULL.
+    let build_is_null_filter = |schema_descr: &parquet::schema::types::SchemaDescPtr| -> RowFilter {
+        let mask = ProjectionMask::leaves(schema_descr, vec![0]);
+        let predicate = ArrowPredicateFn::new(mask.clone(), |batch: RecordBatch| {
+            let struct_arr = batch.column(0).as_struct();
+            let leaf = struct_arr.column(0);
+            Ok((0..batch.num_rows())
+                .map(|i| struct_arr.is_null(i) || leaf.is_null(i))
+                .collect::<arrow_array::BooleanArray>())
+        });
+        RowFilter::new(vec![Box::new(predicate)])
+    };
+
+    // Default cache.
+    let reader = TestReader::new(bytes.clone());
+    let async_builder =
+        ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default())
+            .await
+            .unwrap();
+    let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+    let async_builder = async_builder
+        .with_projection(ProjectionMask::leaves(&schema_descr, vec![0]))
+        .with_row_filter(build_is_null_filter(&schema_descr));
+    let mut stream = async_builder.build().unwrap();
+    let mut row_count_cached = 0;
+    while let Some(batch) = stream.next().await {
+        row_count_cached += batch.unwrap().num_rows();
+    }
+
+    // Cache disabled.
+    let reader = TestReader::new(bytes.clone());
+    let async_builder =
+        ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default())
+            .await
+            .unwrap();
+    let schema_descr = async_builder.metadata().file_metadata().schema_descr_ptr();
+    let async_builder = async_builder
+        .with_projection(ProjectionMask::leaves(&schema_descr, vec![0]))
+        .with_row_filter(build_is_null_filter(&schema_descr))
+        .with_max_predicate_cache_size(0);
+    let mut stream = async_builder.build().unwrap();
+    let mut row_count_uncached = 0;
+    while let Some(batch) = stream.next().await {
+        row_count_uncached += batch.unwrap().num_rows();
+    }
+
+    assert_eq!(
+        row_count_uncached, 1,
+        "control: with cache disabled the predicate must match exactly one row (parent NULL)"
+    );
+    assert_eq!(
+        row_count_cached, row_count_uncached,
+        "cached reader must match uncached reader; \
+         got {row_count_cached} cached vs {row_count_uncached} uncached. \
+         single-leaf struct roots must stay out of the cache."
+    );
+}
+
 // -- Begin test infrastructure --
 
 /// A test parquet file