This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new dccf3778e9 parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener (#15561)
dccf3778e9 is described below
commit dccf3778e95037f3ed8740627799e9d658943157
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Sun Apr 6 08:33:00 2025 -0500
parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener (#15561)
* parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener
* use file schema, avoid loading page index if unnecessary
* Add comment
* add comment
* Add comment
* remove check
* fix clippy
* update sqllogictest
* restore to explain plans
* reverted
* modify access
* Fix ArrowReaderOptions should read with physical_file_schema so we do… (#17)
* Fix ArrowReaderOptions should read with physical_file_schema so we don't need to cast back to utf8
* Fix fmt
* Update opener.rs
* Always apply per-file schema during parquet read (#18)
* Update datafusion/datasource-parquet/src/opener.rs
---------
Co-authored-by: Qi Zhu <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/datasource/file_format/parquet.rs | 12 +-
.../core/src/datasource/physical_plan/parquet.rs | 282 ++++++++-------------
datafusion/datasource-parquet/src/opener.rs | 206 ++++++++++++---
datafusion/datasource-parquet/src/row_filter.rs | 4 +-
datafusion/datasource-parquet/src/source.rs | 51 +---
datafusion/datasource/src/file_scan_config.rs | 7 +
6 files changed, 304 insertions(+), 258 deletions(-)
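The heart of the change is that pruning predicates are now derived per file, inside ParquetOpener, against each file's physical schema, rather than once in ParquetSource against the table schema. A minimal sketch of that per-file construction (illustrative only; the function name is hypothetical, but it mirrors the build_pruning_predicate helper added to opener.rs below):

    // Sketch only: build a row-group pruning predicate for one file.
    use std::sync::Arc;
    use arrow_schema::SchemaRef;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_optimizer::pruning::PruningPredicate;

    fn pruning_predicate_for_file(
        predicate: Arc<dyn PhysicalExpr>,
        physical_file_schema: &SchemaRef,
    ) -> Option<Arc<PruningPredicate>> {
        match PruningPredicate::try_new(predicate, Arc::clone(physical_file_schema)) {
            // A predicate that is always true cannot prune any row groups
            Ok(p) if !p.always_true() => Some(Arc::new(p)),
            Ok(_) => None,
            // The real helper records this in a `num_predicate_creation_errors` metric
            Err(_) => None,
        }
    }
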
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 27a7e7ae3c..67a7ba8dc7 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -67,13 +67,13 @@ pub(crate) mod test_util {
.into_iter()
.zip(tmp_files.into_iter())
.map(|(batch, mut output)| {
- let builder = parquet::file::properties::WriterProperties::builder();
- let props = if multi_page {
- builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
- } else {
- builder
+ let mut builder = parquet::file::properties::WriterProperties::builder();
+ if multi_page {
+ builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
}
- .build();
+ builder = builder.set_bloom_filter_enabled(true);
+
+ let props = builder.build();
let mut writer = parquet::arrow::ArrowWriter::try_new(
&mut output,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 9e1b2822e8..5c06c3902c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -43,6 +43,7 @@ mod tests {
};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
+ use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use bytes::{BufMut, BytesMut};
use datafusion_common::config::TableParquetOptions;
@@ -61,8 +62,9 @@ mod tests {
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::planner::logical2physical;
+ use datafusion_physical_plan::analyze::AnalyzeExec;
+ use datafusion_physical_plan::collect;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
- use datafusion_physical_plan::{collect, displayable};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use chrono::{TimeZone, Utc};
@@ -81,10 +83,10 @@ mod tests {
struct RoundTripResult {
/// Data that was read back from ParquetFiles
batches: Result<Vec<RecordBatch>>,
+ /// The EXPLAIN ANALYZE output
+ explain: Result<String>,
/// The physical plan that was created (that has statistics, etc)
parquet_exec: Arc<DataSourceExec>,
- /// The ParquetSource that is used in plan
- parquet_source: ParquetSource,
}
/// round-trip record batches by writing each individual RecordBatch to
@@ -137,71 +139,109 @@ mod tests {
self.round_trip(batches).await.batches
}
- /// run the test, returning the `RoundTripResult`
- async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult {
- let Self {
- projection,
- schema,
- predicate,
- pushdown_predicate,
- page_index_predicate,
- } = self;
-
- let file_schema = match schema {
- Some(schema) => schema,
- None => Arc::new(
- Schema::try_merge(
- batches.iter().map(|b| b.schema().as_ref().clone()),
- )
- .unwrap(),
- ),
- };
- // If testing with page_index_predicate, write parquet
- // files with multiple pages
- let multi_page = page_index_predicate;
- let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
- let file_group = meta.into_iter().map(Into::into).collect();
-
+ fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
// set up predicate (this is normally done by a layer higher up)
- let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
+ let predicate = self
+ .predicate
+ .as_ref()
+ .map(|p| logical2physical(p, &file_schema));
let mut source = ParquetSource::default();
if let Some(predicate) = predicate {
source = source.with_predicate(Arc::clone(&file_schema), predicate);
}
- if pushdown_predicate {
+ if self.pushdown_predicate {
source = source
.with_pushdown_filters(true)
.with_reorder_filters(true);
}
- if page_index_predicate {
+ if self.page_index_predicate {
source = source.with_enable_page_index(true);
}
+ Arc::new(source)
+ }
+
+ fn build_parquet_exec(
+ &self,
+ file_schema: SchemaRef,
+ file_group: FileGroup,
+ source: Arc<ParquetSource>,
+ ) -> Arc<DataSourceExec> {
let base_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
- Arc::new(source.clone()),
+ source,
)
.with_file_group(file_group)
- .with_projection(projection)
+ .with_projection(self.projection.clone())
.build();
+ DataSourceExec::from_data_source(base_config)
+ }
+
+ /// run the test, returning the `RoundTripResult`
+ async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
+ let file_schema = match &self.schema {
+ Some(schema) => schema,
+ None => &Arc::new(
+ Schema::try_merge(
+ batches.iter().map(|b| b.schema().as_ref().clone()),
+ )
+ .unwrap(),
+ ),
+ };
+ let file_schema = Arc::clone(file_schema);
+ // If testing with page_index_predicate, write parquet
+ // files with multiple pages
+ let multi_page = self.page_index_predicate;
+ let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
+ let file_group: FileGroup = meta.into_iter().map(Into::into).collect();
+
+ // build a ParquetExec to return the results
+ let parquet_source = self.build_file_source(file_schema.clone());
+ let parquet_exec = self.build_parquet_exec(
+ file_schema.clone(),
+ file_group.clone(),
+ Arc::clone(&parquet_source),
+ );
+
+ let analyze_exec = Arc::new(AnalyzeExec::new(
+ false,
+ false,
+ // use a new ParquetSource to avoid sharing execution metrics
+ self.build_parquet_exec(
+ file_schema.clone(),
+ file_group.clone(),
+ self.build_file_source(file_schema.clone()),
+ ),
+ Arc::new(Schema::new(vec![
+ Field::new("plan_type", DataType::Utf8, true),
+ Field::new("plan", DataType::Utf8, true),
+ ])),
+ ));
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
- let parquet_exec = DataSourceExec::from_data_source(base_config.clone());
+ let batches = collect(
+ Arc::clone(&parquet_exec) as Arc<dyn ExecutionPlan>,
+ task_ctx.clone(),
+ )
+ .await;
+
+ let explain = collect(analyze_exec, task_ctx.clone())
+ .await
+ .map(|batches| {
+ let batches = pretty_format_batches(&batches).unwrap();
+ format!("{batches}")
+ });
+
RoundTripResult {
- batches: collect(parquet_exec.clone(), task_ctx).await,
+ batches,
+ explain,
parquet_exec,
- parquet_source: base_config
- .file_source()
- .as_any()
- .downcast_ref::<ParquetSource>()
- .unwrap()
- .clone(),
}
}
}
@@ -1375,26 +1415,6 @@ mod tests {
create_batch(vec![("c1", c1.clone())])
}
- /// Returns a int64 array with contents:
- /// "[-1, 1, null, 2, 3, null, null]"
- fn int64_batch() -> RecordBatch {
- let contents: ArrayRef = Arc::new(Int64Array::from(vec![
- Some(-1),
- Some(1),
- None,
- Some(2),
- Some(3),
- None,
- None,
- ]));
-
- create_batch(vec![
- ("a", contents.clone()),
- ("b", contents.clone()),
- ("c", contents.clone()),
- ])
- }
-
#[tokio::test]
async fn parquet_exec_metrics() {
// batch1: c1(string)
@@ -1454,110 +1474,17 @@ mod tests {
.round_trip(vec![batch1])
.await;
- // should have a pruning predicate
- let pruning_predicate = rt.parquet_source.pruning_predicate();
- assert!(pruning_predicate.is_some());
-
- // convert to explain plan form
- let display = displayable(rt.parquet_exec.as_ref())
- .indent(true)
- .to_string();
-
- assert_contains!(
- &display,
- "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
- );
-
- assert_contains!(&display, r#"predicate=c1@0 != bar"#);
-
- assert_contains!(&display, "projection=[c1]");
- }
-
- #[tokio::test]
- async fn parquet_exec_display_deterministic() {
- // batches: a(int64), b(int64), c(int64)
- let batches = int64_batch();
-
- fn extract_required_guarantees(s: &str) -> Option<&str> {
- s.split("required_guarantees=").nth(1)
- }
-
- // Ensuring that the required_guarantees remain consistent across every display plan of the filter conditions
- for _ in 0..100 {
- // c = 1 AND b = 1 AND a = 1
- let filter0 = col("c")
- .eq(lit(1))
- .and(col("b").eq(lit(1)))
- .and(col("a").eq(lit(1)));
-
- let rt0 = RoundTrip::new()
- .with_predicate(filter0)
- .with_pushdown_predicate()
- .round_trip(vec![batches.clone()])
- .await;
-
- let pruning_predicate = rt0.parquet_source.pruning_predicate();
- assert!(pruning_predicate.is_some());
-
- let display0 = displayable(rt0.parquet_exec.as_ref())
- .indent(true)
- .to_string();
-
- let guarantees0: &str = extract_required_guarantees(&display0)
- .expect("Failed to extract required_guarantees");
- // Compare only the required_guarantees part (Because the file_groups part will not be the same)
- assert_eq!(
- guarantees0.trim(),
- "[a in (1), b in (1), c in (1)]",
- "required_guarantees don't match"
- );
- }
+ let explain = rt.explain.unwrap();
- // c = 1 AND a = 1 AND b = 1
- let filter1 = col("c")
- .eq(lit(1))
- .and(col("a").eq(lit(1)))
- .and(col("b").eq(lit(1)));
+ // check that there was a pruning predicate -> row groups got pruned
+ assert_contains!(&explain, "predicate=c1@0 != bar");
- let rt1 = RoundTrip::new()
- .with_predicate(filter1)
- .with_pushdown_predicate()
- .round_trip(vec![batches.clone()])
- .await;
-
- // b = 1 AND a = 1 AND c = 1
- let filter2 = col("b")
- .eq(lit(1))
- .and(col("a").eq(lit(1)))
- .and(col("c").eq(lit(1)));
+ // there's a single row group, but we can check that it matched
+ // if no pruning was done this would be 0 instead of 1
+ assert_contains!(&explain, "row_groups_matched_statistics=1");
- let rt2 = RoundTrip::new()
- .with_predicate(filter2)
- .with_pushdown_predicate()
- .round_trip(vec![batches])
- .await;
-
- // should have a pruning predicate
- let pruning_predicate = rt1.parquet_source.pruning_predicate();
- assert!(pruning_predicate.is_some());
- let pruning_predicate = rt2.parquet_source.predicate();
- assert!(pruning_predicate.is_some());
-
- // convert to explain plan form
- let display1 = displayable(rt1.parquet_exec.as_ref())
- .indent(true)
- .to_string();
- let display2 = displayable(rt2.parquet_exec.as_ref())
- .indent(true)
- .to_string();
-
- let guarantees1 = extract_required_guarantees(&display1)
- .expect("Failed to extract required_guarantees");
- let guarantees2 = extract_required_guarantees(&display2)
- .expect("Failed to extract required_guarantees");
-
- // Compare only the required_guarantees part (Because the predicate part will not be the same)
- assert_eq!(guarantees1, guarantees2, "required_guarantees don't match");
+ // check the projection
+ assert_contains!(&explain, "projection=[c1]");
}
#[tokio::test]
@@ -1581,16 +1508,19 @@ mod tests {
.await;
// Should not contain a pruning predicate (since nothing can be pruned)
- let pruning_predicate = rt.parquet_source.pruning_predicate();
- assert!(
- pruning_predicate.is_none(),
- "Still had pruning predicate: {pruning_predicate:?}"
- );
+ let explain = rt.explain.unwrap();
- // but does still has a pushdown down predicate
- let predicate = rt.parquet_source.predicate();
- let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref());
- assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
+ // When both matched and pruned are 0, it means that the pruning predicate
+ // was not used at all.
+ assert_contains!(&explain, "row_groups_matched_statistics=0");
+ assert_contains!(&explain, "row_groups_pruned_statistics=0");
+
+ // But pushdown predicate should be present
+ assert_contains!(
+ &explain,
+ "predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
+ );
+ assert_contains!(&explain, "pushdown_rows_pruned=5");
}
#[tokio::test]
@@ -1616,8 +1546,14 @@ mod tests {
.await;
// Should have a pruning predicate
- let pruning_predicate = rt.parquet_source.pruning_predicate();
- assert!(pruning_predicate.is_some());
+ let explain = rt.explain.unwrap();
+ assert_contains!(
+ &explain,
+ "predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END"
+ );
+
+ // And bloom filters should have been evaluated
+ assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
}
/// Returns the sum of all the metrics with the specified name
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 732fef47d5..708a8035a4 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -34,13 +34,14 @@ use arrow::error::ArrowError;
use datafusion_common::{exec_err, Result};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::pruning::PruningPredicate;
-use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
use futures::{StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::ParquetMetaDataReader;
/// Implements [`FileOpener`] for a parquet file
pub(super) struct ParquetOpener {
@@ -54,10 +55,6 @@ pub(super) struct ParquetOpener {
pub limit: Option<usize>,
/// Optional predicate to apply during the scan
pub predicate: Option<Arc<dyn PhysicalExpr>>,
- /// Optional pruning predicate applied to row group statistics
- pub pruning_predicate: Option<Arc<PruningPredicate>>,
- /// Optional pruning predicate applied to data page statistics
- pub page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
/// Schema of the output table
pub table_schema: SchemaRef,
/// Optional hint for how large the initial request to read parquet metadata
@@ -80,6 +77,8 @@ pub(super) struct ParquetOpener {
pub enable_bloom_filter: bool,
/// Schema adapter factory
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+ /// Should row group pruning be applied
+ pub enable_row_group_stats_pruning: bool,
}
impl FileOpener for ParquetOpener {
@@ -92,7 +91,7 @@ impl FileOpener for ParquetOpener {
let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint);
- let mut reader: Box<dyn AsyncFileReader> =
+ let mut async_file_reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
file_meta,
@@ -109,47 +108,84 @@ impl FileOpener for ParquetOpener {
.schema_adapter_factory
.create(projected_schema, Arc::clone(&self.table_schema));
let predicate = self.predicate.clone();
- let pruning_predicate = self.pruning_predicate.clone();
- let page_pruning_predicate = self.page_pruning_predicate.clone();
let table_schema = Arc::clone(&self.table_schema);
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
- let enable_page_index = should_enable_page_index(
- self.enable_page_index,
- &self.page_pruning_predicate,
- );
let enable_bloom_filter = self.enable_bloom_filter;
+ let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
let limit = self.limit;
- Ok(Box::pin(async move {
- let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
+ let predicate_creation_errors = MetricBuilder::new(&self.metrics)
+ .global_counter("num_predicate_creation_errors");
+
+ let enable_page_index = self.enable_page_index;
+ Ok(Box::pin(async move {
+ // Don't load the page index yet. Since it is not stored inline in
+ // the footer, loading the page index if it is not needed will do
+ // unnecessary I/O. We decide later if it is needed to evaluate the
+ // pruning predicates. Thus default to not requesting it from the
+ // underlying reader.
+ let mut options = ArrowReaderOptions::new().with_page_index(false);
let mut metadata_timer = file_metrics.metadata_load_time.timer();
- let metadata =
- ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
- let mut schema = Arc::clone(metadata.schema());
- // read with view types
- if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &schema)
+ // Begin by loading the metadata from the underlying reader (note
+ // the returned metadata may actually include page indexes as some
+ // readers may return page indexes even when not requested -- for
+ // example when they are cached)
+ let mut reader_metadata =
+ ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
+ .await?;
+
+ // Note about schemas: we are actually dealing with **3 different schemas** here:
+ // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
+ // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
+ // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
+ let mut physical_file_schema = Arc::clone(reader_metadata.schema());
+
+ // The schema loaded from the file may not be the same as the
+ // desired schema (for example if we want to instruct the parquet
+ // reader to read strings using Utf8View instead). Update if necessary
+ if let Some(merged) =
+ apply_file_schema_type_coercions(&table_schema, &physical_file_schema)
{
- schema = Arc::new(merged);
+ physical_file_schema = Arc::new(merged);
+ options = options.with_schema(Arc::clone(&physical_file_schema));
+ reader_metadata = ArrowReaderMetadata::try_new(
+ Arc::clone(reader_metadata.metadata()),
+ options.clone(),
+ )?;
}
- let options = ArrowReaderOptions::new()
- .with_page_index(enable_page_index)
- .with_schema(Arc::clone(&schema));
- let metadata =
- ArrowReaderMetadata::try_new(Arc::clone(metadata.metadata()), options)?;
+ // Build predicates for this specific file
+ let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
+ &predicate,
+ &physical_file_schema,
+ &predicate_creation_errors,
+ );
- metadata_timer.stop();
+ // The page index is not stored inline in the parquet footer so the
+ // code above may not have read the page index structures yet. If we
+ // need them for reading and they aren't yet loaded, we need to load them now.
+ if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
+ reader_metadata = load_page_index(
+ reader_metadata,
+ &mut async_file_reader,
+ // Since we're manually loading the page index the option here should not matter but we pass it in for consistency
+ options.with_page_index(true),
+ )
+ .await?;
+ }
- let mut builder =
- ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
+ metadata_timer.stop();
- let file_schema = Arc::clone(builder.schema());
+ let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
+ async_file_reader,
+ reader_metadata,
+ );
let (schema_mapping, adapted_projections) =
- schema_adapter.map_schema(&file_schema)?;
+ schema_adapter.map_schema(&physical_file_schema)?;
let mask = ProjectionMask::roots(
builder.parquet_schema(),
@@ -160,7 +196,7 @@ impl FileOpener for ParquetOpener {
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
- &file_schema,
+ &physical_file_schema,
&table_schema,
builder.metadata(),
reorder_predicates,
@@ -197,18 +233,20 @@ impl FileOpener for ParquetOpener {
}
// If there is a predicate that can be evaluated against the metadata
if let Some(predicate) = predicate.as_ref() {
- row_groups.prune_by_statistics(
- &file_schema,
- builder.parquet_schema(),
- rg_metadata,
- predicate,
- &file_metrics,
- );
+ if enable_row_group_stats_pruning {
+ row_groups.prune_by_statistics(
+ &physical_file_schema,
+ builder.parquet_schema(),
+ rg_metadata,
+ predicate,
+ &file_metrics,
+ );
+ }
if enable_bloom_filter && !row_groups.is_empty() {
row_groups
.prune_by_bloom_filters(
- &file_schema,
+ &physical_file_schema,
&mut builder,
predicate,
&file_metrics,
@@ -226,7 +264,7 @@ impl FileOpener for ParquetOpener {
if let Some(p) = page_pruning_predicate {
access_plan = p.prune_plan_with_page_index(
access_plan,
- &file_schema,
+ &physical_file_schema,
builder.parquet_schema(),
file_metadata.as_ref(),
&file_metrics,
@@ -295,3 +333,91 @@ fn create_initial_plan(
// default to scanning all row groups
Ok(ParquetAccessPlan::new_all(row_group_count))
}
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by incrementing
+/// the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+ predicate: Arc<dyn PhysicalExpr>,
+ file_schema: &SchemaRef,
+ predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+ match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+ Ok(pruning_predicate) => {
+ if !pruning_predicate.always_true() {
+ return Some(Arc::new(pruning_predicate));
+ }
+ }
+ Err(e) => {
+ debug!("Could not create pruning predicate for: {e}");
+ predicate_creation_errors.add(1);
+ }
+ }
+ None
+}
+
+/// Build a page pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a page pruning
+/// predicate, return None.
+pub(crate) fn build_page_pruning_predicate(
+ predicate: &Arc<dyn PhysicalExpr>,
+ file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+ Arc::new(PagePruningAccessPlanFilter::new(
+ predicate,
+ Arc::clone(file_schema),
+ ))
+}
+
+fn build_pruning_predicates(
+ predicate: &Option<Arc<dyn PhysicalExpr>>,
+ file_schema: &SchemaRef,
+ predicate_creation_errors: &Count,
+) -> (
+ Option<Arc<PruningPredicate>>,
+ Option<Arc<PagePruningAccessPlanFilter>>,
+) {
+ let Some(predicate) = predicate.as_ref() else {
+ return (None, None);
+ };
+ let pruning_predicate = build_pruning_predicate(
+ Arc::clone(predicate),
+ file_schema,
+ predicate_creation_errors,
+ );
+ let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+ (pruning_predicate, Some(page_pruning_predicate))
+}
+
+/// Returns an `ArrowReaderMetadata` with the page index loaded, loading
+/// it from the underlying `AsyncFileReader` if necessary.
+async fn load_page_index<T: AsyncFileReader>(
+ reader_metadata: ArrowReaderMetadata,
+ input: &mut T,
+ options: ArrowReaderOptions,
+) -> Result<ArrowReaderMetadata> {
+ let parquet_metadata = reader_metadata.metadata();
+ let missing_column_index = parquet_metadata.column_index().is_none();
+ let missing_offset_index = parquet_metadata.offset_index().is_none();
+ // You may ask yourself: why are we even checking if the page index is already loaded here?
+ // Didn't we explicitly *not* load it above?
+ // Well it's possible that a custom implementation of `AsyncFileReader` gives you
+ // the page index even if you didn't ask for it (e.g. because it's cached)
+ // so it's important to check that here to avoid extra work.
+ if missing_column_index || missing_offset_index {
+ let m = Arc::try_unwrap(Arc::clone(parquet_metadata))
+ .unwrap_or_else(|e| e.as_ref().clone());
+ let mut reader =
+ ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+ reader.load_page_index(input).await?;
+ let new_parquet_metadata = reader.finish()?;
+ let new_arrow_reader =
+ ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?;
+ Ok(new_arrow_reader)
+ } else {
+ // No need to load the page index again, just return the existing metadata
+ Ok(reader_metadata)
+ }
+}
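Purely as an illustration of the "3 different schemas" note added above (the column names here are hypothetical, not taken from the patch): for a table whose strings are exposed as Utf8View and that has a hive partition column, the three schemas might look like this:

    // Hypothetical example of the three schemas the opener deals with.
    use arrow_schema::{DataType, Field, Schema};

    fn three_schemas() -> (Schema, Schema, Schema) {
        // Table schema: what `SELECT * FROM t` returns (includes the partition column)
        let table_schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8View, true),
            Field::new("date", DataType::Utf8, false), // hive partition column
        ]);
        // "Virtual" file schema: the table schema minus partition columns
        let file_schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, true)]);
        // Physical file schema: what the parquet footer actually declares
        let physical_file_schema = Schema::new(vec![Field::new("c1", DataType::Utf8, true)]);
        (table_schema, file_schema, physical_file_schema)
    }
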
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index da6bf114d7..2d2993c29a 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -449,7 +449,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<boo
/// `a = 1` and `c = 3`.
pub fn build_row_filter(
expr: &Arc<dyn PhysicalExpr>,
- file_schema: &SchemaRef,
+ physical_file_schema: &SchemaRef,
table_schema: &SchemaRef,
metadata: &ParquetMetaData,
reorder_predicates: bool,
@@ -470,7 +470,7 @@ pub fn build_row_filter(
.map(|expr| {
FilterCandidateBuilder::new(
Arc::clone(expr),
- Arc::clone(file_schema),
+ Arc::clone(physical_file_schema),
Arc::clone(table_schema),
Arc::clone(schema_adapter_factory),
)
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 66d4d313d5..a5629e4363 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -17,9 +17,12 @@
//! ParquetSource implementation for reading parquet files
use std::any::Any;
+use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
+use crate::opener::build_page_pruning_predicate;
+use crate::opener::build_pruning_predicate;
use crate::opener::ParquetOpener;
use crate::page_filter::PagePruningAccessPlanFilter;
use crate::DefaultParquetFileReaderFactory;
@@ -41,7 +44,6 @@ use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
use datafusion_physical_plan::DisplayFormatType;
use itertools::Itertools;
-use log::debug;
use object_store::ObjectStore;
/// Execution plan for reading one or more Parquet files.
@@ -316,24 +318,10 @@ impl ParquetSource {
conf = conf.with_metrics(metrics);
conf.predicate = Some(Arc::clone(&predicate));
- match PruningPredicate::try_new(Arc::clone(&predicate), Arc::clone(&file_schema))
- {
- Ok(pruning_predicate) => {
- if !pruning_predicate.always_true() {
- conf.pruning_predicate = Some(Arc::new(pruning_predicate));
- }
- }
- Err(e) => {
- debug!("Could not create pruning predicate for: {e}");
- predicate_creation_errors.add(1);
- }
- };
-
- let page_pruning_predicate = Arc::new(PagePruningAccessPlanFilter::new(
- &predicate,
- Arc::clone(&file_schema),
- ));
- conf.page_pruning_predicate = Some(page_pruning_predicate);
+ conf.page_pruning_predicate =
+ Some(build_page_pruning_predicate(&predicate, &file_schema));
+ conf.pruning_predicate =
+ build_pruning_predicate(predicate, &file_schema, &predicate_creation_errors);
conf
}
@@ -348,16 +336,6 @@ impl ParquetSource {
self.predicate.as_ref()
}
- /// Optional reference to this parquet scan's pruning predicate
- pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
- self.pruning_predicate.as_ref()
- }
-
- /// Optional reference to this parquet scan's page pruning predicate
- pub fn page_pruning_predicate(&self) -> Option<&Arc<PagePruningAccessPlanFilter>> {
- self.page_pruning_predicate.as_ref()
- }
-
/// return the optional file reader factory
pub fn parquet_file_reader_factory(
&self,
@@ -488,8 +466,6 @@ impl FileSource for ParquetSource {
.expect("Batch size must set before creating ParquetOpener"),
limit: base_config.limit,
predicate: self.predicate.clone(),
- pruning_predicate: self.pruning_predicate.clone(),
- page_pruning_predicate: self.page_pruning_predicate.clone(),
table_schema: Arc::clone(&base_config.file_schema),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
@@ -498,6 +474,7 @@ impl FileSource for ParquetSource {
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
+ enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
schema_adapter_factory,
})
}
@@ -537,11 +514,10 @@ impl FileSource for ParquetSource {
.expect("projected_statistics must be set");
// When filters are pushed down, we have no way of knowing the exact statistics.
// Note that pruning predicate is also a kind of filter pushdown.
- // (bloom filters use `pruning_predicate` too)
- if self.pruning_predicate().is_some()
- || self.page_pruning_predicate().is_some()
- || (self.predicate().is_some() && self.pushdown_filters())
- {
+ // (bloom filters use `pruning_predicate` too).
+ // Because filter pushdown may happen dynamically as long as there is a predicate,
+ // if we have *any* predicate applied we can't guarantee the statistics are exact.
+ if self.predicate().is_some() {
Ok(statistics.to_inexact())
} else {
Ok(statistics)
@@ -560,7 +536,8 @@ impl FileSource for ParquetSource {
.map(|p| format!(", predicate={p}"))
.unwrap_or_default();
let pruning_predicate_string = self
- .pruning_predicate()
+ .pruning_predicate
+ .as_ref()
.map(|pre| {
let mut guarantees = pre
.literal_guarantees()
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 0b3bf3bdd1..58fe8c7562 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -138,6 +138,9 @@ pub struct FileScanConfig {
/// Schema before `projection` is applied. It contains the all columns that may
/// appear in the files. It does not include table partition columns
/// that may be added.
+ /// Note that this is **not** the schema of the physical files.
+ /// This is the schema that the physical file schema will be
+ /// mapped onto, and the schema that the [`DataSourceExec`] will return.
pub file_schema: SchemaRef,
/// List of files to be processed, grouped into partitions
///
@@ -224,6 +227,10 @@ pub struct FileScanConfig {
#[derive(Clone)]
pub struct FileScanConfigBuilder {
object_store_url: ObjectStoreUrl,
+ /// Table schema before any projections or partition columns are applied.
+ /// This schema is used to read the files, but is **not** necessarily the schema of the physical files.
+ /// Rather this is the schema that the physical file schema will be mapped onto, and the schema that the
+ /// [`DataSourceExec`] will return.
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]