kosiew commented on code in PR #22857:
URL: https://github.com/apache/datafusion/pull/22857#discussion_r3385612110


##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########


Review Comment:
   I think this still needs one more fix in the default cached-reader path.
   
   This change relies on the initial metadata load honoring 
`PageIndexPolicy::Skip`, but `ArrowReaderMetadata::load_async(...)` can still 
call `CachedParquetFileReader::get_metadata()`. That path ignores the passed 
`ArrowReaderOptions` page-index policy and calls 
`DFParquetMetadata::fetch_metadata()` with a metadata cache. From there, the 
metadata layer forces `PageIndexPolicy::Optional` whenever a metadata cache 
exists.
   
   The end result is that the opener can still load ColumnIndex and OffsetIndex 
during metadata loading, before `should_load_page_index()` gets a chance to 
skip it for fully matched row groups.
   
   Could you please make this opener invariant hold end to end by threading or 
respecting the requested skip policy through the cached reader and metadata 
cache path? Another workable approach would be to prevent eager page-index 
fetching until after row-group pruning. It would also be good to add coverage 
using the default `ParquetSource` cached-reader path.
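   
   To make the first option concrete, here is a minimal sketch of a cached 
reader that respects the requested policy and defers the page index to an 
explicit later step. Everything in it is a hypothetical stand-in 
(`CachedReader`, `Metadata`, the two-variant `PageIndexPolicy`, the 
`page_index_loads` counter), not the real `CachedParquetFileReader` or 
`DFParquetMetadata` API, so please read it as an illustration of the invariant 
rather than a proposed patch.
   
   ```rust
   use std::collections::HashMap;

   /// Simplified stand-in for the page-index policy (hypothetical).
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   enum PageIndexPolicy {
       Skip,
       Optional,
   }

   /// Hypothetical cached metadata; only tracks whether the page index is present.
   #[derive(Debug, Clone, PartialEq, Eq)]
   struct Metadata {
       has_page_index: bool,
   }

   /// Hypothetical cached reader keyed by file path.
   #[derive(Default)]
   struct CachedReader {
       cache: HashMap<String, Metadata>,
       /// Counts simulated page-index fetches so a test can assert on them.
       page_index_loads: usize,
   }

   impl CachedReader {
       /// Initial metadata load. The requested policy is respected even though
       /// a cache exists, instead of being forced to `Optional`.
       fn get_metadata(&mut self, path: &str, policy: PageIndexPolicy) -> Metadata {
           if let Some(hit) = self.cache.get(path) {
               return hit.clone();
           }
           let load_index = policy != PageIndexPolicy::Skip;
           if load_index {
               self.page_index_loads += 1;
           }
           let metadata = Metadata { has_page_index: load_index };
           self.cache.insert(path.to_string(), metadata.clone());
           metadata
       }

       /// Deferred load, meant to run only after row-group pruning decides the
       /// page index is needed. Upgrades the cached entry in place.
       fn load_page_index(&mut self, path: &str) -> Option<Metadata> {
           let entry = self.cache.get_mut(path)?;
           if !entry.has_page_index {
               entry.has_page_index = true;
               self.page_index_loads += 1;
           }
           Some(entry.clone())
       }
   }
   ```
   
   With this shape, a `Skip` request never triggers a page-index fetch during 
the initial load, and the cache entry is upgraded only when the opener asks 
for it explicitly.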



##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
@@ -1566,6 +1573,24 @@ pub(crate) fn build_pruning_predicates(
     )
 }
 
+/// Returns true if the page index must be loaded for page-level pruning.
+///
+/// The page index can only prune when at least one surviving row group is not
+/// fully matched by row-group statistics alone.
+fn should_load_page_index(
+    page_pruning_predicate: Option<&Arc<PagePruningAccessPlanFilter>>,
+    row_groups: &RowGroupAccessPlanFilter,
+) -> bool {
+    if page_pruning_predicate.is_none() || row_groups.is_empty() {
+        return false;
+    }
+
+    let is_fully_matched = row_groups.is_fully_matched();

Review Comment:
   Small cleanup suggestion: this helper could encode the invariant a bit more 
directly with `is_some_and` plus `any`, which avoids the early return and the 
double-negative `!all(...)`.
   
   ```rust
   page_pruning_predicate.is_some_and(|_| {
       let fully_matched = row_groups.is_fully_matched();
       row_groups
           .row_group_indexes()
           .any(|idx| !fully_matched[idx])
   })
   ```
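   
   For what it is worth, the two forms should be equivalent, including for an 
empty set of surviving row groups, because `any` over an empty iterator is 
`false`. A standalone sketch with plain slices standing in for 
`RowGroupAccessPlanFilter` (the function names, the `Option<&str>` predicate, 
and the slice parameters are all hypothetical, and the early-return body is my 
assumption about the part of the helper not shown in this hunk):
   
   ```rust
   /// Early-return form with `!all(...)`; assumed shape of the current helper.
   fn should_load_early_return(
       predicate: Option<&str>,
       surviving: &[usize],
       fully_matched: &[bool],
   ) -> bool {
       if predicate.is_none() || surviving.is_empty() {
           return false;
       }
       !surviving.iter().all(|&idx| fully_matched[idx])
   }

   /// Suggested form: load only if a predicate exists and at least one
   /// surviving row group is not fully matched.
   fn should_load_any(
       predicate: Option<&str>,
       surviving: &[usize],
       fully_matched: &[bool],
   ) -> bool {
       predicate.is_some_and(|_| surviving.iter().any(|&idx| !fully_matched[idx]))
   }
   ```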



##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
@@ -2808,6 +2833,67 @@ mod test {
         );
     }
 
+    #[test]
+    fn should_load_page_index_without_predicate() {
+        use crate::RowGroupAccessPlanFilter;
+        let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
+        assert!(!should_load_page_index(None, &row_groups));
+    }
+
+    #[test]
+    fn should_load_page_index_when_surviving_row_groups_not_fully_matched() {
+        use crate::RowGroupAccessPlanFilter;
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let predicate = logical2physical(&col("a").gt(lit(50i32)), &schema);
+        let page_predicate = build_page_pruning_predicate(&predicate, &schema);
+        let row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
+        assert!(should_load_page_index(Some(&page_predicate), &row_groups));
+    }
+
+    #[tokio::test]
+    async fn test_page_index_skipped_when_row_groups_fully_matched() {
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let values: Vec<i32> = (1..=100).collect();
+        let batch = record_batch!((
+            "a",
+            Int32,
+            values.iter().map(|v| Some(*v)).collect::<Vec<_>>()
+        ))
+        .unwrap();
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(10)
+            .set_write_batch_size(10)
+            .build();
+        let schema = batch.schema();
+        let data_len = write_parquet_batches(
+            Arc::clone(&store),
+            "test.parquet",
+            vec![batch],
+            Some(props),
+        )
+        .await;
+
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+        let predicate = logical2physical(&col("a").is_not_null(), &schema);
+
+        let morselizer = ParquetMorselizerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_schema(Arc::clone(&schema))
+            .with_predicate(Arc::clone(&predicate))
+            .with_enable_page_index(true)
+            .with_pushdown_filters(false)
+            .build();
+
+        let (_, rows) =
+            count_batches_and_rows(open_file(&morselizer, file).await.unwrap()).await;
+        assert_eq!(rows, 100);

Review Comment:
   Nice to have: this regression test currently only checks the row count, 
which would still pass even if the page index were loaded and evaluated.
   
   After the cached-reader path is fixed, could we assert the invariant more 
directly? For example, the test could use a counting reader or object store 
that records or fails on page-index range reads, or it could assert a metric or 
state showing that `LoadPageIndex` was skipped. That would make the test much 
better at catching future reorderings that accidentally bring the extra I/O 
back.
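   
   As a rough illustration of the counting-reader idea, here is a minimal 
sketch. The `RangeReader` trait, `InMemoryReader`, and `CountingReader` are 
hypothetical stand-ins rather than the real `ObjectStore` or 
`AsyncFileReader` interfaces, and the page-index byte range is assumed to be 
known to the test (for example, derived from the written file's offsets).
   
   ```rust
   use std::cell::Cell;
   use std::ops::Range;

   /// Hypothetical minimal range-read interface.
   trait RangeReader {
       fn read_range(&self, range: Range<u64>) -> Vec<u8>;
   }

   /// Hypothetical in-memory backing store.
   struct InMemoryReader {
       data: Vec<u8>,
   }

   impl RangeReader for InMemoryReader {
       fn read_range(&self, range: Range<u64>) -> Vec<u8> {
           self.data[range.start as usize..range.end as usize].to_vec()
       }
   }

   /// Wrapper that counts reads overlapping the page-index byte range.
   struct CountingReader<R> {
       inner: R,
       page_index_bytes: Range<u64>,
       page_index_reads: Cell<usize>,
   }

   impl<R: RangeReader> CountingReader<R> {
       fn new(inner: R, page_index_bytes: Range<u64>) -> Self {
           Self {
               inner,
               page_index_bytes,
               page_index_reads: Cell::new(0),
           }
       }

       fn page_index_reads(&self) -> usize {
           self.page_index_reads.get()
       }
   }

   impl<R: RangeReader> RangeReader for CountingReader<R> {
       fn read_range(&self, range: Range<u64>) -> Vec<u8> {
           // Two half-open ranges overlap when each starts before the other ends.
           let overlaps = range.start < self.page_index_bytes.end
               && self.page_index_bytes.start < range.end;
           if overlaps {
               self.page_index_reads.set(self.page_index_reads.get() + 1);
           }
           self.inner.read_range(range)
       }
   }
   ```
   
   The regression test could then assert that `page_index_reads()` is still 
zero after the scan completes for the fully matched case.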



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
