This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new dccf3778e9 parquet reader: move pruning predicate creation from 
ParquetSource to ParquetOpener (#15561)
dccf3778e9 is described below

commit dccf3778e95037f3ed8740627799e9d658943157
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Sun Apr 6 08:33:00 2025 -0500

    parquet reader: move pruning predicate creation from ParquetSource to 
ParquetOpener (#15561)
    
    * parquet reader: move pruning predicate creation from ParquetSource to 
ParquetOpener
    
    * use file schema, avoid loading page index if unnecessary
    
    * Add comment
    
    * add comment
    
    * Add comment
    
    * remove check
    
    * fix clippy
    
    * update sqllogictest
    
    * restore to explain plans
    
    * reverted
    
    * modify access
    
    * Fix ArrowReaderOptions should read with physical_file_schema so we do… 
(#17)
    
    * Fix ArrowReaderOptions should read with physical_file_schema so we don't 
need to cast back to utf8
    
    * Fix fmt
    
    * Update opener.rs
    
    * Always apply per-file schema during parquet read (#18)
    
    * Update datafusion/datasource-parquet/src/opener.rs
    
    ---------
    
    Co-authored-by: Qi Zhu <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../core/src/datasource/file_format/parquet.rs     |  12 +-
 .../core/src/datasource/physical_plan/parquet.rs   | 282 ++++++++-------------
 datafusion/datasource-parquet/src/opener.rs        | 206 ++++++++++++---
 datafusion/datasource-parquet/src/row_filter.rs    |   4 +-
 datafusion/datasource-parquet/src/source.rs        |  51 +---
 datafusion/datasource/src/file_scan_config.rs      |   7 +
 6 files changed, 304 insertions(+), 258 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 27a7e7ae3c..67a7ba8dc7 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -67,13 +67,13 @@ pub(crate) mod test_util {
             .into_iter()
             .zip(tmp_files.into_iter())
             .map(|(batch, mut output)| {
-                let builder = 
parquet::file::properties::WriterProperties::builder();
-                let props = if multi_page {
-                    builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
-                } else {
-                    builder
+                let mut builder = 
parquet::file::properties::WriterProperties::builder();
+                if multi_page {
+                    builder = 
builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
                 }
-                .build();
+                builder = builder.set_bloom_filter_enabled(true);
+
+                let props = builder.build();
 
                 let mut writer = parquet::arrow::ArrowWriter::try_new(
                     &mut output,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 9e1b2822e8..5c06c3902c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -43,6 +43,7 @@ mod tests {
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
+    use arrow::util::pretty::pretty_format_batches;
     use arrow_schema::SchemaRef;
     use bytes::{BufMut, BytesMut};
     use datafusion_common::config::TableParquetOptions;
@@ -61,8 +62,9 @@ mod tests {
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_expr::{col, lit, when, Expr};
     use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_plan::analyze::AnalyzeExec;
+    use datafusion_physical_plan::collect;
     use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, 
MetricsSet};
-    use datafusion_physical_plan::{collect, displayable};
     use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
     use chrono::{TimeZone, Utc};
@@ -81,10 +83,10 @@ mod tests {
     struct RoundTripResult {
         /// Data that was read back from ParquetFiles
         batches: Result<Vec<RecordBatch>>,
+        /// The EXPLAIN ANALYZE output
+        explain: Result<String>,
         /// The physical plan that was created (that has statistics, etc)
         parquet_exec: Arc<DataSourceExec>,
-        /// The ParquetSource that is used in plan
-        parquet_source: ParquetSource,
     }
 
     /// round-trip record batches by writing each individual RecordBatch to
@@ -137,71 +139,109 @@ mod tests {
             self.round_trip(batches).await.batches
         }
 
-        /// run the test, returning the `RoundTripResult`
-        async fn round_trip(self, batches: Vec<RecordBatch>) -> 
RoundTripResult {
-            let Self {
-                projection,
-                schema,
-                predicate,
-                pushdown_predicate,
-                page_index_predicate,
-            } = self;
-
-            let file_schema = match schema {
-                Some(schema) => schema,
-                None => Arc::new(
-                    Schema::try_merge(
-                        batches.iter().map(|b| b.schema().as_ref().clone()),
-                    )
-                    .unwrap(),
-                ),
-            };
-            // If testing with page_index_predicate, write parquet
-            // files with multiple pages
-            let multi_page = page_index_predicate;
-            let (meta, _files) = store_parquet(batches, 
multi_page).await.unwrap();
-            let file_group = meta.into_iter().map(Into::into).collect();
-
+        fn build_file_source(&self, file_schema: SchemaRef) -> 
Arc<ParquetSource> {
             // set up predicate (this is normally done by a layer higher up)
-            let predicate = predicate.map(|p| logical2physical(&p, 
&file_schema));
+            let predicate = self
+                .predicate
+                .as_ref()
+                .map(|p| logical2physical(p, &file_schema));
 
             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
                 source = source.with_predicate(Arc::clone(&file_schema), 
predicate);
             }
 
-            if pushdown_predicate {
+            if self.pushdown_predicate {
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
             }
 
-            if page_index_predicate {
+            if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
             }
 
+            Arc::new(source)
+        }
+
+        fn build_parquet_exec(
+            &self,
+            file_schema: SchemaRef,
+            file_group: FileGroup,
+            source: Arc<ParquetSource>,
+        ) -> Arc<DataSourceExec> {
             let base_config = FileScanConfigBuilder::new(
                 ObjectStoreUrl::local_filesystem(),
                 file_schema,
-                Arc::new(source.clone()),
+                source,
             )
             .with_file_group(file_group)
-            .with_projection(projection)
+            .with_projection(self.projection.clone())
             .build();
+            DataSourceExec::from_data_source(base_config)
+        }
+
+        /// run the test, returning the `RoundTripResult`
+        async fn round_trip(&self, batches: Vec<RecordBatch>) -> 
RoundTripResult {
+            let file_schema = match &self.schema {
+                Some(schema) => schema,
+                None => &Arc::new(
+                    Schema::try_merge(
+                        batches.iter().map(|b| b.schema().as_ref().clone()),
+                    )
+                    .unwrap(),
+                ),
+            };
+            let file_schema = Arc::clone(file_schema);
+            // If testing with page_index_predicate, write parquet
+            // files with multiple pages
+            let multi_page = self.page_index_predicate;
+            let (meta, _files) = store_parquet(batches, 
multi_page).await.unwrap();
+            let file_group: FileGroup = 
meta.into_iter().map(Into::into).collect();
+
+            // build a ParquetExec to return the results
+            let parquet_source = self.build_file_source(file_schema.clone());
+            let parquet_exec = self.build_parquet_exec(
+                file_schema.clone(),
+                file_group.clone(),
+                Arc::clone(&parquet_source),
+            );
+
+            let analyze_exec = Arc::new(AnalyzeExec::new(
+                false,
+                false,
+                // use a new ParquetSource to avoid sharing execution metrics
+                self.build_parquet_exec(
+                    file_schema.clone(),
+                    file_group.clone(),
+                    self.build_file_source(file_schema.clone()),
+                ),
+                Arc::new(Schema::new(vec![
+                    Field::new("plan_type", DataType::Utf8, true),
+                    Field::new("plan", DataType::Utf8, true),
+                ])),
+            ));
 
             let session_ctx = SessionContext::new();
             let task_ctx = session_ctx.task_ctx();
 
-            let parquet_exec = 
DataSourceExec::from_data_source(base_config.clone());
+            let batches = collect(
+                Arc::clone(&parquet_exec) as Arc<dyn ExecutionPlan>,
+                task_ctx.clone(),
+            )
+            .await;
+
+            let explain = collect(analyze_exec, task_ctx.clone())
+                .await
+                .map(|batches| {
+                    let batches = pretty_format_batches(&batches).unwrap();
+                    format!("{batches}")
+                });
+
             RoundTripResult {
-                batches: collect(parquet_exec.clone(), task_ctx).await,
+                batches,
+                explain,
                 parquet_exec,
-                parquet_source: base_config
-                    .file_source()
-                    .as_any()
-                    .downcast_ref::<ParquetSource>()
-                    .unwrap()
-                    .clone(),
             }
         }
     }
@@ -1375,26 +1415,6 @@ mod tests {
         create_batch(vec![("c1", c1.clone())])
     }
 
-    /// Returns a int64 array with contents:
-    /// "[-1, 1, null, 2, 3, null, null]"
-    fn int64_batch() -> RecordBatch {
-        let contents: ArrayRef = Arc::new(Int64Array::from(vec![
-            Some(-1),
-            Some(1),
-            None,
-            Some(2),
-            Some(3),
-            None,
-            None,
-        ]));
-
-        create_batch(vec![
-            ("a", contents.clone()),
-            ("b", contents.clone()),
-            ("c", contents.clone()),
-        ])
-    }
-
     #[tokio::test]
     async fn parquet_exec_metrics() {
         // batch1: c1(string)
@@ -1454,110 +1474,17 @@ mod tests {
             .round_trip(vec![batch1])
             .await;
 
-        // should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
-
-        // convert to explain plan form
-        let display = displayable(rt.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-
-        assert_contains!(
-            &display,
-            "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != 
bar OR bar != c1_max@1)"
-        );
-
-        assert_contains!(&display, r#"predicate=c1@0 != bar"#);
-
-        assert_contains!(&display, "projection=[c1]");
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_display_deterministic() {
-        // batches: a(int64), b(int64), c(int64)
-        let batches = int64_batch();
-
-        fn extract_required_guarantees(s: &str) -> Option<&str> {
-            s.split("required_guarantees=").nth(1)
-        }
-
-        // Ensuring that the required_guarantees remain consistent across 
every display plan of the filter conditions
-        for _ in 0..100 {
-            // c = 1 AND b = 1 AND a = 1
-            let filter0 = col("c")
-                .eq(lit(1))
-                .and(col("b").eq(lit(1)))
-                .and(col("a").eq(lit(1)));
-
-            let rt0 = RoundTrip::new()
-                .with_predicate(filter0)
-                .with_pushdown_predicate()
-                .round_trip(vec![batches.clone()])
-                .await;
-
-            let pruning_predicate = rt0.parquet_source.pruning_predicate();
-            assert!(pruning_predicate.is_some());
-
-            let display0 = displayable(rt0.parquet_exec.as_ref())
-                .indent(true)
-                .to_string();
-
-            let guarantees0: &str = extract_required_guarantees(&display0)
-                .expect("Failed to extract required_guarantees");
-            // Compare only the required_guarantees part (Because the 
file_groups part will not be the same)
-            assert_eq!(
-                guarantees0.trim(),
-                "[a in (1), b in (1), c in (1)]",
-                "required_guarantees don't match"
-            );
-        }
+        let explain = rt.explain.unwrap();
 
-        // c = 1 AND a = 1 AND b = 1
-        let filter1 = col("c")
-            .eq(lit(1))
-            .and(col("a").eq(lit(1)))
-            .and(col("b").eq(lit(1)));
+        // check that there was a pruning predicate -> row groups got pruned
+        assert_contains!(&explain, "predicate=c1@0 != bar");
 
-        let rt1 = RoundTrip::new()
-            .with_predicate(filter1)
-            .with_pushdown_predicate()
-            .round_trip(vec![batches.clone()])
-            .await;
-
-        // b = 1 AND a = 1 AND c = 1
-        let filter2 = col("b")
-            .eq(lit(1))
-            .and(col("a").eq(lit(1)))
-            .and(col("c").eq(lit(1)));
+        // there's a single row group, but we can check that it matched
+        // if no pruning was done this would be 0 instead of 1
+        assert_contains!(&explain, "row_groups_matched_statistics=1");
 
-        let rt2 = RoundTrip::new()
-            .with_predicate(filter2)
-            .with_pushdown_predicate()
-            .round_trip(vec![batches])
-            .await;
-
-        // should have a pruning predicate
-        let pruning_predicate = rt1.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
-        let pruning_predicate = rt2.parquet_source.predicate();
-        assert!(pruning_predicate.is_some());
-
-        // convert to explain plan form
-        let display1 = displayable(rt1.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-        let display2 = displayable(rt2.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
-
-        let guarantees1 = extract_required_guarantees(&display1)
-            .expect("Failed to extract required_guarantees");
-        let guarantees2 = extract_required_guarantees(&display2)
-            .expect("Failed to extract required_guarantees");
-
-        // Compare only the required_guarantees part (Because the predicate 
part will not be the same)
-        assert_eq!(guarantees1, guarantees2, "required_guarantees don't 
match");
+        // check the projection
+        assert_contains!(&explain, "projection=[c1]");
     }
 
     #[tokio::test]
@@ -1581,16 +1508,19 @@ mod tests {
             .await;
 
         // Should not contain a pruning predicate (since nothing can be pruned)
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(
-            pruning_predicate.is_none(),
-            "Still had pruning predicate: {pruning_predicate:?}"
-        );
+        let explain = rt.explain.unwrap();
 
-        // but does still has a pushdown down predicate
-        let predicate = rt.parquet_source.predicate();
-        let filter_phys = logical2physical(&filter, 
rt.parquet_exec.schema().as_ref());
-        assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
+        // When both matched and pruned are 0, it means that the pruning 
predicate
+        // was not used at all.
+        assert_contains!(&explain, "row_groups_matched_statistics=0");
+        assert_contains!(&explain, "row_groups_pruned_statistics=0");
+
+        // But pushdown predicate should be present
+        assert_contains!(
+            &explain,
+            "predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
+        );
+        assert_contains!(&explain, "pushdown_rows_pruned=5");
     }
 
     #[tokio::test]
@@ -1616,8 +1546,14 @@ mod tests {
             .await;
 
         // Should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
+        let explain = rt.explain.unwrap();
+        assert_contains!(
+            &explain,
+            "predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE 
false END"
+        );
+
+        // And bloom filters should have been evaluated
+        assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
     }
 
     /// Returns the sum of all the metrics with the specified name
diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index 732fef47d5..708a8035a4 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -34,13 +34,14 @@ use arrow::error::ArrowError;
 use datafusion_common::{exec_err, Result};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::pruning::PruningPredicate;
-use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, 
MetricBuilder};
 
 use futures::{StreamExt, TryStreamExt};
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::ParquetMetaDataReader;
 
 /// Implements [`FileOpener`] for a parquet file
 pub(super) struct ParquetOpener {
@@ -54,10 +55,6 @@ pub(super) struct ParquetOpener {
     pub limit: Option<usize>,
     /// Optional predicate to apply during the scan
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional pruning predicate applied to row group statistics
-    pub pruning_predicate: Option<Arc<PruningPredicate>>,
-    /// Optional pruning predicate applied to data page statistics
-    pub page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
     /// Schema of the output table
     pub table_schema: SchemaRef,
     /// Optional hint for how large the initial request to read parquet 
metadata
@@ -80,6 +77,8 @@ pub(super) struct ParquetOpener {
     pub enable_bloom_filter: bool,
     /// Schema adapter factory
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    /// Should row group pruning be applied
+    pub enable_row_group_stats_pruning: bool,
 }
 
 impl FileOpener for ParquetOpener {
@@ -92,7 +91,7 @@ impl FileOpener for ParquetOpener {
 
         let metadata_size_hint = 
file_meta.metadata_size_hint.or(self.metadata_size_hint);
 
-        let mut reader: Box<dyn AsyncFileReader> =
+        let mut async_file_reader: Box<dyn AsyncFileReader> =
             self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
                 file_meta,
@@ -109,47 +108,84 @@ impl FileOpener for ParquetOpener {
             .schema_adapter_factory
             .create(projected_schema, Arc::clone(&self.table_schema));
         let predicate = self.predicate.clone();
-        let pruning_predicate = self.pruning_predicate.clone();
-        let page_pruning_predicate = self.page_pruning_predicate.clone();
         let table_schema = Arc::clone(&self.table_schema);
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
-        let enable_page_index = should_enable_page_index(
-            self.enable_page_index,
-            &self.page_pruning_predicate,
-        );
         let enable_bloom_filter = self.enable_bloom_filter;
+        let enable_row_group_stats_pruning = 
self.enable_row_group_stats_pruning;
         let limit = self.limit;
 
-        Ok(Box::pin(async move {
-            let options = 
ArrowReaderOptions::new().with_page_index(enable_page_index);
+        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
+            .global_counter("num_predicate_creation_errors");
+
+        let enable_page_index = self.enable_page_index;
 
+        Ok(Box::pin(async move {
+            // Don't load the page index yet. Since it is not stored inline in
+            // the footer, loading the page index if it is not needed will do
+            // unnecessary I/O. We decide later if it is needed to evaluate the
+            // pruning predicates. Thus default to not requesting it from the
+            // underlying reader.
+            let mut options = ArrowReaderOptions::new().with_page_index(false);
             let mut metadata_timer = file_metrics.metadata_load_time.timer();
-            let metadata =
-                ArrowReaderMetadata::load_async(&mut reader, 
options.clone()).await?;
-            let mut schema = Arc::clone(metadata.schema());
 
-            // read with view types
-            if let Some(merged) = 
apply_file_schema_type_coercions(&table_schema, &schema)
+            // Begin by loading the metadata from the underlying reader (note
+            // the returned metadata may actually include page indexes as some
+            // readers may return page indexes even when not requested -- for
+            // example when they are cached)
+            let mut reader_metadata =
+                ArrowReaderMetadata::load_async(&mut async_file_reader, 
options.clone())
+                    .await?;
+
+            // Note about schemas: we are actually dealing with **3 different 
schemas** here:
+            // - The table schema as defined by the TableProvider. This is 
what the user sees, what they get when they `SELECT * FROM table`, etc.
+            // - The "virtual" file schema: this is the table schema minus any 
hive partition columns and projections. This is what the file schema is coerced 
to.
+            // - The physical file schema: this is the schema as defined by 
the parquet file. This is what the parquet file actually contains.
+            let mut physical_file_schema = 
Arc::clone(reader_metadata.schema());
+
+            // The schema loaded from the file may not be the same as the
+            // desired schema (for example if we want to instruct the parquet
+            // reader to read strings using Utf8View instead). Update if 
necessary
+            if let Some(merged) =
+                apply_file_schema_type_coercions(&table_schema, 
&physical_file_schema)
             {
-                schema = Arc::new(merged);
+                physical_file_schema = Arc::new(merged);
+                options = 
options.with_schema(Arc::clone(&physical_file_schema));
+                reader_metadata = ArrowReaderMetadata::try_new(
+                    Arc::clone(reader_metadata.metadata()),
+                    options.clone(),
+                )?;
             }
 
-            let options = ArrowReaderOptions::new()
-                .with_page_index(enable_page_index)
-                .with_schema(Arc::clone(&schema));
-            let metadata =
-                ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), 
options)?;
+            // Build predicates for this specific file
+            let (pruning_predicate, page_pruning_predicate) = 
build_pruning_predicates(
+                &predicate,
+                &physical_file_schema,
+                &predicate_creation_errors,
+            );
 
-            metadata_timer.stop();
+            // The page index is not stored inline in the parquet footer so the
+            // code above may not have read the page index structures yet. If 
we
+            // need them for reading and they aren't yet loaded, we need to 
load them now.
+            if should_enable_page_index(enable_page_index, 
&page_pruning_predicate) {
+                reader_metadata = load_page_index(
+                    reader_metadata,
+                    &mut async_file_reader,
+                    // Since we're manually loading the page index the option 
here should not matter but we pass it in for consistency
+                    options.with_page_index(true),
+                )
+                .await?;
+            }
 
-            let mut builder =
-                ParquetRecordBatchStreamBuilder::new_with_metadata(reader, 
metadata);
+            metadata_timer.stop();
 
-            let file_schema = Arc::clone(builder.schema());
+            let mut builder = 
ParquetRecordBatchStreamBuilder::new_with_metadata(
+                async_file_reader,
+                reader_metadata,
+            );
 
             let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(&file_schema)?;
+                schema_adapter.map_schema(&physical_file_schema)?;
 
             let mask = ProjectionMask::roots(
                 builder.parquet_schema(),
@@ -160,7 +196,7 @@ impl FileOpener for ParquetOpener {
             if let Some(predicate) = 
pushdown_filters.then_some(predicate).flatten() {
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
-                    &file_schema,
+                    &physical_file_schema,
                     &table_schema,
                     builder.metadata(),
                     reorder_predicates,
@@ -197,18 +233,20 @@ impl FileOpener for ParquetOpener {
             }
             // If there is a predicate that can be evaluated against the 
metadata
             if let Some(predicate) = predicate.as_ref() {
-                row_groups.prune_by_statistics(
-                    &file_schema,
-                    builder.parquet_schema(),
-                    rg_metadata,
-                    predicate,
-                    &file_metrics,
-                );
+                if enable_row_group_stats_pruning {
+                    row_groups.prune_by_statistics(
+                        &physical_file_schema,
+                        builder.parquet_schema(),
+                        rg_metadata,
+                        predicate,
+                        &file_metrics,
+                    );
+                }
 
                 if enable_bloom_filter && !row_groups.is_empty() {
                     row_groups
                         .prune_by_bloom_filters(
-                            &file_schema,
+                            &physical_file_schema,
                             &mut builder,
                             predicate,
                             &file_metrics,
@@ -226,7 +264,7 @@ impl FileOpener for ParquetOpener {
                 if let Some(p) = page_pruning_predicate {
                     access_plan = p.prune_plan_with_page_index(
                         access_plan,
-                        &file_schema,
+                        &physical_file_schema,
                         builder.parquet_schema(),
                         file_metadata.as_ref(),
                         &file_metrics,
@@ -295,3 +333,91 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by 
incrementing
+/// the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning predicate from a predicate expression.
+/// Unlike `build_pruning_predicate`, the predicate is required here and a
+/// filter is always returned (wrapped in an `Arc`); this never returns None.
+pub(crate) fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+fn build_pruning_predicates(
+    predicate: &Option<Arc<dyn PhysicalExpr>>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> (
+    Option<Arc<PruningPredicate>>,
+    Option<Arc<PagePruningAccessPlanFilter>>,
+) {
+    let Some(predicate) = predicate.as_ref() else {
+        return (None, None);
+    };
+    let pruning_predicate = build_pruning_predicate(
+        Arc::clone(predicate),
+        file_schema,
+        predicate_creation_errors,
+    );
+    let page_pruning_predicate = build_page_pruning_predicate(predicate, 
file_schema);
+    (pruning_predicate, Some(page_pruning_predicate))
+}
+
+/// Returns an `ArrowReaderMetadata` with the page index loaded, loading
+/// it from the underlying `AsyncFileReader` if necessary.
+async fn load_page_index<T: AsyncFileReader>(
+    reader_metadata: ArrowReaderMetadata,
+    input: &mut T,
+    options: ArrowReaderOptions,
+) -> Result<ArrowReaderMetadata> {
+    let parquet_metadata = reader_metadata.metadata();
+    let missing_column_index = parquet_metadata.column_index().is_none();
+    let missing_offset_index = parquet_metadata.offset_index().is_none();
+    // You may ask yourself: why are we even checking if the page index is 
already loaded here?
+    // Didn't we explicitly *not* load it above?
+    // Well it's possible that a custom implementation of `AsyncFileReader` 
gives you
+    // the page index even if you didn't ask for it (e.g. because it's cached)
+    // so it's important to check that here to avoid extra work.
+    if missing_column_index || missing_offset_index {
+        let m = Arc::try_unwrap(Arc::clone(parquet_metadata))
+            .unwrap_or_else(|e| e.as_ref().clone());
+        let mut reader =
+            
ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+        reader.load_page_index(input).await?;
+        let new_parquet_metadata = reader.finish()?;
+        let new_arrow_reader =
+            ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), 
options)?;
+        Ok(new_arrow_reader)
+    } else {
+        // No need to load the page index again, just return the existing 
metadata
+        Ok(reader_metadata)
+    }
+}
diff --git a/datafusion/datasource-parquet/src/row_filter.rs 
b/datafusion/datasource-parquet/src/row_filter.rs
index da6bf114d7..2d2993c29a 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -449,7 +449,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<boo
 /// `a = 1` and `c = 3`.
 pub fn build_row_filter(
     expr: &Arc<dyn PhysicalExpr>,
-    file_schema: &SchemaRef,
+    physical_file_schema: &SchemaRef,
     table_schema: &SchemaRef,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
@@ -470,7 +470,7 @@ pub fn build_row_filter(
         .map(|expr| {
             FilterCandidateBuilder::new(
                 Arc::clone(expr),
-                Arc::clone(file_schema),
+                Arc::clone(physical_file_schema),
                 Arc::clone(table_schema),
                 Arc::clone(schema_adapter_factory),
             )
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 66d4d313d5..a5629e4363 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -17,9 +17,12 @@
 
 //! ParquetSource implementation for reading parquet files
 use std::any::Any;
+use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
+use crate::opener::build_page_pruning_predicate;
+use crate::opener::build_pruning_predicate;
 use crate::opener::ParquetOpener;
 use crate::page_filter::PagePruningAccessPlanFilter;
 use crate::DefaultParquetFileReaderFactory;
@@ -41,7 +44,6 @@ use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
 use datafusion_physical_plan::DisplayFormatType;
 
 use itertools::Itertools;
-use log::debug;
 use object_store::ObjectStore;
 
 /// Execution plan for reading one or more Parquet files.
@@ -316,24 +318,10 @@ impl ParquetSource {
         conf = conf.with_metrics(metrics);
         conf.predicate = Some(Arc::clone(&predicate));
 
-        match PruningPredicate::try_new(Arc::clone(&predicate), 
Arc::clone(&file_schema))
-        {
-            Ok(pruning_predicate) => {
-                if !pruning_predicate.always_true() {
-                    conf.pruning_predicate = Some(Arc::new(pruning_predicate));
-                }
-            }
-            Err(e) => {
-                debug!("Could not create pruning predicate for: {e}");
-                predicate_creation_errors.add(1);
-            }
-        };
-
-        let page_pruning_predicate = Arc::new(PagePruningAccessPlanFilter::new(
-            &predicate,
-            Arc::clone(&file_schema),
-        ));
-        conf.page_pruning_predicate = Some(page_pruning_predicate);
+        conf.page_pruning_predicate =
+            Some(build_page_pruning_predicate(&predicate, &file_schema));
+        conf.pruning_predicate =
+            build_pruning_predicate(predicate, &file_schema, 
&predicate_creation_errors);
 
         conf
     }
@@ -348,16 +336,6 @@ impl ParquetSource {
         self.predicate.as_ref()
     }
 
-    /// Optional reference to this parquet scan's pruning predicate
-    pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
-        self.pruning_predicate.as_ref()
-    }
-
-    /// Optional reference to this parquet scan's page pruning predicate
-    pub fn page_pruning_predicate(&self) -> 
Option<&Arc<PagePruningAccessPlanFilter>> {
-        self.page_pruning_predicate.as_ref()
-    }
-
     /// return the optional file reader factory
     pub fn parquet_file_reader_factory(
         &self,
@@ -488,8 +466,6 @@ impl FileSource for ParquetSource {
                 .expect("Batch size must set before creating ParquetOpener"),
             limit: base_config.limit,
             predicate: self.predicate.clone(),
-            pruning_predicate: self.pruning_predicate.clone(),
-            page_pruning_predicate: self.page_pruning_predicate.clone(),
             table_schema: Arc::clone(&base_config.file_schema),
             metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics().clone(),
@@ -498,6 +474,7 @@ impl FileSource for ParquetSource {
             reorder_filters: self.reorder_filters(),
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
+            enable_row_group_stats_pruning: 
self.table_parquet_options.global.pruning,
             schema_adapter_factory,
         })
     }
@@ -537,11 +514,10 @@ impl FileSource for ParquetSource {
             .expect("projected_statistics must be set");
         // When filters are pushed down, we have no way of knowing the exact statistics.
         // Note that pruning predicate is also a kind of filter pushdown.
-        // (bloom filters use `pruning_predicate` too)
-        if self.pruning_predicate().is_some()
-            || self.page_pruning_predicate().is_some()
-            || (self.predicate().is_some() && self.pushdown_filters())
-        {
+        // (bloom filters use `pruning_predicate` too).
+        // Because filter pushdown may happen dynamically as long as there is a predicate
+        // if we have *any* predicate applied, we can't guarantee the statistics are exact.
+        if self.predicate().is_some() {
             Ok(statistics.to_inexact())
         } else {
             Ok(statistics)
@@ -560,7 +536,8 @@ impl FileSource for ParquetSource {
                     .map(|p| format!(", predicate={p}"))
                     .unwrap_or_default();
                 let pruning_predicate_string = self
-                    .pruning_predicate()
+                    .pruning_predicate
+                    .as_ref()
                     .map(|pre| {
                         let mut guarantees = pre
                             .literal_guarantees()
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 0b3bf3bdd1..58fe8c7562 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -138,6 +138,9 @@ pub struct FileScanConfig {
     /// Schema before `projection` is applied. It contains the all columns that may
     /// appear in the files. It does not include table partition columns
     /// that may be added.
+    /// Note that this is **not** the schema of the physical files.
+    /// This is the schema that the physical file schema will be
+    /// mapped onto, and the schema that the [`DataSourceExec`] will return.
     pub file_schema: SchemaRef,
     /// List of files to be processed, grouped into partitions
     ///
@@ -224,6 +227,10 @@ pub struct FileScanConfig {
 #[derive(Clone)]
 pub struct FileScanConfigBuilder {
     object_store_url: ObjectStoreUrl,
+    /// Table schema before any projections or partition columns are applied.
+    /// This schema is used to read the files, but is **not** necessarily the schema of the physical files.
+    /// Rather this is the schema that the physical file schema will be mapped onto, and the schema that the
+    /// [`DataSourceExec`] will return.
     file_schema: SchemaRef,
     file_source: Arc<dyn FileSource>,
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to