This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1a26eca25a Add `ParquetAccessPlan`, unify RowGroup selection and
PagePruning selection (#10738)
1a26eca25a is described below
commit 1a26eca25abade1fe80ce2126c810ff9c8defcd0
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jun 6 08:40:30 2024 -0400
Add `ParquetAccessPlan`, unify RowGroup selection and PagePruning selection
(#10738)
* Add `ParquetAccessPlan` that describes which part of the parquet files to
read
* Rename to RowGroupAccessPlanFilter
* Clarify when overall selection is needed
* Update documentation to explain the relationship between
scan/skip/selection
* Break early if the row selection is empty
---
.../physical_plan/parquet/access_plan.rs | 449 +++++++++++++++++++++
.../src/datasource/physical_plan/parquet/mod.rs | 7 +-
.../src/datasource/physical_plan/parquet/opener.rs | 31 +-
.../physical_plan/parquet/page_filter.rs | 288 ++++++-------
.../datasource/physical_plan/parquet/row_groups.rs | 125 +++---
5 files changed, 677 insertions(+), 223 deletions(-)
diff --git
a/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
new file mode 100644
index 0000000000..c59459ba61
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+use parquet::file::metadata::RowGroupMetaData;
+
+/// A selection of rows and row groups within a ParquetFile to decode.
+///
+/// A `ParquetAccessPlan` is used to limit the row groups and data pages a
`ParquetExec`
+/// will read and decode to improve performance.
+///
+/// Note that page level pruning based on ArrowPredicate is applied after all
of
+/// these selections
+///
+/// # Example
+///
+/// For example, given a Parquet file with 4 row groups, a `ParquetAccessPlan`
+/// can be used to specify skipping row group 0 and 2, scanning a range of rows
+/// in row group 1, and scanning all rows in row group 3 as follows:
+///
+/// ```rust
+/// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
+/// // Default to scan all row groups
+/// let mut access_plan = ParquetAccessPlan::new_all(4);
+/// access_plan.skip(0); // skip row group
+/// // Use parquet reader RowSelector to specify scanning rows 100-200 and
350-400
+/// // in a row group that has 1000 rows
+/// let row_selection = RowSelection::from(vec![
+/// RowSelector::skip(100),
+/// RowSelector::select(100),
+/// RowSelector::skip(150),
+/// RowSelector::select(50),
+/// RowSelector::skip(600), // skip last 600 rows
+/// ]);
+/// access_plan.scan_selection(1, row_selection);
+/// access_plan.skip(2); // skip row group 2
+/// // row group 3 is scanned by default
+/// ```
+///
+/// The resulting plan would look like:
+///
+/// ```text
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+///
+/// │ │ SKIP
+///
+/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+/// Row Group 0
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+/// ┌────────────────┐ SCAN ONLY ROWS
+/// │└────────────────┘ │ 100-200
+/// ┌────────────────┐ 350-400
+/// │└────────────────┘ │
+/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+/// Row Group 1
+/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+/// SKIP
+/// │ │
+///
+/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+/// Row Group 2
+/// ┌───────────────────┐
+/// │ │ SCAN ALL ROWS
+/// │ │
+/// │ │
+/// └───────────────────┘
+/// Row Group 3
+/// ```
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParquetAccessPlan {
+ /// How to access the i-th row group
+ row_groups: Vec<RowGroupAccess>,
+}
+
+/// Describes how the parquet reader will access a row group
+#[derive(Debug, Clone, PartialEq)]
+pub enum RowGroupAccess {
+ /// Do not read the row group at all
+ Skip,
+ /// Read all rows from the row group
+ Scan,
+ /// Scan only the specified rows within the row group
+ Selection(RowSelection),
+}
+
+impl RowGroupAccess {
+ /// Return true if this row group should be scanned
+ pub fn should_scan(&self) -> bool {
+ match self {
+ RowGroupAccess::Skip => false,
+ RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
+ }
+ }
+}
+
+impl ParquetAccessPlan {
+ /// Create a new `ParquetAccessPlan` that scans all row groups
+ pub fn new_all(row_group_count: usize) -> Self {
+ Self {
+ row_groups: vec![RowGroupAccess::Scan; row_group_count],
+ }
+ }
+
+ /// Create a new `ParquetAccessPlan` that scans no row groups
+ pub fn new_none(row_group_count: usize) -> Self {
+ Self {
+ row_groups: vec![RowGroupAccess::Skip; row_group_count],
+ }
+ }
+
+ /// Create a new `ParquetAccessPlan` from the specified
[`RowGroupAccess`]es
+ pub fn new(row_groups: Vec<RowGroupAccess>) -> Self {
+ Self { row_groups }
+ }
+
+ /// Set the i-th row group to the specified [`RowGroupAccess`]
+ pub fn set(&mut self, idx: usize, access: RowGroupAccess) {
+ self.row_groups[idx] = access;
+ }
+
+ /// skips the i-th row group (should not be scanned)
+ pub fn skip(&mut self, idx: usize) {
+ self.set(idx, RowGroupAccess::Skip);
+ }
+
+ /// Return true if the i-th row group should be scanned
+ pub fn should_scan(&self, idx: usize) -> bool {
+ self.row_groups[idx].should_scan()
+ }
+
+ /// Set to scan only the [`RowSelection`] in the specified row group.
+ ///
+ /// Behavior is different depending on the existing access
+ /// * [`RowGroupAccess::Skip`]: does nothing
+ /// * [`RowGroupAccess::Scan`]: Updates to scan only the rows in the
`RowSelection`
+ /// * [`RowGroupAccess::Selection`]: Updates to scan only the intersection
of the existing selection and the new selection
+ pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
+ self.row_groups[idx] = match &self.row_groups[idx] {
+ // already skipping the entire row group
+ RowGroupAccess::Skip => RowGroupAccess::Skip,
+ RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
+ RowGroupAccess::Selection(existing_selection) => {
+
RowGroupAccess::Selection(existing_selection.intersection(&selection))
+ }
+ }
+ }
+
+ /// Return an overall `RowSelection`, if needed
+ ///
+ /// This is used to compute the row selection for the parquet reader. See
+ /// [`ArrowReaderBuilder::with_row_selection`] for more details.
+ ///
+ /// Returns
+ /// * `None` if there are no [`RowGroupAccess::Selection`]
+ /// * `Some(selection)` if there are [`RowGroupAccess::Selection`]s
+ ///
+ /// The returned selection represents which rows to scan across all
+ /// row groups which are not skipped.
+ ///
+ /// # Notes
+ ///
+ /// If there are no [`RowGroupAccess::Selection`]s, the overall row
+ /// selection is `None` because each row group is either entirely skipped
or
+ /// scanned, which is covered by [`Self::row_group_indexes`].
+ ///
+ /// If there are any [`RowGroupAccess::Selection`], an overall row
selection
+ /// is returned for *all* the rows in the row groups that are not skipped.
+ /// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
+ ///
+ /// # Example: No Selections
+ ///
+ /// Given an access plan like this
+ ///
+ /// ```text
+ /// RowGroupAccess::Scan (scan all row group 0)
+ /// RowGroupAccess::Skip (skip row group 1)
+ /// RowGroupAccess::Scan (scan all row group 2)
+ /// RowGroupAccess::Scan (scan all row group 3)
+ /// ```
+ ///
+ /// The overall row selection would be `None` because there are no
+ /// [`RowGroupAccess::Selection`]s. The row group indexes
+ /// returned by [`Self::row_group_indexes`] would be `0, 2, 3` .
+ ///
+ /// # Example: With Selections
+ ///
+ /// Given an access plan like this:
+ ///
+ /// ```text
+ /// RowGroupAccess::Scan (scan all row group 0)
+ /// RowGroupAccess::Skip (skip row group 1)
+ /// RowGroupAccess::Selection (skip 50, scan 50, skip 900) (scan rows
50-100 in row group 2)
+ /// RowGroupAccess::Scan (scan all row group 3)
+ /// ```
+ ///
+ /// Assuming each row group has 1000 rows, the resulting row selection
would
+ /// be the rows to scan in row group 0, 2 and 3:
+ ///
+ /// ```text
+ /// RowSelector::select(1000) (scan all rows in row group 0)
+ /// RowSelector::skip(50) (skip first 50 rows in row group 2)
+ /// RowSelector::select(50) (scan rows 50-100 in row group 2)
+ /// RowSelector::skip(900) (skip last 900 rows in row group 2)
+ /// RowSelector::select(1000) (scan all rows in row group 3)
+ /// ```
+ ///
+ /// Note there is no entry for the (entirely) skipped row group 1.
+ ///
+ /// The row group indexes returned by [`Self::row_group_indexes`] would
+ /// still be `0, 2, 3` .
+ ///
+ /// [`ArrowReaderBuilder::with_row_selection`]:
parquet::arrow::arrow_reader::ArrowReaderBuilder::with_row_selection
+ pub fn into_overall_row_selection(
+ self,
+ row_group_meta_data: &[RowGroupMetaData],
+ ) -> Option<RowSelection> {
+ assert_eq!(row_group_meta_data.len(), self.row_groups.len());
+ // Intuition: entire row groups are filtered out using
+ // `row_group_indexes` which come from Skip and Scan. An overall
+ // RowSelection is only useful if there is any parts *within* a row
group
+ // which can be filtered out, that is a `Selection`.
+ if !self
+ .row_groups
+ .iter()
+ .any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
+ {
+ return None;
+ }
+
+ let total_selection: RowSelection = self
+ .row_groups
+ .into_iter()
+ .zip(row_group_meta_data.iter())
+ .flat_map(|(rg, rg_meta)| {
+ match rg {
+ RowGroupAccess::Skip => vec![],
+ RowGroupAccess::Scan => {
+ // need a row group access to scan the entire row
group (need row group counts)
+ vec![RowSelector::select(rg_meta.num_rows() as usize)]
+ }
+ RowGroupAccess::Selection(selection) => {
+ let selection: Vec<RowSelector> = selection.into();
+ selection
+ }
+ }
+ })
+ .collect();
+
+ Some(total_selection)
+ }
+
+ /// Return an iterator over the row group indexes that should be scanned
+ pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
+ self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
+ if b.should_scan() {
+ Some(idx)
+ } else {
+ None
+ }
+ })
+ }
+
+ /// Return a vec of all row group indexes to scan
+ pub fn row_group_indexes(&self) -> Vec<usize> {
+ self.row_group_index_iter().collect()
+ }
+
+ /// Return the total number of row groups (not the total number of groups
to
+ /// scan)
+ pub fn len(&self) -> usize {
+ self.row_groups.len()
+ }
+
+ /// Return true if there are no row groups
+ pub fn is_empty(&self) -> bool {
+ self.row_groups.is_empty()
+ }
+
+ /// Get a reference to the inner accesses
+ pub fn inner(&self) -> &[RowGroupAccess] {
+ &self.row_groups
+ }
+
+ /// Convert into the inner row group accesses
+ pub fn into_inner(self) -> Vec<RowGroupAccess> {
+ self.row_groups
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use parquet::basic::LogicalType;
+ use parquet::file::metadata::ColumnChunkMetaData;
+ use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
+ use std::sync::{Arc, OnceLock};
+
+ #[test]
+ fn test_only_scans() {
+ let access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Scan,
+ RowGroupAccess::Scan,
+ RowGroupAccess::Scan,
+ RowGroupAccess::Scan,
+ ]);
+
+ let row_group_indexes = access_plan.row_group_indexes();
+ let row_selection =
access_plan.into_overall_row_selection(row_group_metadata());
+
+ // scan all row groups, no selection
+ assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
+ assert_eq!(row_selection, None);
+ }
+
+ #[test]
+ fn test_only_skips() {
+ let access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Skip,
+ RowGroupAccess::Skip,
+ RowGroupAccess::Skip,
+ RowGroupAccess::Skip,
+ ]);
+
+ let row_group_indexes = access_plan.row_group_indexes();
+ let row_selection =
access_plan.into_overall_row_selection(row_group_metadata());
+
+ // skip all row groups, no selection
+ assert_eq!(row_group_indexes, vec![] as Vec<usize>);
+ assert_eq!(row_selection, None);
+ }
+ #[test]
+ fn test_mixed_1() {
+ let access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Scan,
+ RowGroupAccess::Selection(
+ vec![RowSelector::select(5), RowSelector::skip(7)].into(),
+ ),
+ RowGroupAccess::Skip,
+ RowGroupAccess::Skip,
+ ]);
+
+ let row_group_indexes = access_plan.row_group_indexes();
+ let row_selection =
access_plan.into_overall_row_selection(row_group_metadata());
+
+ assert_eq!(row_group_indexes, vec![0, 1]);
+ assert_eq!(
+ row_selection,
+ Some(
+ vec![
+ // select the entire first row group
+ RowSelector::select(10),
+ // selectors from the second row group
+ RowSelector::select(5),
+ RowSelector::skip(7)
+ ]
+ .into()
+ )
+ );
+ }
+
+ #[test]
+ fn test_mixed_2() {
+ let access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Skip,
+ RowGroupAccess::Scan,
+ RowGroupAccess::Selection(
+ vec![RowSelector::select(5), RowSelector::skip(7)].into(),
+ ),
+ RowGroupAccess::Scan,
+ ]);
+
+ let row_group_indexes = access_plan.row_group_indexes();
+ let row_selection =
access_plan.into_overall_row_selection(row_group_metadata());
+
+ assert_eq!(row_group_indexes, vec![1, 2, 3]);
+ assert_eq!(
+ row_selection,
+ Some(
+ vec![
+ // select the entire second row group
+ RowSelector::select(20),
+ // selectors from the third row group
+ RowSelector::select(5),
+ RowSelector::skip(7),
+ // select the entire fourth row group
+ RowSelector::select(40),
+ ]
+ .into()
+ )
+ );
+ }
+
+ static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> =
OnceLock::new();
+
+ /// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
+ /// respectively
+ fn row_group_metadata() -> &'static [RowGroupMetaData] {
+ ROW_GROUP_METADATA.get_or_init(|| {
+ let schema_descr = get_test_schema_descr();
+ let row_counts = [10, 20, 30, 40];
+
+ row_counts
+ .into_iter()
+ .map(|num_rows| {
+ let column =
ColumnChunkMetaData::builder(schema_descr.column(0))
+ .set_num_values(num_rows)
+ .build()
+ .unwrap();
+
+ RowGroupMetaData::builder(schema_descr.clone())
+ .set_num_rows(num_rows)
+ .set_column_metadata(vec![column])
+ .build()
+ .unwrap()
+ })
+ .collect()
+ })
+ }
+
+ /// Single column schema with a single column named "a" of type
`BYTE_ARRAY`/`String`
+ fn get_test_schema_descr() -> SchemaDescPtr {
+ use parquet::basic::Type as PhysicalType;
+ use parquet::schema::types::Type as SchemaType;
+ let field = SchemaType::primitive_type_builder("a",
PhysicalType::BYTE_ARRAY)
+ .with_logical_type(Some(LogicalType::String))
+ .build()
+ .unwrap();
+ let schema = SchemaType::group_type_builder("schema")
+ .with_fields(vec![Arc::new(field)])
+ .build()
+ .unwrap();
+ Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+ }
+}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index f0328098b4..04b25069e9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -47,6 +47,7 @@ use log::debug;
use parquet::basic::{ConvertedType, LogicalType};
use parquet::schema::types::ColumnDescriptor;
+mod access_plan;
mod metrics;
mod opener;
mod page_filter;
@@ -59,6 +60,7 @@ mod writer;
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
+pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use metrics::ParquetFileMetrics;
use opener::ParquetOpener;
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
@@ -152,8 +154,9 @@ pub use writer::plan_to_parquet;
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
-/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
-/// to determine what pages must be read.
+/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by
+/// applying predicates to metadata. The plan and projections are used to
+/// determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 5fb21975df..a5047e487e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -18,8 +18,10 @@
//! [`ParquetOpener`] for opening Parquet files
use
crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
-use crate::datasource::physical_plan::parquet::{row_filter,
should_enable_page_index};
+use
crate::datasource::physical_plan::parquet::row_groups::RowGroupAccessPlanFilter;
+use crate::datasource::physical_plan::parquet::{
+ row_filter, should_enable_page_index, ParquetAccessPlan,
+};
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics,
ParquetFileReaderFactory,
};
@@ -137,7 +139,8 @@ impl FileOpener for ParquetOpener {
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
- let mut row_groups = RowGroupSet::new(rg_metadata.len());
+ let access_plan = ParquetAccessPlan::new_all(rg_metadata.len());
+ let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
// if there is a range restricting what parts of the file to read
if let Some(range) = file_range.as_ref() {
row_groups.prune_by_range(rg_metadata, range);
@@ -164,24 +167,30 @@ impl FileOpener for ParquetOpener {
}
}
+ let mut access_plan = row_groups.build();
+
// page index pruning: if all data on individual pages can
// be ruled using page metadata, rows from other columns
// with that range can be skipped as well
- if enable_page_index && !row_groups.is_empty() {
+ if enable_page_index && !access_plan.is_empty() {
if let Some(p) = page_pruning_predicate {
- let pruned = p.prune(
+ access_plan = p.prune_plan_with_page_index(
+ access_plan,
&file_schema,
builder.parquet_schema(),
- &row_groups,
file_metadata.as_ref(),
&file_metrics,
- )?;
- if let Some(row_selection) = pruned {
- builder = builder.with_row_selection(row_selection);
- }
+ );
}
}
+ let row_group_indexes = access_plan.row_group_indexes();
+ if let Some(row_selection) =
+ access_plan.into_overall_row_selection(rg_metadata)
+ {
+ builder = builder.with_row_selection(row_selection);
+ }
+
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
@@ -189,7 +198,7 @@ impl FileOpener for ParquetOpener {
let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
- .with_row_groups(row_groups.indexes())
+ .with_row_groups(row_group_indexes)
.build()?;
let adapted = stream
diff --git
a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
index d47d5c56bd..7429ca5938 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs
@@ -22,16 +22,15 @@ use arrow::array::{
StringArray,
};
use arrow::datatypes::DataType;
-use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
+use arrow::{array::ArrayRef, datatypes::SchemaRef};
use arrow_schema::Schema;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use log::{debug, trace};
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{
arrow::arrow_reader::{RowSelection, RowSelector},
- errors::ParquetError,
file::{
metadata::{ParquetMetaData, RowGroupMetaData},
page_index::index::Index,
@@ -42,10 +41,10 @@ use std::collections::HashSet;
use std::sync::Arc;
use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
use crate::datasource::physical_plan::parquet::statistics::{
from_bytes_to_i128, parquet_column,
};
+use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use super::metrics::ParquetFileMetrics;
@@ -111,6 +110,7 @@ pub struct PagePruningPredicate {
impl PagePruningPredicate {
/// Create a new [`PagePruningPredicate`]
+ // TODO: this is infallible -- it cannot return an error
pub fn try_new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) ->
Result<Self> {
let predicates = split_conjunction(expr)
.into_iter()
@@ -129,105 +129,117 @@ impl PagePruningPredicate {
Ok(Self { predicates })
}
- /// Returns a [`RowSelection`] for the given file
- pub fn prune(
+ /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the
+ /// parquet page index, if any
+ pub fn prune_plan_with_page_index(
&self,
+ mut access_plan: ParquetAccessPlan,
arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
- row_groups: &RowGroupSet,
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
- ) -> Result<Option<RowSelection>> {
+ ) -> ParquetAccessPlan {
// scoped timer updates on drop
let _timer_guard = file_metrics.page_index_eval_time.timer();
if self.predicates.is_empty() {
- return Ok(None);
+ return access_plan;
}
let page_index_predicates = &self.predicates;
let groups = file_metadata.row_groups();
if groups.is_empty() {
- return Ok(None);
+ return access_plan;
}
- let file_offset_indexes = file_metadata.offset_index();
- let file_page_indexes = file_metadata.column_index();
- let (file_offset_indexes, file_page_indexes) = match (
- file_offset_indexes,
- file_page_indexes,
- ) {
- (Some(o), Some(i)) => (o, i),
- _ => {
- trace!(
+ let (Some(file_offset_indexes), Some(file_page_indexes)) =
+ (file_metadata.offset_index(), file_metadata.column_index())
+ else {
+ trace!(
"skip page pruning due to lack of indexes. Have offset:
{}, column index: {}",
- file_offset_indexes.is_some(), file_page_indexes.is_some()
+ file_metadata.offset_index().is_some(),
file_metadata.column_index().is_some()
);
- return Ok(None);
- }
+ return access_plan;
};
- let mut row_selections =
Vec::with_capacity(page_index_predicates.len());
- for predicate in page_index_predicates {
- // find column index in the parquet schema
- let col_idx = find_column_index(predicate, arrow_schema,
parquet_schema);
- let mut selectors = Vec::with_capacity(row_groups.len());
- for r in row_groups.iter() {
+ // track the total number of rows that should be skipped
+ let mut total_skip = 0;
+
+ let row_group_indexes = access_plan.row_group_indexes();
+ for r in row_group_indexes {
+ // The selection for this particular row group
+ let mut overall_selection = None;
+ for predicate in page_index_predicates {
+ // find column index in the parquet schema
+ let col_idx = find_column_index(predicate, arrow_schema,
parquet_schema);
let row_group_metadata = &groups[r];
- let rg_offset_indexes = file_offset_indexes.get(r);
- let rg_page_indexes = file_page_indexes.get(r);
- if let (Some(rg_page_indexes), Some(rg_offset_indexes),
Some(col_idx)) =
- (rg_page_indexes, rg_offset_indexes, col_idx)
- {
- selectors.extend(
- prune_pages_in_one_row_group(
- row_group_metadata,
- predicate,
- rg_offset_indexes.get(col_idx),
- rg_page_indexes.get(col_idx),
- groups[r].column(col_idx).column_descr(),
- file_metrics,
- )
- .map_err(|e| {
- ArrowError::ParquetError(format!(
- "Fail in prune_pages_in_one_row_group: {e}"
- ))
- }),
+ let (Some(rg_page_indexes), Some(rg_offset_indexes),
Some(col_idx)) = (
+ file_page_indexes.get(r),
+ file_offset_indexes.get(r),
+ col_idx,
+ ) else {
+ trace!(
+ "Did not have enough metadata to prune with page
indexes, \
+ falling back to all rows",
);
+ continue;
+ };
+
+ let selection = prune_pages_in_one_row_group(
+ row_group_metadata,
+ predicate,
+ rg_offset_indexes.get(col_idx),
+ rg_page_indexes.get(col_idx),
+ groups[r].column(col_idx).column_descr(),
+ file_metrics,
+ );
+
+ let Some(selection) = selection else {
+ trace!("No pages pruned in prune_pages_in_one_row_group");
+ continue;
+ };
+
+ debug!("Use filter and page index to create RowSelection {:?}
from predicate: {:?}",
+ &selection,
+ predicate.predicate_expr(),
+ );
+
+ overall_selection = update_selection(overall_selection,
selection);
+
+ // if the overall selection has ruled out all rows, no need to
+ // continue with the other predicates
+ let selects_any = overall_selection
+ .as_ref()
+ .map(|selection| selection.selects_any())
+ .unwrap_or(true);
+
+ if !selects_any {
+ break;
+ }
+ }
+
+ if let Some(overall_selection) = overall_selection {
+ if overall_selection.selects_any() {
+ let rows_skipped = rows_skipped(&overall_selection);
+ trace!("Overall selection from predicate skipped
{rows_skipped}: {overall_selection:?}");
+ total_skip += rows_skipped;
+ access_plan.scan_selection(r, overall_selection)
} else {
+ // Selection skips all rows, so skip the entire row group
+ let rows_skipped = groups[r].num_rows() as usize;
+ access_plan.skip(r);
+ total_skip += rows_skipped;
trace!(
- "Did not have enough metadata to prune with page
indexes, \
- falling back to all rows",
+ "Overall selection from predicate is empty, \
+ skipping all {rows_skipped} rows in row group {r}"
);
- // fallback select all rows
- let all_selected =
- vec![RowSelector::select(groups[r].num_rows() as
usize)];
- selectors.push(all_selected);
}
}
- debug!(
- "Use filter and page index create RowSelection {:?} from
predicate: {:?}",
- &selectors,
- predicate.predicate_expr(),
- );
-
row_selections.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}
- let final_selection = combine_multi_col_selection(row_selections);
- let total_skip =
- final_selection.iter().fold(
- 0,
- |acc, x| {
- if x.skip {
- acc + x.row_count
- } else {
- acc
- }
- },
- );
file_metrics.page_index_rows_filtered.add(total_skip);
- Ok(Some(final_selection))
+ access_plan
}
/// Returns the number of filters in the [`PagePruningPredicate`]
@@ -236,6 +248,24 @@ impl PagePruningPredicate {
}
}
+/// returns the number of rows skipped in the selection
+/// TODO should this be upstreamed to RowSelection?
+fn rows_skipped(selection: &RowSelection) -> usize {
+ selection
+ .iter()
+ .fold(0, |acc, x| if x.skip { acc + x.row_count } else { acc })
+}
+
+fn update_selection(
+ current_selection: Option<RowSelection>,
+ row_selection: RowSelection,
+) -> Option<RowSelection> {
+ match current_selection {
+ None => Some(row_selection),
+ Some(current_selection) =>
Some(current_selection.intersection(&row_selection)),
+ }
+}
+
/// Returns the column index in the row parquet schema for the single
/// column of a single column pruning predicate.
///
@@ -282,22 +312,8 @@ fn find_column_index(
parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0)
}
-/// Intersects the [`RowSelector`]s
-///
-/// For exampe, given:
-/// * `RowSelector1: [ Skip(0~199), Read(200~299)]`
-/// * `RowSelector2: [ Skip(0~99), Read(100~249), Skip(250~299)]`
-///
-/// The final selection is the intersection of these `RowSelector`s:
-/// * `final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]`
-fn combine_multi_col_selection(row_selections: Vec<Vec<RowSelector>>) ->
RowSelection {
- row_selections
- .into_iter()
- .map(RowSelection::from)
- .reduce(|s1, s2| s1.intersection(&s2))
- .unwrap()
-}
-
+/// Returns a `RowSelection` for the pages in this RowGroup if any
+/// rows can be pruned based on the page index
fn prune_pages_in_one_row_group(
group: &RowGroupMetaData,
predicate: &PruningPredicate,
@@ -305,63 +321,61 @@ fn prune_pages_in_one_row_group(
col_page_indexes: Option<&Index>,
col_desc: &ColumnDescriptor,
metrics: &ParquetFileMetrics,
-) -> Result<Vec<RowSelector>> {
+) -> Option<RowSelection> {
let num_rows = group.num_rows() as usize;
- if let (Some(col_offset_indexes), Some(col_page_indexes)) =
+ let (Some(col_offset_indexes), Some(col_page_indexes)) =
(col_offset_indexes, col_page_indexes)
- {
- let target_type = parquet_to_arrow_decimal_type(col_desc);
- let pruning_stats = PagesPruningStatistics {
- col_page_indexes,
- col_offset_indexes,
- target_type: &target_type,
- num_rows_in_row_group: group.num_rows(),
- };
+ else {
+ return None;
+ };
- match predicate.prune(&pruning_stats) {
- Ok(values) => {
- let mut vec = Vec::with_capacity(values.len());
- let row_vec =
create_row_count_in_each_page(col_offset_indexes, num_rows);
- assert_eq!(row_vec.len(), values.len());
- let mut sum_row = *row_vec.first().unwrap();
- let mut selected = *values.first().unwrap();
- trace!("Pruned to {:?} using {:?}", values, pruning_stats);
- for (i, &f) in values.iter().enumerate().skip(1) {
- if f == selected {
- sum_row += *row_vec.get(i).unwrap();
- } else {
- let selector = if selected {
- RowSelector::select(sum_row)
- } else {
- RowSelector::skip(sum_row)
- };
- vec.push(selector);
- sum_row = *row_vec.get(i).unwrap();
- selected = f;
- }
- }
+ let target_type = parquet_to_arrow_decimal_type(col_desc);
+ let pruning_stats = PagesPruningStatistics {
+ col_page_indexes,
+ col_offset_indexes,
+ target_type: &target_type,
+ num_rows_in_row_group: group.num_rows(),
+ };
- let selector = if selected {
- RowSelector::select(sum_row)
- } else {
- RowSelector::skip(sum_row)
- };
- vec.push(selector);
- return Ok(vec);
- }
+ let values = match predicate.prune(&pruning_stats) {
+ Ok(values) => values,
+ Err(e) => {
// stats filter array could not be built
// return a result which will not filter out any pages
- Err(e) => {
- debug!("Error evaluating page index predicate values {e}");
- metrics.predicate_evaluation_errors.add(1);
- return Ok(vec![RowSelector::select(group.num_rows() as
usize)]);
- }
+ debug!("Error evaluating page index predicate values {e}");
+ metrics.predicate_evaluation_errors.add(1);
+ return None;
+ }
+ };
+
+ let mut vec = Vec::with_capacity(values.len());
+ let row_vec = create_row_count_in_each_page(col_offset_indexes, num_rows);
+ assert_eq!(row_vec.len(), values.len());
+ let mut sum_row = *row_vec.first().unwrap();
+ let mut selected = *values.first().unwrap();
+ trace!("Pruned to {:?} using {:?}", values, pruning_stats);
+ for (i, &f) in values.iter().enumerate().skip(1) {
+ if f == selected {
+ sum_row += *row_vec.get(i).unwrap();
+ } else {
+ let selector = if selected {
+ RowSelector::select(sum_row)
+ } else {
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ sum_row = *row_vec.get(i).unwrap();
+ selected = f;
}
}
- Err(DataFusionError::ParquetError(ParquetError::General(
- "Got some error in prune_pages_in_one_row_group, plz try open the
debuglog mode"
- .to_string(),
- )))
+
+ let selector = if selected {
+ RowSelector::select(sum_row)
+ } else {
+ RowSelector::skip(sum_row)
+ };
+ vec.push(selector);
+ Some(RowSelection::from(vec))
}
fn create_row_count_in_each_page(
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 7dd91d3d4e..e2548412cc 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -36,58 +36,35 @@ use crate::datasource::physical_plan::parquet::statistics::{
};
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use super::ParquetFileMetrics;
+use super::{ParquetAccessPlan, ParquetFileMetrics};
-/// Tracks which RowGroups within a parquet file should be scanned.
+/// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
///
-/// This struct encapsulates the various types of pruning that can be applied
to
-/// a set of row groups within a parquet file, progressively narrowing down the
-/// set of row groups that should be scanned.
-#[derive(Debug, PartialEq)]
-pub struct RowGroupSet {
- /// `row_groups[i]` is true if the i-th row group should be scanned
- row_groups: Vec<bool>,
+/// This struct implements the various types of pruning that are applied to a
+/// set of row groups within a parquet file, progressively narrowing down the
+/// set of row groups (and ranges/selections within those row groups) that
+/// should be scanned, based on the available metadata.
+#[derive(Debug, Clone, PartialEq)]
+pub struct RowGroupAccessPlanFilter {
+ /// which row groups should be accessed
+ access_plan: ParquetAccessPlan,
}
-impl RowGroupSet {
- /// Create a new `RowGroupSet` with all row groups set to true (will be
scanned)
- pub fn new(num_row_groups: usize) -> Self {
- Self {
- row_groups: vec![true; num_row_groups],
- }
- }
-
- /// Set the i-th row group to false (should not be scanned)
- pub fn do_not_scan(&mut self, idx: usize) {
- self.row_groups[idx] = false;
- }
-
- /// Return true if the i-th row group should be scanned
- fn should_scan(&self, idx: usize) -> bool {
- self.row_groups[idx]
+impl RowGroupAccessPlanFilter {
+ /// Create a new `RowGroupPlanBuilder` for pruning out the groups to scan
+ /// based on metadata and statistics
+ pub fn new(access_plan: ParquetAccessPlan) -> Self {
+ Self { access_plan }
}
- /// Return the total number of row groups (not the total number to be
scanned)
- pub fn len(&self) -> usize {
- self.row_groups.len()
- }
-
- /// Return true if there are no row groups
+ /// Return true if there are no row groups to scan
pub fn is_empty(&self) -> bool {
- self.row_groups.is_empty()
- }
-
- /// Return an iterator over the row group indexes that should be scanned
- pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
- self.row_groups
- .iter()
- .enumerate()
- .filter_map(|(idx, &b)| if b { Some(idx) } else { None })
+ self.access_plan.is_empty()
}
- /// Return a `Vec` of row group indices that should be scanned
- pub fn indexes(&self) -> Vec<usize> {
- self.iter().collect()
+ /// Returns the inner access plan
+ pub fn build(self) -> ParquetAccessPlan {
+ self.access_plan
}
/// Prune remaining row groups to only those within the specified range.
@@ -97,9 +74,9 @@ impl RowGroupSet {
/// # Panics
/// if `groups.len() != self.len()`
pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range:
&FileRange) {
- assert_eq!(groups.len(), self.len());
+ assert_eq!(groups.len(), self.access_plan.len());
for (idx, metadata) in groups.iter().enumerate() {
- if !self.should_scan(idx) {
+ if !self.access_plan.should_scan(idx) {
continue;
}
@@ -113,7 +90,7 @@ impl RowGroupSet {
.dictionary_page_offset()
.unwrap_or_else(|| col.data_page_offset());
if !range.contains(offset) {
- self.do_not_scan(idx);
+ self.access_plan.skip(idx);
}
}
}
@@ -135,9 +112,9 @@ impl RowGroupSet {
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) {
- assert_eq!(groups.len(), self.len());
+ assert_eq!(groups.len(), self.access_plan.len());
for (idx, metadata) in groups.iter().enumerate() {
- if !self.should_scan(idx) {
+ if !self.access_plan.should_scan(idx) {
continue;
}
let pruning_stats = RowGroupPruningStatistics {
@@ -150,7 +127,7 @@ impl RowGroupSet {
// NB: false means don't scan row group
if !values[0] {
metrics.row_groups_pruned_statistics.add(1);
- self.do_not_scan(idx);
+ self.access_plan.skip(idx);
continue;
}
}
@@ -179,9 +156,9 @@ impl RowGroupSet {
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) {
- assert_eq!(builder.metadata().num_row_groups(), self.len());
- for idx in 0..self.len() {
- if !self.should_scan(idx) {
+ assert_eq!(builder.metadata().num_row_groups(),
self.access_plan.len());
+ for idx in 0..self.access_plan.len() {
+ if !self.access_plan.should_scan(idx) {
continue;
}
@@ -230,7 +207,7 @@ impl RowGroupSet {
if prune_group {
metrics.row_groups_pruned_bloom_filter.add(1);
- self.do_not_scan(idx)
+ self.access_plan.skip(idx)
} else if !stats.column_sbbf.is_empty() {
metrics.row_groups_matched_bloom_filter.add(1);
}
@@ -500,7 +477,7 @@ mod tests {
);
let metrics = parquet_file_metrics();
- let mut row_groups = RowGroupSet::new(2);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -534,7 +511,7 @@ mod tests {
let metrics = parquet_file_metrics();
// missing statistics for first row group mean that the result from
the predicate expression
// is null / undefined so the first row group can't be filtered out
- let mut row_groups = RowGroupSet::new(2);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -581,7 +558,7 @@ mod tests {
let groups = &[rgm1, rgm2];
// the first row group is still filtered out because the predicate
expression can be partially evaluated
// when conditions are joined using AND
- let mut row_groups = RowGroupSet::new(2);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -599,7 +576,7 @@ mod tests {
// if conditions in predicate are joined with OR and an unsupported
expression is used
// this bypasses the entire predicate expression and no row groups are
filtered out
- let mut row_groups = RowGroupSet::new(2);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -655,7 +632,7 @@ mod tests {
let groups = &[rgm1, rgm2];
// the first row group should be left because c1 is greater than zero
// the second should be filtered out because c1 is less than zero
- let mut row_groups = RowGroupSet::new(2);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
row_groups.prune_by_statistics(
&file_schema,
&schema_descr,
@@ -704,7 +681,7 @@ mod tests {
let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value
on "c2".
- let mut row_groups = RowGroupSet::new(2);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -736,7 +713,8 @@ mod tests {
let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates. Ideally these should both be false
- let mut row_groups = RowGroupSet::new(groups.len());
+ let mut row_groups =
+
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(groups.len()));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -796,7 +774,7 @@ mod tests {
vec![ParquetStatistics::int32(Some(100), None, None, 0, false)],
);
let metrics = parquet_file_metrics();
- let mut row_groups = RowGroupSet::new(3);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -864,7 +842,7 @@ mod tests {
vec![ParquetStatistics::int32(None, Some(2), None, 0, false)],
);
let metrics = parquet_file_metrics();
- let mut row_groups = RowGroupSet::new(4);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(4));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -915,7 +893,7 @@ mod tests {
vec![ParquetStatistics::int64(None, None, None, 0, false)],
);
let metrics = parquet_file_metrics();
- let mut row_groups = RowGroupSet::new(3);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -989,7 +967,7 @@ mod tests {
)],
);
let metrics = parquet_file_metrics();
- let mut row_groups = RowGroupSet::new(3);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -1052,7 +1030,7 @@ mod tests {
vec![ParquetStatistics::byte_array(None, None, None, 0, false)],
);
let metrics = parquet_file_metrics();
- let mut row_groups = RowGroupSet::new(3);
+ let mut row_groups =
RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
row_groups.prune_by_statistics(
&schema,
&schema_descr,
@@ -1179,7 +1157,7 @@ mod tests {
)
.await
.unwrap();
- assert!(pruned_row_groups.indexes().is_empty());
+ assert!(pruned_row_groups.access_plan.row_group_indexes().is_empty());
}
#[tokio::test]
@@ -1251,12 +1229,12 @@ mod tests {
impl ExpectedPruning {
/// asserts that the pruned row group match this expectation
- fn assert(&self, row_groups: &RowGroupSet) {
- let num_row_groups = row_groups.len();
+ fn assert(&self, row_groups: &RowGroupAccessPlanFilter) {
+ let num_row_groups = row_groups.access_plan.len();
assert!(num_row_groups > 0);
let num_pruned = (0..num_row_groups)
.filter_map(|i| {
- if row_groups.should_scan(i) {
+ if row_groups.access_plan.should_scan(i) {
None
} else {
Some(1)
@@ -1278,14 +1256,14 @@ mod tests {
);
}
ExpectedPruning::Some(expected) => {
- let actual = row_groups.indexes();
+ let actual = row_groups.access_plan.row_group_indexes();
assert_eq!(expected, &actual, "Unexpected row groups
pruned. Expected {expected:?}, got {actual:?}");
}
}
}
}
- fn assert_pruned(row_groups: RowGroupSet, expected: ExpectedPruning) {
+ fn assert_pruned(row_groups: RowGroupAccessPlanFilter, expected:
ExpectedPruning) {
expected.assert(&row_groups);
}
@@ -1386,7 +1364,7 @@ mod tests {
file_name: &str,
data: bytes::Bytes,
pruning_predicate: &PruningPredicate,
- ) -> Result<RowGroupSet> {
+ ) -> Result<RowGroupAccessPlanFilter> {
use object_store::{ObjectMeta, ObjectStore};
let object_meta = ObjectMeta {
@@ -1411,7 +1389,8 @@ mod tests {
};
let mut builder =
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
- let mut pruned_row_groups =
RowGroupSet::new(builder.metadata().num_row_groups());
+ let access_plan =
ParquetAccessPlan::new_all(builder.metadata().num_row_groups());
+ let mut pruned_row_groups = RowGroupAccessPlanFilter::new(access_plan);
pruned_row_groups
.prune_by_bloom_filters(
pruning_predicate.schema(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]