kosiew commented on code in PR #22857:
URL: https://github.com/apache/datafusion/pull/22857#discussion_r3385612110
##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
Review Comment:
I think this still needs one more fix in the default cached-reader path.
This change relies on the initial metadata load honoring
`PageIndexPolicy::Skip`, but `ArrowReaderMetadata::load_async(...)` can still
call `CachedParquetFileReader::get_metadata()`. That path ignores the passed
`ArrowReaderOptions` page-index policy and calls
`DFParquetMetadata::fetch_metadata()` with a metadata cache. From there, the
metadata layer forces `PageIndexPolicy::Optional` whenever a metadata cache
exists.
The end result is that the opener can still load the ColumnIndex and OffsetIndex
during metadata loading, before `should_load_page_index()` gets a chance to
skip them for fully matched row groups.
Could you please make this opener invariant (only fetch the page index when some
surviving row group is not fully matched) hold end to end, by threading the
requested skip policy through the cached reader and the metadata cache path and
respecting it there? Another workable approach would be to defer page-index
fetching until after row-group pruning. It would also be good to add coverage
that goes through the default `ParquetSource` cached-reader path.
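In case it helps, here is a minimal, self-contained model of the problem and the requested fix. Everything in it is hypothetical: `CountingLoader`, `effective_policy`, `open`, and the local two-variant `PageIndexPolicy` enum are illustrative stand-ins, not the real `parquet` or DataFusion types. The loader counts page-index fetches. With `respect_requested_policy` off, it reproduces the behaviour described above, where a cache forces `Optional`. With it on, the opener's initial `Skip` survives, so a file whose surviving row groups are all fully matched never fetches the page index.

```rust
use std::cell::Cell;

/// Local stand-in for the page-index loading policy (hypothetical; only the
/// two variants discussed in this review are modelled).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PageIndexPolicy {
    Skip,
    Optional,
}

/// Hypothetical metadata loader that counts how often the page index is fetched.
struct CountingLoader {
    has_cache: bool,
    respect_requested_policy: bool,
    page_index_fetches: Cell<usize>,
}

impl CountingLoader {
    fn new(has_cache: bool, respect_requested_policy: bool) -> Self {
        Self {
            has_cache,
            respect_requested_policy,
            page_index_fetches: Cell::new(0),
        }
    }

    /// Policy actually used for the initial metadata load.
    fn effective_policy(&self, requested: PageIndexPolicy) -> PageIndexPolicy {
        if self.has_cache && !self.respect_requested_policy {
            // Models the current cached path: the cache forces `Optional`.
            PageIndexPolicy::Optional
        } else {
            // Models the fix: the caller's policy is threaded through unchanged.
            requested
        }
    }

    /// Initial footer load; also fetches the page index unless the policy is `Skip`.
    fn load_metadata(&self, requested: PageIndexPolicy) {
        if self.effective_policy(requested) != PageIndexPolicy::Skip {
            self.record_page_index_fetch();
        }
    }

    /// Explicit page-index load, issued only after row-group pruning.
    fn load_page_index(&self) {
        self.record_page_index_fetch();
    }

    fn record_page_index_fetch(&self) {
        self.page_index_fetches.set(self.page_index_fetches.get() + 1);
    }

    fn page_index_fetches(&self) -> usize {
        self.page_index_fetches.get()
    }
}

/// Models the opener: load metadata with `Skip`, prune, then fetch the page
/// index only if some surviving row group is not fully matched.
fn open(loader: &CountingLoader, surviving_fully_matched: &[bool]) {
    loader.load_metadata(PageIndexPolicy::Skip);
    if surviving_fully_matched.iter().any(|&matched| !matched) {
        loader.load_page_index();
    }
}
```

With both row groups fully matched, `CountingLoader::new(true, false)` records one fetch, while `CountingLoader::new(true, true)` records none. Asserting on a counter like this, rather than on row counts, is what makes the extra I/O visible.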
##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
@@ -1566,6 +1573,24 @@ pub(crate) fn build_pruning_predicates(
)
}
+/// Returns true if the page index must be loaded for page-level pruning.
+///
+/// The page index can only prune when at least one surviving row group is not
+/// fully matched by row-group statistics alone.
+fn should_load_page_index(
+ page_pruning_predicate: Option<&Arc<PagePruningAccessPlanFilter>>,
+ row_groups: &RowGroupAccessPlanFilter,
+) -> bool {
+ if page_pruning_predicate.is_none() || row_groups.is_empty() {
+ return false;
+ }
+
+ let is_fully_matched = row_groups.is_fully_matched();
Review Comment:
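Small cleanup suggestion: this helper could encode the invariant a bit more
directly with `is_some_and` plus `any`, which avoids the early return and the
double-negative `!all(...)`. Because `any` returns `false` on an empty
iterator, this should also cover the `row_groups.is_empty()` case, assuming
`row_group_indexes()` yields nothing when no row groups survive.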
```rust
page_pruning_predicate.is_some_and(|_| {
let fully_matched = row_groups.is_fully_matched();
row_groups
.row_group_indexes()
.any(|idx| !fully_matched[idx])
})
```
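For reference, here is a self-contained sketch that checks the suggested form against the early-return form. `RowGroups` is a hypothetical stand-in for `RowGroupAccessPlanFilter`, and only the three methods the helper uses are modelled. The diff hunk is cut off, so the original tail is assumed to be the `!all(...)` mentioned above. The cases below include an empty set of surviving row groups.

```rust
/// Hypothetical stand-in for `RowGroupAccessPlanFilter`.
struct RowGroups {
    /// Indexes of row groups still scheduled for scanning.
    surviving: Vec<usize>,
    /// `fully_matched[i]` is true when statistics prove every row of row group `i` matches.
    fully_matched: Vec<bool>,
}

impl RowGroups {
    fn is_empty(&self) -> bool {
        self.surviving.is_empty()
    }

    fn is_fully_matched(&self) -> &[bool] {
        &self.fully_matched
    }

    fn row_group_indexes(&self) -> impl Iterator<Item = usize> + '_ {
        self.surviving.iter().copied()
    }
}

/// Assumed original shape: early return, then a negated `all`.
fn should_load_original<P>(predicate: Option<&P>, rg: &RowGroups) -> bool {
    if predicate.is_none() || rg.is_empty() {
        return false;
    }
    let fully_matched = rg.is_fully_matched();
    !rg.row_group_indexes().all(|idx| fully_matched[idx])
}

/// Suggested shape: `is_some_and` plus `any`. An empty iterator makes `any`
/// return false, so no separate emptiness check is needed.
fn should_load_suggested<P>(predicate: Option<&P>, rg: &RowGroups) -> bool {
    predicate.is_some_and(|_| {
        let fully_matched = rg.is_fully_matched();
        rg.row_group_indexes().any(|idx| !fully_matched[idx])
    })
}
```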
##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
@@ -2808,6 +2833,67 @@ mod test {
);
}
+ #[test]
+ fn should_load_page_index_without_predicate() {
+ use crate::RowGroupAccessPlanFilter;
+ let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
+ assert!(!should_load_page_index(None, &row_groups));
+ }
+
+ #[test]
+ fn should_load_page_index_when_surviving_row_groups_not_fully_matched() {
+ use crate::RowGroupAccessPlanFilter;
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+ let predicate = logical2physical(&col("a").gt(lit(50i32)), &schema);
+ let page_predicate = build_page_pruning_predicate(&predicate, &schema);
+ let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
+ assert!(should_load_page_index(Some(&page_predicate), &row_groups));
+ }
+
+ #[tokio::test]
+ async fn test_page_index_skipped_when_row_groups_fully_matched() {
+ use parquet::file::properties::WriterProperties;
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+ let values: Vec<i32> = (1..=100).collect();
+ let batch = record_batch!((
+ "a",
+ Int32,
+ values.iter().map(|v| Some(*v)).collect::<Vec<_>>()
+ ))
+ .unwrap();
+ let props = WriterProperties::builder()
+ .set_data_page_row_count_limit(10)
+ .set_write_batch_size(10)
+ .build();
+ let schema = batch.schema();
+ let data_len = write_parquet_batches(
+ Arc::clone(&store),
+ "test.parquet",
+ vec![batch],
+ Some(props),
+ )
+ .await;
+
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_len).unwrap(),
+ );
+ let predicate = logical2physical(&col("a").is_not_null(), &schema);
+
+ let morselizer = ParquetMorselizerBuilder::new()
+ .with_store(Arc::clone(&store))
+ .with_schema(Arc::clone(&schema))
+ .with_predicate(Arc::clone(&predicate))
+ .with_enable_page_index(true)
+ .with_pushdown_filters(false)
+ .build();
+
+ let (_, rows) =
+ count_batches_and_rows(open_file(&morselizer, file).await.unwrap()).await;
+ assert_eq!(rows, 100);
Review Comment:
Nice to have: this regression test currently only checks the row count,
which would still pass even if the page index were loaded and evaluated.
After the cached-reader path is fixed, could we assert the invariant more
directly? For example, the test could use a counting reader or object store
that records or fails on page-index range reads, or it could assert a metric or
state showing that `LoadPageIndex` was skipped. That would make the test much
better at catching future reorderings that accidentally bring the extra I/O
back.
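Here is one possible shape for the counting approach, offered only as a sketch. `RecordingReader` is a hypothetical in-memory reader, not an `object_store` or `parquet` type, and the page-index region is supplied by hand. In a real test, the region would be derived from the column-index and offset-index offsets and lengths recorded in the file's column-chunk metadata. A wrapper like this could then sit in front of the `ObjectStore` that the opener reads from.

```rust
use std::ops::Range;
use std::sync::Mutex;

/// Hypothetical in-memory reader that records every byte range requested,
/// standing in for a counting `ObjectStore` wrapper in a real test.
struct RecordingReader {
    data: Vec<u8>,
    reads: Mutex<Vec<Range<u64>>>,
}

impl RecordingReader {
    fn new(data: Vec<u8>) -> Self {
        Self { data, reads: Mutex::new(Vec::new()) }
    }

    /// Returns the requested bytes and remembers the range that was read.
    fn get_range(&self, range: Range<u64>) -> Vec<u8> {
        self.reads.lock().unwrap().push(range.clone());
        self.data[range.start as usize..range.end as usize].to_vec()
    }

    /// True if any recorded read overlaps `region` (both are half-open ranges).
    fn touched(&self, region: &Range<u64>) -> bool {
        self.reads
            .lock()
            .unwrap()
            .iter()
            .any(|r| r.start < region.end && region.start < r.end)
    }
}
```

A test would read the file through the wrapper and then assert `!reader.touched(&page_index_region)` when every surviving row group is fully matched. A stricter variant could panic inside `get_range` on an overlapping read, which surfaces the offending call site directly.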
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.