This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a26eca25a Add `ParquetAccessPlan`, unify RowGroup selection and 
PagePruning selection (#10738)
1a26eca25a is described below

commit 1a26eca25abade1fe80ce2126c810ff9c8defcd0
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jun 6 08:40:30 2024 -0400

    Add `ParquetAccessPlan`, unify RowGroup selection and PagePruning selection 
(#10738)
    
    * Add `ParquetAccessPlan` that describes which part of the parquet files to 
read
    
    * Rename to RowGroupAccessPlanFilter
    
    * Clarify when overall selection is needed
    
    * Update documentation to explain the relationship between 
scan/skip/selection
    
    * Break early if the row selection is empty
---
 .../physical_plan/parquet/access_plan.rs           | 449 +++++++++++++++++++++
 .../src/datasource/physical_plan/parquet/mod.rs    |   7 +-
 .../src/datasource/physical_plan/parquet/opener.rs |  31 +-
 .../physical_plan/parquet/page_filter.rs           | 288 ++++++-------
 .../datasource/physical_plan/parquet/row_groups.rs | 125 +++---
 5 files changed, 677 insertions(+), 223 deletions(-)

diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
new file mode 100644
index 0000000000..c59459ba61
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+use parquet::file::metadata::RowGroupMetaData;
+
+/// A selection of rows and row groups within a ParquetFile to decode.
+///
+/// A `ParquetAccessPlan` is used to limit the row groups and data pages a 
`ParquetExec`
+/// will read and decode to improve performance.
+///
+/// Note that page level pruning based on ArrowPredicate is applied after all 
of
+/// these selections
+///
+/// # Example
+///
+/// For example, given a Parquet file with 4 row groups, a `ParquetAccessPlan`
+/// can be used to specify skipping row group 0 and 2, scanning a range of rows
+/// in row group 1, and scanning all rows in row group 3 as follows:
+///
+/// ```rust
+/// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
+/// // Default to scan all row groups
+/// let mut access_plan = ParquetAccessPlan::new_all(4);
+/// access_plan.skip(0); // skip row group 0
+/// // Use parquet reader RowSelector to specify scanning rows 100-200 and 
350-400
+/// // in a row group that has 1000 rows
+/// let row_selection = RowSelection::from(vec![
+///    RowSelector::skip(100),
+///    RowSelector::select(100),
+///    RowSelector::skip(150),
+///    RowSelector::select(50),
+///    RowSelector::skip(600),  // skip last 600 rows
+/// ]);
+/// access_plan.scan_selection(1, row_selection);
+/// access_plan.skip(2); // skip row group 2
+/// // row group 3 is scanned by default
+/// ```
+///
+/// The resulting plan would look like:
+///
+/// ```text
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+///
+/// │                   │  SKIP
+///
+/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+///  Row Group 0
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+///  ┌────────────────┐    SCAN ONLY ROWS
+/// │└────────────────┘ │  100-200
+///  ┌────────────────┐    350-400
+/// │└────────────────┘ │
+///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+///  Row Group 1
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+///                        SKIP
+/// │                   │
+///
+/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+///  Row Group 2
+/// ┌───────────────────┐
+/// │                   │  SCAN ALL ROWS
+/// │                   │
+/// │                   │
+/// └───────────────────┘
+///  Row Group 3
+/// ```
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParquetAccessPlan {
+    /// How to access the i-th row group
+    row_groups: Vec<RowGroupAccess>,
+}
+
+/// Describes how the parquet reader will access a row group
+#[derive(Debug, Clone, PartialEq)]
+pub enum RowGroupAccess {
+    /// Do not read the row group at all
+    Skip,
+    /// Read all rows from the row group
+    Scan,
+    /// Scan only the specified rows within the row group
+    Selection(RowSelection),
+}
+
+impl RowGroupAccess {
+    /// Return true if this row group should be scanned
+    pub fn should_scan(&self) -> bool {
+        match self {
+            RowGroupAccess::Skip => false,
+            RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
+        }
+    }
+}
+
+impl ParquetAccessPlan {
+    /// Create a new `ParquetAccessPlan` that scans all row groups
+    pub fn new_all(row_group_count: usize) -> Self {
+        Self {
+            row_groups: vec![RowGroupAccess::Scan; row_group_count],
+        }
+    }
+
+    /// Create a new `ParquetAccessPlan` that scans no row groups
+    pub fn new_none(row_group_count: usize) -> Self {
+        Self {
+            row_groups: vec![RowGroupAccess::Skip; row_group_count],
+        }
+    }
+
+    /// Create a new `ParquetAccessPlan` from the specified 
[`RowGroupAccess`]es
+    pub fn new(row_groups: Vec<RowGroupAccess>) -> Self {
+        Self { row_groups }
+    }
+
+    /// Set the i-th row group to the specified [`RowGroupAccess`]
+    pub fn set(&mut self, idx: usize, access: RowGroupAccess) {
+        self.row_groups[idx] = access;
+    }
+
+    /// skips the i-th row group (should not be scanned)
+    pub fn skip(&mut self, idx: usize) {
+        self.set(idx, RowGroupAccess::Skip);
+    }
+
+    /// Return true if the i-th row group should be scanned
+    pub fn should_scan(&self, idx: usize) -> bool {
+        self.row_groups[idx].should_scan()
+    }
+
+    /// Set to scan only the [`RowSelection`] in the specified row group.
+    ///
+    /// Behavior is different depending on the existing access
+    /// * [`RowGroupAccess::Skip`]: does nothing
+    /// * [`RowGroupAccess::Scan`]: Updates to scan only the rows in the 
`RowSelection`
+    /// * [`RowGroupAccess::Selection`]: Updates to scan only the intersection 
of the existing selection and the new selection
+    pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
+        self.row_groups[idx] = match &self.row_groups[idx] {
+            // already skipping the entire row group
+            RowGroupAccess::Skip => RowGroupAccess::Skip,
+            RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
+            RowGroupAccess::Selection(existing_selection) => {
+                
RowGroupAccess::Selection(existing_selection.intersection(&selection))
+            }
+        }
+    }
+
+    /// Return an overall `RowSelection`, if needed
+    ///
+    /// This is used to compute the row selection for the parquet reader. See
+    /// [`ArrowReaderBuilder::with_row_selection`] for more details.
+    ///
+    /// Returns
+    /// * `None` if there are no  [`RowGroupAccess::Selection`]
+    /// * `Some(selection)` if there are [`RowGroupAccess::Selection`]s
+    ///
+    /// The returned selection represents which rows to scan across any
+    /// row groups which are not skipped.
+    ///
+    /// # Notes
+    ///
+    /// If there are no [`RowGroupAccess::Selection`]s, the overall row
+    /// selection is `None` because each row group is either entirely skipped 
or
+    /// scanned, which is covered by [`Self::row_group_indexes`].
+    ///
+    /// If there are any [`RowGroupAccess::Selection`], an overall row 
selection
+    /// is returned for *all* the rows in the row groups that are not skipped.
+    /// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
+    ///
+    /// # Example: No Selections
+    ///
+    /// Given an access plan like this
+    ///
+    /// ```text
+    ///   RowGroupAccess::Scan (scan all row group 0)
+    ///   RowGroupAccess::Skip (skip row group 1)
+    ///   RowGroupAccess::Scan (scan all row group 2)
+    ///   RowGroupAccess::Scan (scan all row group 3)
+    /// ```
+    ///
+    /// The overall row selection would be `None` because there are no
+    /// [`RowGroupAccess::Selection`]s. The row group indexes
+    /// returned by [`Self::row_group_indexes`] would be `0, 2, 3` .
+    ///
+    /// # Example: With Selections
+    ///
+    /// Given an access plan like this:
+    ///
+    /// ```text
+    ///   RowGroupAccess::Scan (scan all row group 0)
+    ///   RowGroupAccess::Skip (skip row group 1)
+    ///   RowGroupAccess::Selection (skip 50, scan 50, skip 900) (scan rows 
50-100 in row group 2)
+    ///   RowGroupAccess::Scan (scan all row group 3)
+    /// ```
+    ///
+    /// Assuming each row group has 1000 rows, the resulting row selection 
would
+    /// be the rows to scan in row group 0, 2 and 3:
+    ///
+    /// ```text
+    ///  RowSelection::Select(1000) (scan all rows in row group 0)
+    ///  RowSelection::Skip(50)     (skip first 50 rows in row group 2)
+    ///  RowSelection::Select(50)   (scan rows 50-100 in row group 2)
+    ///  RowSelection::Skip(900)    (skip last 900 rows in row group 2)
+    ///  RowSelection::Select(1000) (scan all rows in row group 3)
+    /// ```
+    ///
+    /// Note there is no entry for the (entirely) skipped row group 1.
+    ///
+    /// The row group indexes returned by [`Self::row_group_indexes`] would
+    /// still be `0, 2, 3` .
+    ///
+    /// [`ArrowReaderBuilder::with_row_selection`]: 
parquet::arrow::arrow_reader::ArrowReaderBuilder::with_row_selection
+    pub fn into_overall_row_selection(
+        self,
+        row_group_meta_data: &[RowGroupMetaData],
+    ) -> Option<RowSelection> {
+        assert_eq!(row_group_meta_data.len(), self.row_groups.len());
+        // Intuition: entire row groups are filtered out using
+        // `row_group_indexes` which come from Skip and Scan. An overall
+        // RowSelection is only useful if there are any parts *within* a row 
group
+        // which can be filtered out, that is a `Selection`.
+        if !self
+            .row_groups
+            .iter()
+            .any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
+        {
+            return None;
+        }
+
+        let total_selection: RowSelection = self
+            .row_groups
+            .into_iter()
+            .zip(row_group_meta_data.iter())
+            .flat_map(|(rg, rg_meta)| {
+                match rg {
+                    RowGroupAccess::Skip => vec![],
+                    RowGroupAccess::Scan => {
+                        // need a row group access to scan the entire row 
group (need row group counts)
+                        vec![RowSelector::select(rg_meta.num_rows() as usize)]
+                    }
+                    RowGroupAccess::Selection(selection) => {
+                        let selection: Vec<RowSelector> = selection.into();
+                        selection
+                    }
+                }
+            })
+            .collect();
+
+        Some(total_selection)
+    }
+
+    /// Return an iterator over the row group indexes that should be scanned
+    pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
+        self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
+            if b.should_scan() {
+                Some(idx)
+            } else {
+                None
+            }
+        })
+    }
+
+    /// Return a vec of all row group indexes to scan
+    pub fn row_group_indexes(&self) -> Vec<usize> {
+        self.row_group_index_iter().collect()
+    }
+
+    /// Return the total number of row groups (not the total number of groups 
to
+    /// scan)
+    pub fn len(&self) -> usize {
+        self.row_groups.len()
+    }
+
+    /// Return true if there are no row groups
+    pub fn is_empty(&self) -> bool {
+        self.row_groups.is_empty()
+    }
+
+    /// Get a reference to the inner accesses
+    pub fn inner(&self) -> &[RowGroupAccess] {
+        &self.row_groups
+    }
+
+    /// Convert into the inner row group accesses
+    pub fn into_inner(self) -> Vec<RowGroupAccess> {
+        self.row_groups
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use parquet::basic::LogicalType;
+    use parquet::file::metadata::ColumnChunkMetaData;
+    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
+    use std::sync::{Arc, OnceLock};
+
+    #[test]
+    fn test_only_scans() {
+        let access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan,
+            RowGroupAccess::Scan,
+            RowGroupAccess::Scan,
+            RowGroupAccess::Scan,
+        ]);
+
+        let row_group_indexes = access_plan.row_group_indexes();
+        let row_selection = 
access_plan.into_overall_row_selection(row_group_metadata());
+
+        // scan all row groups, no selection
+        assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
+        assert_eq!(row_selection, None);
+    }
+
+    #[test]
+    fn test_only_skips() {
+        let access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Skip,
+            RowGroupAccess::Skip,
+            RowGroupAccess::Skip,
+            RowGroupAccess::Skip,
+        ]);
+
+        let row_group_indexes = access_plan.row_group_indexes();
+        let row_selection = 
access_plan.into_overall_row_selection(row_group_metadata());
+
+        // skip all row groups, no selection
+        assert_eq!(row_group_indexes, vec![] as Vec<usize>);
+        assert_eq!(row_selection, None);
+    }
+    #[test]
+    fn test_mixed_1() {
+        let access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan,
+            RowGroupAccess::Selection(
+                vec![RowSelector::select(5), RowSelector::skip(7)].into(),
+            ),
+            RowGroupAccess::Skip,
+            RowGroupAccess::Skip,
+        ]);
+
+        let row_group_indexes = access_plan.row_group_indexes();
+        let row_selection = 
access_plan.into_overall_row_selection(row_group_metadata());
+
+        assert_eq!(row_group_indexes, vec![0, 1]);
+        assert_eq!(
+            row_selection,
+            Some(
+                vec![
+                    // select the entire first row group
+                    RowSelector::select(10),
+                    // selectors from the second row group
+                    RowSelector::select(5),
+                    RowSelector::skip(7)
+                ]
+                .into()
+            )
+        );
+    }
+
+    #[test]
+    fn test_mixed_2() {
+        let access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Skip,
+            RowGroupAccess::Scan,
+            RowGroupAccess::Selection(
+                vec![RowSelector::select(5), RowSelector::skip(7)].into(),
+            ),
+            RowGroupAccess::Scan,
+        ]);
+
+        let row_group_indexes = access_plan.row_group_indexes();
+        let row_selection = 
access_plan.into_overall_row_selection(row_group_metadata());
+
+        assert_eq!(row_group_indexes, vec![1, 2, 3]);
+        assert_eq!(
+            row_selection,
+            Some(
+                vec![
+                    // select the entire second row group
+                    RowSelector::select(20),
+                    // selectors from the third row group
+                    RowSelector::select(5),
+                    RowSelector::skip(7),
+                    // select the entire fourth row group
+                    RowSelector::select(40),
+                ]
+                .into()
+            )
+        );
+    }
+
+    static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = 
OnceLock::new();
+
+    /// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
+    /// respectively
+    fn row_group_metadata() -> &'static [RowGroupMetaData] {
+        ROW_GROUP_METADATA.get_or_init(|| {
+            let schema_descr = get_test_schema_descr();
+            let row_counts = [10, 20, 30, 40];
+
+            row_counts
+                .into_iter()
+                .map(|num_rows| {
+                    let column = 
ColumnChunkMetaData::builder(schema_descr.column(0))
+                        .set_num_values(num_rows)
+                        .build()
+                        .unwrap();
+
+                    RowGroupMetaData::builder(schema_descr.clone())
+                        .set_num_rows(num_rows)
+                        .set_column_metadata(vec![column])
+                        .build()
+                        .unwrap()
+                })
+                .collect()
+        })
+    }
+
+    /// Single column schema with a single column named "a" of type 
`BYTE_ARRAY`/`String`
+    fn get_test_schema_descr() -> SchemaDescPtr {
+        use parquet::basic::Type as PhysicalType;
+        use parquet::schema::types::Type as SchemaType;
+        let field = SchemaType::primitive_type_builder("a", 
PhysicalType::BYTE_ARRAY)
+            .with_logical_type(Some(LogicalType::String))
+            .build()
+            .unwrap();
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(vec![Arc::new(field)])
+            .build()
+            .unwrap();
+        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+    }
+}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index f0328098b4..04b25069e9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -47,6 +47,7 @@ use log::debug;
 use parquet::basic::{ConvertedType, LogicalType};
 use parquet::schema::types::ColumnDescriptor;
 
+mod access_plan;
 mod metrics;
 mod opener;
 mod page_filter;
@@ -59,6 +60,7 @@ mod writer;
 use crate::datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
+pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
@@ -152,8 +154,9 @@ pub use writer::plan_to_parquet;
 /// the file.
 ///
 /// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
-/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
-/// to determine what pages must be read.
+/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
+/// applying predicates to metadata. The plan and projections are used to
+/// determine what pages must be read.
 ///
 /// * Step 4: The stream begins reading data, fetching the required pages
 /// and incrementally decoding them.
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 5fb21975df..a5047e487e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -18,8 +18,10 @@
 //! [`ParquetOpener`] for opening Parquet files
 
 use 
crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
-use crate::datasource::physical_plan::parquet::{row_filter, 
should_enable_page_index};
+use 
crate::datasource::physical_plan::parquet::row_groups::RowGroupAccessPlanFilter;
+use crate::datasource::physical_plan::parquet::{
+    row_filter, should_enable_page_index, ParquetAccessPlan,
+};
 use crate::datasource::physical_plan::{
     FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, 
ParquetFileReaderFactory,
 };
@@ -137,7 +139,8 @@ impl FileOpener for ParquetOpener {
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let rg_metadata = file_metadata.row_groups();
             // track which row groups to actually read
-            let mut row_groups = RowGroupSet::new(rg_metadata.len());
+            let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
+            let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
             // if there is a range restricting what parts of the file to read
             if let Some(range) = file_range.as_ref() {
                 row_groups.prune_by_range(rg_metadata, range);
@@ -164,24 +167,30 @@ impl FileOpener for ParquetOpener {
                 }
             }
 
+            let mut access_plan = row_groups.build();
+
             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
-            if enable_page_index && !row_groups.is_empty() {
+            if enable_page_index && !access_plan.is_empty() {
                 if let Some(p) = page_pruning_predicate {
-                    let pruned = p.prune(
+                    access_plan = p.prune_plan_with_page_index(
+                        access_plan,
                         &file_schema,
                         builder.parquet_schema(),
-                        &row_groups,
                         file_metadata.as_ref(),
                         &file_metrics,
-                    )?;
-                    if let Some(row_selection) = pruned {
-                        builder = builder.with_row_selection(row_selection);
-                    }
+                    );
                 }
             }
 
+            let row_group_indexes = access_plan.row_group_indexes();
+            if let Some(row_selection) =
+                access_plan.into_overall_row_selection(rg_metadata)
+            {
+                builder = builder.with_row_selection(row_selection);
+            }
+
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
@@ -189,7 +198,7 @@ impl FileOpener for ParquetOpener {
             let stream = builder
                 .with_projection(mask)
                 .with_batch_size(batch_size)
-                .with_row_groups(row_groups.indexes())
+                .with_row_groups(row_group_indexes)
                 .build()?;
 
             let adapted = stream
diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index d47d5c56bd..7429ca5938 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -22,16 +22,15 @@ use arrow::array::{
     StringArray,
 };
 use arrow::datatypes::DataType;
-use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
+use arrow::{array::ArrayRef, datatypes::SchemaRef};
 use arrow_schema::Schema;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{Result, ScalarValue};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use log::{debug, trace};
 use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
 use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
-    errors::ParquetError,
     file::{
         metadata::{ParquetMetaData, RowGroupMetaData},
         page_index::index::Index,
@@ -42,10 +41,10 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
 use crate::datasource::physical_plan::parquet::statistics::{
     from_bytes_to_i128, parquet_column,
 };
+use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::metrics::ParquetFileMetrics;
@@ -111,6 +110,7 @@ pub struct PagePruningPredicate {
 
 impl PagePruningPredicate {
     /// Create a new [`PagePruningPredicate`]
+    // TODO: this is infallible -- it cannot return an error
     pub fn try_new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> 
Result<Self> {
         let predicates = split_conjunction(expr)
             .into_iter()
@@ -129,105 +129,117 @@ impl PagePruningPredicate {
         Ok(Self { predicates })
     }
 
-    /// Returns a [`RowSelection`] for the given file
-    pub fn prune(
+    /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the
+    /// parquet page index, if any
+    pub fn prune_plan_with_page_index(
         &self,
+        mut access_plan: ParquetAccessPlan,
         arrow_schema: &Schema,
         parquet_schema: &SchemaDescriptor,
-        row_groups: &RowGroupSet,
         file_metadata: &ParquetMetaData,
         file_metrics: &ParquetFileMetrics,
-    ) -> Result<Option<RowSelection>> {
+    ) -> ParquetAccessPlan {
         // scoped timer updates on drop
         let _timer_guard = file_metrics.page_index_eval_time.timer();
         if self.predicates.is_empty() {
-            return Ok(None);
+            return access_plan;
         }
 
         let page_index_predicates = &self.predicates;
         let groups = file_metadata.row_groups();
 
         if groups.is_empty() {
-            return Ok(None);
+            return access_plan;
         }
 
-        let file_offset_indexes = file_metadata.offset_index();
-        let file_page_indexes = file_metadata.column_index();
-        let (file_offset_indexes, file_page_indexes) = match (
-            file_offset_indexes,
-            file_page_indexes,
-        ) {
-            (Some(o), Some(i)) => (o, i),
-            _ => {
-                trace!(
+        let (Some(file_offset_indexes), Some(file_page_indexes)) =
+            (file_metadata.offset_index(), file_metadata.column_index())
+        else {
+            trace!(
                     "skip page pruning due to lack of indexes. Have offset: 
{}, column index: {}",
-                    file_offset_indexes.is_some(), file_page_indexes.is_some()
+                    file_metadata.offset_index().is_some(), 
file_metadata.column_index().is_some()
                 );
-                return Ok(None);
-            }
+            return access_plan;
         };
 
-        let mut row_selections = 
Vec::with_capacity(page_index_predicates.len());
-        for predicate in page_index_predicates {
-            // find column index in the parquet schema
-            let col_idx = find_column_index(predicate, arrow_schema, 
parquet_schema);
-            let mut selectors = Vec::with_capacity(row_groups.len());
-            for r in row_groups.iter() {
+        // track the total number of rows that should be skipped
+        let mut total_skip = 0;
+
+        let row_group_indexes = access_plan.row_group_indexes();
+        for r in row_group_indexes {
+            // The selection for this particular row group
+            let mut overall_selection = None;
+            for predicate in page_index_predicates {
+                // find column index in the parquet schema
+                let col_idx = find_column_index(predicate, arrow_schema, 
parquet_schema);
                 let row_group_metadata = &groups[r];
 
-                let rg_offset_indexes = file_offset_indexes.get(r);
-                let rg_page_indexes = file_page_indexes.get(r);
-                if let (Some(rg_page_indexes), Some(rg_offset_indexes), 
Some(col_idx)) =
-                    (rg_page_indexes, rg_offset_indexes, col_idx)
-                {
-                    selectors.extend(
-                        prune_pages_in_one_row_group(
-                            row_group_metadata,
-                            predicate,
-                            rg_offset_indexes.get(col_idx),
-                            rg_page_indexes.get(col_idx),
-                            groups[r].column(col_idx).column_descr(),
-                            file_metrics,
-                        )
-                        .map_err(|e| {
-                            ArrowError::ParquetError(format!(
-                                "Fail in prune_pages_in_one_row_group: {e}"
-                            ))
-                        }),
+                let (Some(rg_page_indexes), Some(rg_offset_indexes), 
Some(col_idx)) = (
+                    file_page_indexes.get(r),
+                    file_offset_indexes.get(r),
+                    col_idx,
+                ) else {
+                    trace!(
+                        "Did not have enough metadata to prune with page 
indexes, \
+                     falling back to all rows",
                     );
+                    continue;
+                };
+
+                let selection = prune_pages_in_one_row_group(
+                    row_group_metadata,
+                    predicate,
+                    rg_offset_indexes.get(col_idx),
+                    rg_page_indexes.get(col_idx),
+                    groups[r].column(col_idx).column_descr(),
+                    file_metrics,
+                );
+
+                let Some(selection) = selection else {
+                    trace!("No pages pruned in prune_pages_in_one_row_group");
+                    continue;
+                };
+
+                debug!("Use filter and page index to create RowSelection {:?} 
from predicate: {:?}",
+                    &selection,
+                    predicate.predicate_expr(),
+                );
+
+                overall_selection = update_selection(overall_selection, 
selection);
+
+                // if the overall selection has ruled out all rows, no need to
+                // continue with the other predicates
+                let selects_any = overall_selection
+                    .as_ref()
+                    .map(|selection| selection.selects_any())
+                    .unwrap_or(true);
+
+                if !selects_any {
+                    break;
+                }
+            }
+
+            if let Some(overall_selection) = overall_selection {
+                if overall_selection.selects_any() {
+                    let rows_skipped = rows_skipped(&overall_selection);
+                    trace!("Overall selection from predicate skipped 
{rows_skipped}: {overall_selection:?}");
+                    total_skip += rows_skipped;
+                    access_plan.scan_selection(r, overall_selection)
                 } else {
+                    // Selection skips all rows, so skip the entire row group
+                    let rows_skipped = groups[r].num_rows() as usize;
+                    access_plan.skip(r);
+                    total_skip += rows_skipped;
                     trace!(
-                        "Did not have enough metadata to prune with page 
indexes, \
-                         falling back to all rows",
+                        "Overall selection from predicate is empty, \
+                        skipping all {rows_skipped} rows in row group {r}"
                     );
-                    // fallback select all rows
-                    let all_selected =
-                        vec![RowSelector::select(groups[r].num_rows() as 
usize)];
-                    selectors.push(all_selected);
                 }
             }
-            debug!(
-                "Use filter and page index create RowSelection {:?} from 
predicate: {:?}",
-                &selectors,
-                predicate.predicate_expr(),
-            );
-            
row_selections.push(selectors.into_iter().flatten().collect::<Vec<_>>());
         }
 
-        let final_selection = combine_multi_col_selection(row_selections);
-        let total_skip =
-            final_selection.iter().fold(
-                0,
-                |acc, x| {
-                    if x.skip {
-                        acc + x.row_count
-                    } else {
-                        acc
-                    }
-                },
-            );
         file_metrics.page_index_rows_filtered.add(total_skip);
-        Ok(Some(final_selection))
+        access_plan
     }
 
     /// Returns the number of filters in the [`PagePruningPredicate`]
@@ -236,6 +248,24 @@ impl PagePruningPredicate {
     }
 }
 
+/// Returns the number of rows skipped in the selection
+/// TODO should this be upstreamed to RowSelection?
+fn rows_skipped(selection: &RowSelection) -> usize {
+    selection
+        .iter()
+        .fold(0, |acc, x| if x.skip { acc + x.row_count } else { acc })
+}
+
+fn update_selection(
+    current_selection: Option<RowSelection>,
+    row_selection: RowSelection,
+) -> Option<RowSelection> {
+    match current_selection {
+        None => Some(row_selection),
+        Some(current_selection) => 
Some(current_selection.intersection(&row_selection)),
+    }
+}
+
 /// Returns the column index in the row parquet schema for the single
 /// column of a single column pruning predicate.
 ///
@@ -282,22 +312,8 @@ fn find_column_index(
     parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0)
 }
 
-/// Intersects the [`RowSelector`]s
-///
-/// For exampe, given:
-/// * `RowSelector1: [ Skip(0~199), Read(200~299)]`
-/// * `RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)]`
-///
-/// The final selection is the intersection of these  `RowSelector`s:
-/// * `final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]`
-fn combine_multi_col_selection(row_selections: Vec<Vec<RowSelector>>) -> 
RowSelection {
-    row_selections
-        .into_iter()
-        .map(RowSelection::from)
-        .reduce(|s1, s2| s1.intersection(&s2))
-        .unwrap()
-}
-
+/// Returns a `RowSelection` for the pages in this RowGroup, or `None` if the
+/// page index is missing or the predicate could not be evaluated
 fn prune_pages_in_one_row_group(
     group: &RowGroupMetaData,
     predicate: &PruningPredicate,
@@ -305,63 +321,61 @@ fn prune_pages_in_one_row_group(
     col_page_indexes: Option<&Index>,
     col_desc: &ColumnDescriptor,
     metrics: &ParquetFileMetrics,
-) -> Result<Vec<RowSelector>> {
+) -> Option<RowSelection> {
     let num_rows = group.num_rows() as usize;
-    if let (Some(col_offset_indexes), Some(col_page_indexes)) =
+    let (Some(col_offset_indexes), Some(col_page_indexes)) =
         (col_offset_indexes, col_page_indexes)
-    {
-        let target_type = parquet_to_arrow_decimal_type(col_desc);
-        let pruning_stats = PagesPruningStatistics {
-            col_page_indexes,
-            col_offset_indexes,
-            target_type: &target_type,
-            num_rows_in_row_group: group.num_rows(),
-        };
+    else {
+        return None;
+    };
 
-        match predicate.prune(&pruning_stats) {
-            Ok(values) => {
-                let mut vec = Vec::with_capacity(values.len());
-                let row_vec = 
create_row_count_in_each_page(col_offset_indexes, num_rows);
-                assert_eq!(row_vec.len(), values.len());
-                let mut sum_row = *row_vec.first().unwrap();
-                let mut selected = *values.first().unwrap();
-                trace!("Pruned to {:?} using {:?}", values, pruning_stats);
-                for (i, &f) in values.iter().enumerate().skip(1) {
-                    if f == selected {
-                        sum_row += *row_vec.get(i).unwrap();
-                    } else {
-                        let selector = if selected {
-                            RowSelector::select(sum_row)
-                        } else {
-                            RowSelector::skip(sum_row)
-                        };
-                        vec.push(selector);
-                        sum_row = *row_vec.get(i).unwrap();
-                        selected = f;
-                    }
-                }
+    let target_type = parquet_to_arrow_decimal_type(col_desc);
+    let pruning_stats = PagesPruningStatistics {
+        col_page_indexes,
+        col_offset_indexes,
+        target_type: &target_type,
+        num_rows_in_row_group: group.num_rows(),
+    };
 
-                let selector = if selected {
-                    RowSelector::select(sum_row)
-                } else {
-                    RowSelector::skip(sum_row)
-                };
-                vec.push(selector);
-                return Ok(vec);
-            }
+    let values = match predicate.prune(&pruning_stats) {
+        Ok(values) => values,
+        Err(e) => {
             // stats filter array could not be built
             // return a result which will not filter out any pages
-            Err(e) => {
-                debug!("Error evaluating page index predicate values {e}");
-                metrics.predicate_evaluation_errors.add(1);
-                return Ok(vec![RowSelector::select(group.num_rows() as 
usize)]);
-            }
+            debug!("Error evaluating page index predicate values {e}");
+            metrics.predicate_evaluation_errors.add(1);
+            return None;
+        }
+    };
+
+    let mut vec = Vec::with_capacity(values.len());
+    let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows);
+    assert_eq!(row_vec.len(), values.len());
+    let mut sum_row = *row_vec.first().unwrap();
+    let mut selected = *values.first().unwrap();
+    trace!("Pruned to {:?} using {:?}", values, pruning_stats);
+    for (i, &f) in values.iter().enumerate().skip(1) {
+        if f == selected {
+            sum_row += *row_vec.get(i).unwrap();
+        } else {
+            let selector = if selected {
+                RowSelector::select(sum_row)
+            } else {
+                RowSelector::skip(sum_row)
+            };
+            vec.push(selector);
+            sum_row = *row_vec.get(i).unwrap();
+            selected = f;
         }
     }
-    Err(DataFusionError::ParquetError(ParquetError::General(
-        "Got some error in prune_pages_in_one_row_group, plz try open the 
debuglog mode"
-            .to_string(),
-    )))
+
+    let selector = if selected {
+        RowSelector::select(sum_row)
+    } else {
+        RowSelector::skip(sum_row)
+    };
+    vec.push(selector);
+    Some(RowSelection::from(vec))
 }
 
 fn create_row_count_in_each_page(
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 7dd91d3d4e..e2548412cc 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -36,58 +36,35 @@ use crate::datasource::physical_plan::parquet::statistics::{
 };
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
-use super::ParquetFileMetrics;
+use super::{ParquetAccessPlan, ParquetFileMetrics};
 
-/// Tracks which RowGroups within a parquet file should be scanned.
+/// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
 ///
-/// This struct encapsulates the various types of pruning that can be applied 
to
-/// a set of row groups within a parquet file, progressively narrowing down the
-/// set of row groups that should be scanned.
-#[derive(Debug, PartialEq)]
-pub struct RowGroupSet {
-    /// `row_groups[i]` is true if the i-th row group should be scanned
-    row_groups: Vec<bool>,
+/// This struct implements the various types of pruning that are applied to a
+/// set of row groups within a parquet file, progressively narrowing down the
+/// set of row groups (and ranges/selections within those row groups) that
+/// should be scanned, based on the available metadata.
+#[derive(Debug, Clone, PartialEq)]
+pub struct RowGroupAccessPlanFilter {
+    /// which row groups should be accessed
+    access_plan: ParquetAccessPlan,
 }
 
-impl RowGroupSet {
-    /// Create a new `RowGroupSet` with all row groups set to true (will be 
scanned)
-    pub fn new(num_row_groups: usize) -> Self {
-        Self {
-            row_groups: vec![true; num_row_groups],
-        }
-    }
-
-    /// Set the i-th row group to false (should not be scanned)
-    pub fn do_not_scan(&mut self, idx: usize) {
-        self.row_groups[idx] = false;
-    }
-
-    /// Return true if the i-th row group should be scanned
-    fn should_scan(&self, idx: usize) -> bool {
-        self.row_groups[idx]
+impl RowGroupAccessPlanFilter {
+    /// Create a new `RowGroupAccessPlanFilter` for pruning out the groups to scan
+    /// based on metadata and statistics
+    pub fn new(access_plan: ParquetAccessPlan) -> Self {
+        Self { access_plan }
     }
 
-    /// Return the total number of row groups (not the total number to be 
scanned)
-    pub fn len(&self) -> usize {
-        self.row_groups.len()
-    }
-
-    /// Return true if there are no row groups
+    /// Return true if there are no row groups to scan
     pub fn is_empty(&self) -> bool {
-        self.row_groups.is_empty()
-    }
-
-    /// Return an iterator over the row group indexes that should be scanned
-    pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
-        self.row_groups
-            .iter()
-            .enumerate()
-            .filter_map(|(idx, &b)| if b { Some(idx) } else { None })
+        self.access_plan.is_empty()
     }
 
-    /// Return a `Vec` of row group indices that should be scanned
-    pub fn indexes(&self) -> Vec<usize> {
-        self.iter().collect()
+    /// Returns the inner access plan
+    pub fn build(self) -> ParquetAccessPlan {
+        self.access_plan
     }
 
     /// Prune remaining row groups to only those  within the specified range.
@@ -97,9 +74,9 @@ impl RowGroupSet {
     /// # Panics
     /// if `groups.len() != self.len()`
     pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range: 
&FileRange) {
-        assert_eq!(groups.len(), self.len());
+        assert_eq!(groups.len(), self.access_plan.len());
         for (idx, metadata) in groups.iter().enumerate() {
-            if !self.should_scan(idx) {
+            if !self.access_plan.should_scan(idx) {
                 continue;
             }
 
@@ -113,7 +90,7 @@ impl RowGroupSet {
                 .dictionary_page_offset()
                 .unwrap_or_else(|| col.data_page_offset());
             if !range.contains(offset) {
-                self.do_not_scan(idx);
+                self.access_plan.skip(idx);
             }
         }
     }
@@ -135,9 +112,9 @@ impl RowGroupSet {
         predicate: &PruningPredicate,
         metrics: &ParquetFileMetrics,
     ) {
-        assert_eq!(groups.len(), self.len());
+        assert_eq!(groups.len(), self.access_plan.len());
         for (idx, metadata) in groups.iter().enumerate() {
-            if !self.should_scan(idx) {
+            if !self.access_plan.should_scan(idx) {
                 continue;
             }
             let pruning_stats = RowGroupPruningStatistics {
@@ -150,7 +127,7 @@ impl RowGroupSet {
                     // NB: false means don't scan row group
                     if !values[0] {
                         metrics.row_groups_pruned_statistics.add(1);
-                        self.do_not_scan(idx);
+                        self.access_plan.skip(idx);
                         continue;
                     }
                 }
@@ -179,9 +156,9 @@ impl RowGroupSet {
         predicate: &PruningPredicate,
         metrics: &ParquetFileMetrics,
     ) {
-        assert_eq!(builder.metadata().num_row_groups(), self.len());
-        for idx in 0..self.len() {
-            if !self.should_scan(idx) {
+        assert_eq!(builder.metadata().num_row_groups(), 
self.access_plan.len());
+        for idx in 0..self.access_plan.len() {
+            if !self.access_plan.should_scan(idx) {
                 continue;
             }
 
@@ -230,7 +207,7 @@ impl RowGroupSet {
 
             if prune_group {
                 metrics.row_groups_pruned_bloom_filter.add(1);
-                self.do_not_scan(idx)
+                self.access_plan.skip(idx)
             } else if !stats.column_sbbf.is_empty() {
                 metrics.row_groups_matched_bloom_filter.add(1);
             }
@@ -500,7 +477,7 @@ mod tests {
         );
 
         let metrics = parquet_file_metrics();
-        let mut row_groups = RowGroupSet::new(2);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -534,7 +511,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         // missing statistics for first row group mean that the result from 
the predicate expression
         // is null / undefined so the first row group can't be filtered out
-        let mut row_groups = RowGroupSet::new(2);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -581,7 +558,7 @@ mod tests {
         let groups = &[rgm1, rgm2];
         // the first row group is still filtered out because the predicate 
expression can be partially evaluated
         // when conditions are joined using AND
-        let mut row_groups = RowGroupSet::new(2);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -599,7 +576,7 @@ mod tests {
 
         // if conditions in predicate are joined with OR and an unsupported 
expression is used
         // this bypasses the entire predicate expression and no row groups are 
filtered out
-        let mut row_groups = RowGroupSet::new(2);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -655,7 +632,7 @@ mod tests {
         let groups = &[rgm1, rgm2];
         // the first row group should be left because c1 is greater than zero
         // the second should be filtered out because c1 is less than zero
-        let mut row_groups = RowGroupSet::new(2);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
         row_groups.prune_by_statistics(
             &file_schema,
             &schema_descr,
@@ -704,7 +681,7 @@ mod tests {
 
         let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value 
on "c2".
-        let mut row_groups = RowGroupSet::new(2);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -736,7 +713,8 @@ mod tests {
         let metrics = parquet_file_metrics();
         // bool = NULL always evaluates to NULL (and thus will not
         // pass predicates. Ideally these should both be false
-        let mut row_groups = RowGroupSet::new(groups.len());
+        let mut row_groups =
+            
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(groups.len()));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -796,7 +774,7 @@ mod tests {
             vec![ParquetStatistics::int32(Some(100), None, None, 0, false)],
         );
         let metrics = parquet_file_metrics();
-        let mut row_groups = RowGroupSet::new(3);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -864,7 +842,7 @@ mod tests {
             vec![ParquetStatistics::int32(None, Some(2), None, 0, false)],
         );
         let metrics = parquet_file_metrics();
-        let mut row_groups = RowGroupSet::new(4);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(4));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -915,7 +893,7 @@ mod tests {
             vec![ParquetStatistics::int64(None, None, None, 0, false)],
         );
         let metrics = parquet_file_metrics();
-        let mut row_groups = RowGroupSet::new(3);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -989,7 +967,7 @@ mod tests {
             )],
         );
         let metrics = parquet_file_metrics();
-        let mut row_groups = RowGroupSet::new(3);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -1052,7 +1030,7 @@ mod tests {
             vec![ParquetStatistics::byte_array(None, None, None, 0, false)],
         );
         let metrics = parquet_file_metrics();
-        let mut row_groups = RowGroupSet::new(3);
+        let mut row_groups = 
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
         row_groups.prune_by_statistics(
             &schema,
             &schema_descr,
@@ -1179,7 +1157,7 @@ mod tests {
         )
         .await
         .unwrap();
-        assert!(pruned_row_groups.indexes().is_empty());
+        assert!(pruned_row_groups.access_plan.row_group_indexes().is_empty());
     }
 
     #[tokio::test]
@@ -1251,12 +1229,12 @@ mod tests {
 
     impl ExpectedPruning {
         /// asserts that the pruned row group match this expectation
-        fn assert(&self, row_groups: &RowGroupSet) {
-            let num_row_groups = row_groups.len();
+        fn assert(&self, row_groups: &RowGroupAccessPlanFilter) {
+            let num_row_groups = row_groups.access_plan.len();
             assert!(num_row_groups > 0);
             let num_pruned = (0..num_row_groups)
                 .filter_map(|i| {
-                    if row_groups.should_scan(i) {
+                    if row_groups.access_plan.should_scan(i) {
                         None
                     } else {
                         Some(1)
@@ -1278,14 +1256,14 @@ mod tests {
                     );
                 }
                 ExpectedPruning::Some(expected) => {
-                    let actual = row_groups.indexes();
+                    let actual = row_groups.access_plan.row_group_indexes();
                     assert_eq!(expected, &actual, "Unexpected row groups 
pruned. Expected {expected:?}, got {actual:?}");
                 }
             }
         }
     }
 
-    fn assert_pruned(row_groups: RowGroupSet, expected: ExpectedPruning) {
+    fn assert_pruned(row_groups: RowGroupAccessPlanFilter, expected: 
ExpectedPruning) {
         expected.assert(&row_groups);
     }
 
@@ -1386,7 +1364,7 @@ mod tests {
         file_name: &str,
         data: bytes::Bytes,
         pruning_predicate: &PruningPredicate,
-    ) -> Result<RowGroupSet> {
+    ) -> Result<RowGroupAccessPlanFilter> {
         use object_store::{ObjectMeta, ObjectStore};
 
         let object_meta = ObjectMeta {
@@ -1411,7 +1389,8 @@ mod tests {
         };
         let mut builder = 
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
 
-        let mut pruned_row_groups = 
RowGroupSet::new(builder.metadata().num_row_groups());
+        let access_plan = 
ParquetAccessPlan::new_all(builder.metadata().num_row_groups());
+        let mut pruned_row_groups = RowGroupAccessPlanFilter::new(access_plan);
         pruned_row_groups
             .prune_by_bloom_filters(
                 pruning_predicate.schema(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to