This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9503456388 Support user defined `ParquetAccessPlan` in `ParquetExec`, 
validation to `ParquetAccessPlan::select` (#10813)
9503456388 is described below

commit 9503456388544788e1a881a0a80a3c61ac015a86
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Jun 9 16:06:15 2024 -0400

    Support user defined `ParquetAccessPlan` in `ParquetExec`, validation to 
`ParquetAccessPlan::select` (#10813)
    
    * Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add 
validation to ParquetAccessPlan::select
    
    * Add test for filtering and user supplied access plan
    
    * fix on windows
    
    * Apply suggestions from code review
    
    Co-authored-by: Jeffrey Vo <[email protected]>
    
    ---------
    
    Co-authored-by: Jeffrey Vo <[email protected]>
---
 datafusion/core/src/datasource/listing/mod.rs      |  11 +
 .../physical_plan/parquet/access_plan.rs           | 121 +++++-
 .../src/datasource/physical_plan/parquet/mod.rs    |  46 +++
 .../src/datasource/physical_plan/parquet/opener.rs |  46 ++-
 .../core/tests/parquet/external_access_plan.rs     | 418 +++++++++++++++++++++
 datafusion/core/tests/parquet/mod.rs               |  31 +-
 datafusion/core/tests/parquet/utils.rs             |  55 +++
 datafusion/core/tests/parquet_exec.rs              |   2 +
 8 files changed, 692 insertions(+), 38 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/mod.rs 
b/datafusion/core/src/datasource/listing/mod.rs
index 04aec9d77d..44f9276090 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -134,6 +134,17 @@ impl PartitionedFile {
         self.range = Some(FileRange { start, end });
         self
     }
+
+    /// Update the user defined extensions for this file.
+    ///
+    /// This can be used to pass reader specific information.
+    pub fn with_extensions(
+        mut self,
+        extensions: Arc<dyn std::any::Any + Send + Sync>,
+    ) -> Self {
+        self.extensions = Some(extensions);
+        self
+    }
 }
 
 impl From<ObjectMeta> for PartitionedFile {
diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
index c59459ba61..f51f2c49e8 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion_common::{internal_err, Result};
 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
 use parquet::file::metadata::RowGroupMetaData;
 
@@ -182,6 +183,11 @@ impl ParquetAccessPlan {
     /// is returned for *all* the rows in the row groups that are not skipped.
     /// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
     ///
+    /// # Errors
+    ///
+    /// Returns an error if any specified row selection does not specify
+    /// the same number of rows as in it's corresponding `row_group_metadata`.
+    ///
     /// # Example: No Selections
     ///
     /// Given an access plan like this
@@ -228,7 +234,7 @@ impl ParquetAccessPlan {
     pub fn into_overall_row_selection(
         self,
         row_group_meta_data: &[RowGroupMetaData],
-    ) -> Option<RowSelection> {
+    ) -> Result<Option<RowSelection>> {
         assert_eq!(row_group_meta_data.len(), self.row_groups.len());
         // Intuition: entire row groups are filtered out using
         // `row_group_indexes` which come from Skip and Scan. An overall
@@ -239,7 +245,32 @@ impl ParquetAccessPlan {
             .iter()
             .any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
         {
-            return None;
+            return Ok(None);
+        }
+
+        // validate all Selections
+        for (idx, (rg, rg_meta)) in self
+            .row_groups
+            .iter()
+            .zip(row_group_meta_data.iter())
+            .enumerate()
+        {
+            let RowGroupAccess::Selection(selection) = rg else {
+                continue;
+            };
+            let rows_in_selection = selection
+                .iter()
+                .map(|selection| selection.row_count)
+                .sum::<usize>();
+
+            let row_group_row_count = rg_meta.num_rows();
+            if rows_in_selection as i64 != row_group_row_count {
+                return internal_err!(
+                    "Invalid ParquetAccessPlan Selection. Row group {idx} has 
{row_group_row_count} rows \
+                    but selection only specifies {rows_in_selection} rows. \
+                    Selection: {selection:?}"
+                );
+            }
         }
 
         let total_selection: RowSelection = self
@@ -261,7 +292,7 @@ impl ParquetAccessPlan {
             })
             .collect();
 
-        Some(total_selection)
+        Ok(Some(total_selection))
     }
 
     /// Return an iterator over the row group indexes that should be scanned
@@ -305,6 +336,7 @@ impl ParquetAccessPlan {
 #[cfg(test)]
 mod test {
     use super::*;
+    use datafusion_common::assert_contains;
     use parquet::basic::LogicalType;
     use parquet::file::metadata::ColumnChunkMetaData;
     use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
@@ -320,7 +352,9 @@ mod test {
         ]);
 
         let row_group_indexes = access_plan.row_group_indexes();
-        let row_selection = 
access_plan.into_overall_row_selection(row_group_metadata());
+        let row_selection = access_plan
+            .into_overall_row_selection(row_group_metadata())
+            .unwrap();
 
         // scan all row groups, no selection
         assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
@@ -337,7 +371,9 @@ mod test {
         ]);
 
         let row_group_indexes = access_plan.row_group_indexes();
-        let row_selection = 
access_plan.into_overall_row_selection(row_group_metadata());
+        let row_selection = access_plan
+            .into_overall_row_selection(row_group_metadata())
+            .unwrap();
 
         // skip all row groups, no selection
         assert_eq!(row_group_indexes, vec![] as Vec<usize>);
@@ -348,14 +384,22 @@ mod test {
         let access_plan = ParquetAccessPlan::new(vec![
             RowGroupAccess::Scan,
             RowGroupAccess::Selection(
-                vec![RowSelector::select(5), RowSelector::skip(7)].into(),
+                // select / skip all 20 rows in row group 1
+                vec![
+                    RowSelector::select(5),
+                    RowSelector::skip(7),
+                    RowSelector::select(8),
+                ]
+                .into(),
             ),
             RowGroupAccess::Skip,
             RowGroupAccess::Skip,
         ]);
 
         let row_group_indexes = access_plan.row_group_indexes();
-        let row_selection = 
access_plan.into_overall_row_selection(row_group_metadata());
+        let row_selection = access_plan
+            .into_overall_row_selection(row_group_metadata())
+            .unwrap();
 
         assert_eq!(row_group_indexes, vec![0, 1]);
         assert_eq!(
@@ -366,7 +410,8 @@ mod test {
                     RowSelector::select(10),
                     // selectors from the second row group
                     RowSelector::select(5),
-                    RowSelector::skip(7)
+                    RowSelector::skip(7),
+                    RowSelector::select(8)
                 ]
                 .into()
             )
@@ -379,13 +424,21 @@ mod test {
             RowGroupAccess::Skip,
             RowGroupAccess::Scan,
             RowGroupAccess::Selection(
-                vec![RowSelector::select(5), RowSelector::skip(7)].into(),
+                // specify all 30 rows in row group 1
+                vec![
+                    RowSelector::select(5),
+                    RowSelector::skip(7),
+                    RowSelector::select(18),
+                ]
+                .into(),
             ),
             RowGroupAccess::Scan,
         ]);
 
         let row_group_indexes = access_plan.row_group_indexes();
-        let row_selection = 
access_plan.into_overall_row_selection(row_group_metadata());
+        let row_selection = access_plan
+            .into_overall_row_selection(row_group_metadata())
+            .unwrap();
 
         assert_eq!(row_group_indexes, vec![1, 2, 3]);
         assert_eq!(
@@ -397,6 +450,7 @@ mod test {
                     // selectors from the third row group
                     RowSelector::select(5),
                     RowSelector::skip(7),
+                    RowSelector::select(18),
                     // select the entire fourth row group
                     RowSelector::select(40),
                 ]
@@ -405,6 +459,53 @@ mod test {
         );
     }
 
+    #[test]
+    fn test_invalid_too_few() {
+        let access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan,
+            // select 12 rows, but row group 1 has 20
+            RowGroupAccess::Selection(
+                vec![RowSelector::select(5), RowSelector::skip(7)].into(),
+            ),
+            RowGroupAccess::Scan,
+            RowGroupAccess::Scan,
+        ]);
+
+        let row_group_indexes = access_plan.row_group_indexes();
+        let err = access_plan
+            .into_overall_row_selection(row_group_metadata())
+            .unwrap_err()
+            .to_string();
+        assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
+        assert_contains!(err, "Internal error: Invalid ParquetAccessPlan 
Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
+    }
+
+    #[test]
+    fn test_invalid_too_many() {
+        let access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan,
+            // select 22 rows, but row group 1 has only 20
+            RowGroupAccess::Selection(
+                vec![
+                    RowSelector::select(10),
+                    RowSelector::skip(2),
+                    RowSelector::select(10),
+                ]
+                .into(),
+            ),
+            RowGroupAccess::Scan,
+            RowGroupAccess::Scan,
+        ]);
+
+        let row_group_indexes = access_plan.row_group_indexes();
+        let err = access_plan
+            .into_overall_row_selection(row_group_metadata())
+            .unwrap_err()
+            .to_string();
+        assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
+        assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 
1 has 20 rows but selection only specifies 22 rows");
+    }
+
     static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = 
OnceLock::new();
 
     /// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 39c8761eac..5e5cc93bc5 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -145,6 +145,52 @@ pub use writer::plan_to_parquet;
 /// custom reader is used, it supplies the metadata directly and this parameter
 /// is ignored. [`ParquetExecBuilder::with_metadata_size_hint`] for more 
details.
 ///
+/// * User provided  [`ParquetAccessPlan`]s to skip row groups and/or pages
+/// based on external information. See "Implementing External Indexes" below
+///
+/// # Implementing External Indexes
+///
+/// It is possible to restrict the row groups and selections within those row
+/// groups that the ParquetExec will consider by providing an initial
+/// [`ParquetAccessPlan`] as `extensions` on [`PartitionedFile`]. This can be
+/// used to implement external indexes on top of parquet files and select only
+/// portions of the files.
+///
+/// The `ParquetExec` will try and further reduce any provided
+/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and
+/// other settings.
+///
+/// ## Example of providing a ParquetAccessPlan
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::{Schema, SchemaRef};
+/// # use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
+/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # fn schema() -> SchemaRef {
+/// #   Arc::new(Schema::empty())
+/// # }
+/// // create an access plan to scan row group 0, 1 and 3 and skip row groups 
2 and 4
+/// let mut access_plan = ParquetAccessPlan::new_all(5);
+/// access_plan.skip(2);
+/// access_plan.skip(4);
+/// // provide the plan as extension to the FileScanConfig
+/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
+///   .with_extensions(Arc::new(access_plan));
+/// // create a ParquetExec to scan this file
+/// let file_scan_config = 
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema())
+///     .with_file(partitioned_file);
+/// // this parquet exec will not even try to read row groups 2 and 4. 
Additional
+/// // pruning based on predicates may also happen
+/// let exec = ParquetExec::builder(file_scan_config).build();
+/// ```
+///
+/// For a complete example, see the [`parquet_index_advanced` example]).
+///
+/// [`parquet_index_advanced` example]: 
https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs
+///
 /// # Execution Overview
 ///
 /// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index a5047e487e..8557c6d5f9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -28,6 +28,7 @@ use crate::datasource::physical_plan::{
 use crate::datasource::schema_adapter::SchemaAdapterFactory;
 use crate::physical_optimizer::pruning::PruningPredicate;
 use arrow_schema::{ArrowError, SchemaRef};
+use datafusion_common::{exec_err, Result};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use futures::{StreamExt, TryStreamExt};
@@ -60,11 +61,10 @@ pub(super) struct ParquetOpener {
 impl FileOpener for ParquetOpener {
     fn open(&self, file_meta: FileMeta) -> 
datafusion_common::Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();
-        let file_metrics = ParquetFileMetrics::new(
-            self.partition_index,
-            file_meta.location().as_ref(),
-            &self.metrics,
-        );
+        let extensions = file_meta.extensions.clone();
+        let file_name = file_meta.location().to_string();
+        let file_metrics =
+            ParquetFileMetrics::new(self.partition_index, &file_name, 
&self.metrics);
 
         let reader: Box<dyn AsyncFileReader> =
             self.parquet_file_reader_factory.create_reader(
@@ -139,7 +139,8 @@ impl FileOpener for ParquetOpener {
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let rg_metadata = file_metadata.row_groups();
             // track which row groups to actually read
-            let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
+            let access_plan =
+                create_initial_plan(&file_name, extensions, 
rg_metadata.len())?;
             let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
             // if there is a range restricting what parts of the file to read
             if let Some(range) = file_range.as_ref() {
@@ -186,7 +187,7 @@ impl FileOpener for ParquetOpener {
 
             let row_group_indexes = access_plan.row_group_indexes();
             if let Some(row_selection) =
-                access_plan.into_overall_row_selection(rg_metadata)
+                access_plan.into_overall_row_selection(rg_metadata)?
             {
                 builder = builder.with_row_selection(row_selection);
             }
@@ -212,3 +213,34 @@ impl FileOpener for ParquetOpener {
         }))
     }
 }
+
+/// Return the initial [`ParquetAccessPlan`]
+///
+/// If the user has supplied one as an extension, use that
+/// otherwise return a plan that scans all row groups
+///
+/// Returns an error if an invalid `ParquetAccessPlan` is provided
+///
+/// Note: file_name is only used for error messages
+fn create_initial_plan(
+    file_name: &str,
+    extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+    row_group_count: usize,
+) -> Result<ParquetAccessPlan> {
+    if let Some(extensions) = extensions {
+        if let Some(access_plan) = 
extensions.downcast_ref::<ParquetAccessPlan>() {
+            let plan_len = access_plan.len();
+            if plan_len != row_group_count {
+                return exec_err!(
+                    "Invalid ParquetAccessPlan for {file_name}. Specified 
{plan_len} row groups, but file has {row_group_count}"
+                );
+            }
+
+            // check row group count matches the plan
+            return Ok(access_plan.clone());
+        }
+    }
+
+    // default to scanning all row groups
+    Ok(ParquetAccessPlan::new_all(row_group_count))
+}
diff --git a/datafusion/core/tests/parquet/external_access_plan.rs 
b/datafusion/core/tests/parquet/external_access_plan.rs
new file mode 100644
index 0000000000..03afc858df
--- /dev/null
+++ b/datafusion/core/tests/parquet/external_access_plan.rs
@@ -0,0 +1,418 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for passing user provided [`ParquetAccessPlan`]` to `ParquetExec`]`
+use crate::parquet::utils::MetricsFinder;
+use crate::parquet::{create_data_batch, Scenario};
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::SchemaRef;
+use datafusion::common::Result;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::parquet::{ParquetAccessPlan, 
RowGroupAccess};
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+use datafusion::prelude::SessionContext;
+use datafusion_common::{assert_contains, DFSchema};
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_expr::{col, lit, Expr};
+use datafusion_physical_plan::metrics::MetricsSet;
+use datafusion_physical_plan::ExecutionPlan;
+use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use std::sync::{Arc, OnceLock};
+use tempfile::NamedTempFile;
+
+#[tokio::test]
+async fn none() {
+    // no user defined plan
+    Test {
+        access_plan: None,
+        expected_rows: 10,
+    }
+    .run_success()
+    .await;
+}
+
+#[tokio::test]
+async fn scan_all() {
+    let parquet_metrics = Test {
+        access_plan: Some(ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan,
+            RowGroupAccess::Scan,
+        ])),
+        expected_rows: 10,
+    }
+    .run_success()
+    .await;
+
+    // Verify that some bytes were read
+    let bytes_scanned = metric_value(&parquet_metrics, 
"bytes_scanned").unwrap();
+    assert_ne!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
+}
+
+#[tokio::test]
+async fn skip_all() {
+    let parquet_metrics = Test {
+        access_plan: Some(ParquetAccessPlan::new(vec![
+            RowGroupAccess::Skip,
+            RowGroupAccess::Skip,
+        ])),
+        expected_rows: 0,
+    }
+    .run_success()
+    .await;
+
+    // Verify that skipping all row groups skips reading any data at all
+    let bytes_scanned = metric_value(&parquet_metrics, 
"bytes_scanned").unwrap();
+    assert_eq!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
+}
+
+#[tokio::test]
+async fn skip_one_row_group() {
+    let plans = vec![
+        ParquetAccessPlan::new(vec![RowGroupAccess::Scan, 
RowGroupAccess::Skip]),
+        ParquetAccessPlan::new(vec![RowGroupAccess::Skip, 
RowGroupAccess::Scan]),
+    ];
+
+    for access_plan in plans {
+        Test {
+            access_plan: Some(access_plan),
+            expected_rows: 5,
+        }
+        .run_success()
+        .await;
+    }
+}
+
+#[tokio::test]
+async fn selection_scan() {
+    let plans = vec![
+        ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan,
+            RowGroupAccess::Selection(select_one_row()),
+        ]),
+        ParquetAccessPlan::new(vec![
+            RowGroupAccess::Selection(select_one_row()),
+            RowGroupAccess::Scan,
+        ]),
+    ];
+
+    for access_plan in plans {
+        Test {
+            access_plan: Some(access_plan),
+            expected_rows: 6,
+        }
+        .run_success()
+        .await;
+    }
+}
+
+#[tokio::test]
+async fn skip_scan() {
+    let plans = vec![
+        // skip one row group, scan the toehr
+        ParquetAccessPlan::new(vec![
+            RowGroupAccess::Skip,
+            RowGroupAccess::Selection(select_one_row()),
+        ]),
+        ParquetAccessPlan::new(vec![
+            RowGroupAccess::Selection(select_one_row()),
+            RowGroupAccess::Skip,
+        ]),
+    ];
+
+    for access_plan in plans {
+        Test {
+            access_plan: Some(access_plan),
+            expected_rows: 1,
+        }
+        .run_success()
+        .await;
+    }
+}
+
+#[tokio::test]
+async fn plan_and_filter() {
+    // show that row group pruning is applied even when an initial plan is 
supplied
+
+    // No rows match this predicate
+    let predicate = col("utf8").eq(lit("z"));
+
+    // user supplied access plan specifies to still read a row group
+    let access_plan = Some(ParquetAccessPlan::new(vec![
+        // Row group 0 has values a-d
+        RowGroupAccess::Skip,
+        // Row group 1 has values e-i
+        RowGroupAccess::Scan,
+    ]));
+
+    // initia
+    let parquet_metrics = TestFull {
+        access_plan,
+        expected_rows: 0,
+        predicate: Some(predicate),
+    }
+    .run()
+    .await
+    .unwrap();
+
+    // Verify that row group pruning still happens for just that group
+    let row_groups_pruned_statistics =
+        metric_value(&parquet_metrics, 
"row_groups_pruned_statistics").unwrap();
+    assert_eq!(
+        row_groups_pruned_statistics, 1,
+        "metrics : {parquet_metrics:#?}",
+    );
+}
+
+#[tokio::test]
+async fn two_selections() {
+    let plans = vec![
+        ParquetAccessPlan::new(vec![
+            RowGroupAccess::Selection(select_one_row()),
+            RowGroupAccess::Selection(select_two_rows()),
+        ]),
+        ParquetAccessPlan::new(vec![
+            RowGroupAccess::Selection(select_two_rows()),
+            RowGroupAccess::Selection(select_one_row()),
+        ]),
+    ];
+
+    for access_plan in plans {
+        Test {
+            access_plan: Some(access_plan),
+            expected_rows: 3,
+        }
+        .run_success()
+        .await;
+    }
+}
+
+#[tokio::test]
+async fn bad_row_groups() {
+    let err = TestFull {
+        access_plan: Some(ParquetAccessPlan::new(vec![
+            // file has only 2 row groups, but specify 3
+            RowGroupAccess::Scan,
+            RowGroupAccess::Skip,
+            RowGroupAccess::Scan,
+        ])),
+        expected_rows: 0,
+        predicate: None,
+    }
+    .run()
+    .await
+    .unwrap_err();
+    let err_string = err.to_string();
+    assert_contains!(&err_string, "Invalid ParquetAccessPlan");
+    assert_contains!(&err_string, "Specified 3 row groups, but file has 2");
+}
+
+#[tokio::test]
+async fn bad_selection() {
+    let err = TestFull {
+        access_plan: Some(ParquetAccessPlan::new(vec![
+            // specify fewer rows than are actually in the row group
+            RowGroupAccess::Selection(RowSelection::from(vec![
+                RowSelector::skip(1),
+                RowSelector::select(3),
+            ])),
+            RowGroupAccess::Skip,
+        ])),
+        // expects that we hit an error, this should not be run
+        expected_rows: 10000,
+        predicate: None,
+    }
+    .run()
+    .await
+    .unwrap_err();
+    let err_string = err.to_string();
+    assert_contains!(&err_string, "Internal error: Invalid ParquetAccessPlan 
Selection. Row group 0 has 5 rows but selection only specifies 4 rows");
+}
+
+/// Return a RowSelection of 1 rows from a row group of 5 rows
+fn select_one_row() -> RowSelection {
+    RowSelection::from(vec![
+        RowSelector::skip(2),
+        RowSelector::select(1),
+        RowSelector::skip(2),
+    ])
+}
+/// Return a RowSelection of 2 rows from a row group of 5 rows
+fn select_two_rows() -> RowSelection {
+    RowSelection::from(vec![
+        RowSelector::skip(1),
+        RowSelector::select(1),
+        RowSelector::skip(1),
+        RowSelector::select(1),
+        RowSelector::skip(1),
+    ])
+}
+
+/// Test for passing user defined ParquetAccessPlans. See [`TestFull`] for 
details.
+#[derive(Debug)]
+struct Test {
+    access_plan: Option<ParquetAccessPlan>,
+    expected_rows: usize,
+}
+
+impl Test {
+    /// Runs the test case, panic'ing on error.
+    ///
+    /// Returns the `MetricsSet` from the ParqeutExec
+    async fn run_success(self) -> MetricsSet {
+        let Self {
+            access_plan,
+            expected_rows,
+        } = self;
+        TestFull {
+            access_plan,
+            expected_rows,
+            predicate: None,
+        }
+        .run()
+        .await
+        .unwrap()
+    }
+}
+
+/// Test for passing user defined ParquetAccessPlans:
+///
+/// 1. Creates a parquet file with 2 row groups, each with 5 rows
+/// 2. Reads the parquet file with an optional user provided access plan
+/// 3. Verifies that the expected number of rows is read
+/// 4. Returns the statistics from running the plan
+struct TestFull {
+    access_plan: Option<ParquetAccessPlan>,
+    expected_rows: usize,
+    predicate: Option<Expr>,
+}
+
+impl TestFull {
+    async fn run(self) -> Result<MetricsSet> {
+        let ctx = SessionContext::new();
+
+        let Self {
+            access_plan,
+            expected_rows,
+            predicate,
+        } = self;
+
+        let TestData {
+            temp_file: _,
+            schema,
+            file_name,
+            file_size,
+        } = get_test_data();
+
+        let mut partitioned_file = PartitionedFile::new(file_name, *file_size);
+
+        // add the access plan, if any, as an extension
+        if let Some(access_plan) = access_plan {
+            partitioned_file = 
partitioned_file.with_extensions(Arc::new(access_plan));
+        }
+
+        // Create a ParquetExec to read the file
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let config = FileScanConfig::new(object_store_url, schema.clone())
+            .with_file(partitioned_file);
+
+        let mut builder = ParquetExec::builder(config);
+
+        // add the predicate, if requested
+        if let Some(predicate) = predicate {
+            let df_schema = DFSchema::try_from(schema.clone())?;
+            let predicate = ctx.create_physical_expr(predicate, &df_schema)?;
+            builder = builder.with_predicate(predicate);
+        }
+
+        let plan: Arc<dyn ExecutionPlan> = builder.build_arc();
+
+        // run the ParquetExec and collect the results
+        let results =
+            datafusion::physical_plan::collect(Arc::clone(&plan), 
ctx.task_ctx()).await?;
+
+        // calculate the total number of rows that came out
+        let total_rows = results.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(
+            total_rows,
+            expected_rows,
+            "results: \n{}",
+            pretty_format_batches(&results).unwrap()
+        );
+
+        Ok(MetricsFinder::find_metrics(plan.as_ref()).unwrap())
+    }
+}
+
+// Holds necessary data for these tests to reuse the same parquet file
+struct TestData {
+    // field is present as on drop the file is deleted
+    #[allow(dead_code)]
+    temp_file: NamedTempFile,
+    schema: SchemaRef,
+    file_name: String,
+    file_size: u64,
+}
+
+static TEST_DATA: OnceLock<TestData> = OnceLock::new();
+
+/// Return a parquet file with 2 row groups each with 5 rows
+fn get_test_data() -> &'static TestData {
+    TEST_DATA.get_or_init(|| {
+        let scenario = Scenario::UTF8;
+        let row_per_group = 5;
+
+        let mut temp_file = tempfile::Builder::new()
+            .prefix("user_access_plan")
+            .suffix(".parquet")
+            .tempfile()
+            .expect("tempfile creation");
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(row_per_group)
+            .build();
+
+        let batches = create_data_batch(scenario);
+        let schema = batches[0].schema();
+
+        let mut writer =
+            ArrowWriter::try_new(&mut temp_file, schema.clone(), 
Some(props)).unwrap();
+
+        for batch in batches {
+            writer.write(&batch).expect("writing batch");
+        }
+        writer.close().unwrap();
+
+        let file_name = temp_file.path().to_string_lossy().to_string();
+        let file_size = temp_file.path().metadata().unwrap().len();
+
+        TestData {
+            temp_file,
+            schema,
+            file_name,
+            file_size,
+        }
+    })
+}
+
+/// Return the total value of the specified metric name
+fn metric_value(parquet_metrics: &MetricsSet, metric_name: &str) -> 
Option<usize> {
+    parquet_metrics
+        .sum(|metric| metric.value().name() == metric_name)
+        .map(|v| v.as_usize())
+}
diff --git a/datafusion/core/tests/parquet/mod.rs 
b/datafusion/core/tests/parquet/mod.rs
index 99769a3367..5ab268beb9 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Parquet integration tests
+use crate::parquet::utils::MetricsFinder;
 use arrow::array::Decimal128Array;
 use arrow::datatypes::{
     i256, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
@@ -41,8 +42,8 @@ use arrow_array::{
 use arrow_schema::IntervalUnit;
 use chrono::{Datelike, Duration, TimeDelta};
 use datafusion::{
-    datasource::{physical_plan::ParquetExec, provider_as_source, 
TableProvider},
-    physical_plan::{accept, metrics::MetricsSet, ExecutionPlan, 
ExecutionPlanVisitor},
+    datasource::{provider_as_source, TableProvider},
+    physical_plan::metrics::MetricsSet,
     prelude::{ParquetReadOptions, SessionConfig, SessionContext},
 };
 use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
@@ -51,8 +52,12 @@ use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 use std::sync::Arc;
 use tempfile::NamedTempFile;
+
 mod arrow_statistics;
 mod custom_reader;
+// Don't run on windows as tempfiles don't seem to work the same
+#[cfg(not(target_os = "windows"))]
+mod external_access_plan;
 mod file_statistics;
 #[cfg(not(target_family = "windows"))]
 mod filter_pushdown;
@@ -60,6 +65,7 @@ mod page_pruning;
 mod row_group_pruning;
 mod schema;
 mod schema_coercion;
+mod utils;
 
 #[cfg(test)]
 #[ctor::ctor]
@@ -303,25 +309,8 @@ impl ContextWithParquet {
             .expect("Running");
 
         // find the parquet metrics
-        struct MetricsFinder {
-            metrics: Option<MetricsSet>,
-        }
-        impl ExecutionPlanVisitor for MetricsFinder {
-            type Error = std::convert::Infallible;
-            fn pre_visit(
-                &mut self,
-                plan: &dyn ExecutionPlan,
-            ) -> Result<bool, Self::Error> {
-                if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
-                    self.metrics = plan.metrics();
-                }
-                // stop searching once we have found the metrics
-                Ok(self.metrics.is_none())
-            }
-        }
-        let mut finder = MetricsFinder { metrics: None };
-        accept(physical_plan.as_ref(), &mut finder).unwrap();
-        let parquet_metrics = finder.metrics.unwrap();
+        let parquet_metrics =
+            MetricsFinder::find_metrics(physical_plan.as_ref()).unwrap();
 
         let result_rows = results.iter().map(|b| b.num_rows()).sum();
 
diff --git a/datafusion/core/tests/parquet/utils.rs 
b/datafusion/core/tests/parquet/utils.rs
new file mode 100644
index 0000000000..d8d2b2fbb8
--- /dev/null
+++ b/datafusion/core/tests/parquet/utils.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for parquet tests
+
+use datafusion::datasource::physical_plan::ParquetExec;
+use datafusion_physical_plan::metrics::MetricsSet;
+use datafusion_physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor};
+
+/// Find the metrics from the first ParquetExec encountered in the plan
+#[derive(Debug)]
+pub struct MetricsFinder {
+    metrics: Option<MetricsSet>,
+}
+impl MetricsFinder {
+    pub fn new() -> Self {
+        Self { metrics: None }
+    }
+
+    /// Return the metrics if found
+    pub fn into_metrics(self) -> Option<MetricsSet> {
+        self.metrics
+    }
+
+    pub fn find_metrics(plan: &dyn ExecutionPlan) -> Option<MetricsSet> {
+        let mut finder = Self::new();
+        accept(plan, &mut finder).unwrap();
+        finder.into_metrics()
+    }
+}
+
+impl ExecutionPlanVisitor for MetricsFinder {
+    type Error = std::convert::Infallible;
+    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, 
Self::Error> {
+        if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
+            self.metrics = plan.metrics();
+        }
+        // stop searching once we have found the metrics
+        Ok(self.metrics.is_none())
+    }
+}
diff --git a/datafusion/core/tests/parquet_exec.rs 
b/datafusion/core/tests/parquet_exec.rs
index 43ceb615a0..f41f82a76c 100644
--- a/datafusion/core/tests/parquet_exec.rs
+++ b/datafusion/core/tests/parquet_exec.rs
@@ -15,5 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! End to end test for `ParquetExec` and related components
+
 /// Run all tests that are found in the `parquet` directory
 mod parquet;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to