alamb commented on code in PR #15561:
URL: https://github.com/apache/datafusion/pull/15561#discussion_r2028745649
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -295,3 +312,89 @@ fn create_initial_plan(
// default to scanning all row groups
Ok(ParquetAccessPlan::new_all(row_group_count))
}
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by incrementing
+/// the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+ predicate: Arc<dyn PhysicalExpr>,
+ file_schema: &SchemaRef,
+ predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+ match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+ Ok(pruning_predicate) => {
+ if !pruning_predicate.always_true() {
+ return Some(Arc::new(pruning_predicate));
+ }
+ }
+ Err(e) => {
+ debug!("Could not create pruning predicate for: {e}");
+ predicate_creation_errors.add(1);
+ }
+ }
+ None
+}
+
+/// Build a page pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a page pruning
+/// predicate, return None.
+pub(crate) fn build_page_pruning_predicate(
+ predicate: &Arc<dyn PhysicalExpr>,
+ file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+ Arc::new(PagePruningAccessPlanFilter::new(
+ predicate,
+ Arc::clone(file_schema),
+ ))
+}
+
+fn build_pruning_predicates(
+ predicate: &Option<Arc<dyn PhysicalExpr>>,
+ file_schema: &SchemaRef,
+ predicate_creation_errors: &Count,
+) -> (
+ Option<Arc<PruningPredicate>>,
+ Option<Arc<PagePruningAccessPlanFilter>>,
+) {
+ let Some(predicate) = predicate.as_ref() else {
+ return (None, None);
+ };
+ let pruning_predicate = build_pruning_predicate(
+ Arc::clone(predicate),
+ file_schema,
+ predicate_creation_errors,
+ );
+ let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+ (pruning_predicate, Some(page_pruning_predicate))
+}
+
+async fn load_page_index<T: AsyncFileReader>(
+ arrow_reader: ArrowReaderMetadata,
+ input: &mut T,
+ options: ArrowReaderOptions,
+) -> Result<ArrowReaderMetadata> {
+ let parquet_metadata = arrow_reader.metadata();
+ let missing_column_index = parquet_metadata.column_index().is_none();
+ let missing_offset_index = parquet_metadata.offset_index().is_none();
+ // You may ask yourself: why are we even checking if the page index is already loaded here?
+ // Didn't we explicitly *not* load it above?
+ // Well it's possible that a custom implementation of `AsyncFileReader` gives you
+ // the page index even if you didn't ask for it (e.g. because it's cached)
+ // so it's important to check that here to avoid extra work.
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]