alamb commented on code in PR #21190:
URL: https://github.com/apache/datafusion/pull/21190#discussion_r3001465687
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -205,524 +468,547 @@ impl FileOpener for ParquetOpener {
.transpose()?;
}
- let reorder_predicates = self.reorder_filters;
- let pushdown_filters = self.pushdown_filters;
- let force_filter_selections = self.force_filter_selections;
- let coerce_int96 = self.coerce_int96;
- let enable_bloom_filter = self.enable_bloom_filter;
- let enable_row_group_stats_pruning =
self.enable_row_group_stats_pruning;
- let limit = self.limit;
- let parquet_file_reader_factory =
Arc::clone(&self.parquet_file_reader_factory);
- let partition_index = self.partition_index;
- let metrics = self.metrics.clone();
-
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
- let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
-
- let enable_page_index = self.enable_page_index;
- #[cfg(feature = "parquet_encryption")]
- let encryption_context = self.get_encryption_context();
- let max_predicate_cache_size = self.max_predicate_cache_size;
+ // Apply literal replacements to projection and predicate
+ let file_pruner = predicate
+ .as_ref()
+ .filter(|p| is_dynamic_physical_expr(p) ||
partitioned_file.has_statistics())
+ .and_then(|p| {
+ FilePruner::try_new(
+ Arc::clone(p),
+ &logical_file_schema,
+ &partitioned_file,
+ predicate_creation_errors.clone(),
+ )
+ });
+
+ Ok(PreparedParquetOpen {
+ partition_index: self.partition_index,
+ partitioned_file,
+ file_range,
+ extensions,
+ file_name,
+ file_metrics,
+ file_pruner,
+ metadata_size_hint,
+ metrics: self.metrics.clone(),
+ parquet_file_reader_factory:
Arc::clone(&self.parquet_file_reader_factory),
+ async_file_reader,
+ batch_size: self.batch_size,
+ logical_file_schema: Arc::clone(&logical_file_schema),
+ physical_file_schema: logical_file_schema,
+ output_schema,
+ projection,
+ predicate,
+ reorder_predicates: self.reorder_filters,
+ pushdown_filters: self.pushdown_filters,
+ force_filter_selections: self.force_filter_selections,
+ enable_page_index: self.enable_page_index,
+ enable_bloom_filter: self.enable_bloom_filter,
+ enable_row_group_stats_pruning:
self.enable_row_group_stats_pruning,
+ limit: self.limit,
+ coerce_int96: self.coerce_int96,
+ expr_adapter_factory: Arc::clone(&self.expr_adapter_factory),
+ predicate_creation_errors,
+ max_predicate_cache_size: self.max_predicate_cache_size,
+ reverse_row_groups: self.reverse_row_groups,
+ preserve_order: self.preserve_order,
+ #[cfg(feature = "parquet_encryption")]
+ file_decryption_properties: None,
+ })
+ }
+}
- let reverse_row_groups = self.reverse_row_groups;
- let preserve_order = self.preserve_order;
+impl PreparedParquetOpen {
Review Comment:
Using a whitespace-blind diff, I think it is easier to see that what this does
is just break up the giant future into multiple smaller methods
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -205,524 +468,547 @@ impl FileOpener for ParquetOpener {
.transpose()?;
}
- let reorder_predicates = self.reorder_filters;
- let pushdown_filters = self.pushdown_filters;
- let force_filter_selections = self.force_filter_selections;
- let coerce_int96 = self.coerce_int96;
- let enable_bloom_filter = self.enable_bloom_filter;
- let enable_row_group_stats_pruning =
self.enable_row_group_stats_pruning;
- let limit = self.limit;
- let parquet_file_reader_factory =
Arc::clone(&self.parquet_file_reader_factory);
- let partition_index = self.partition_index;
- let metrics = self.metrics.clone();
-
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
- let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
-
- let enable_page_index = self.enable_page_index;
- #[cfg(feature = "parquet_encryption")]
- let encryption_context = self.get_encryption_context();
- let max_predicate_cache_size = self.max_predicate_cache_size;
+ // Apply literal replacements to projection and predicate
+ let file_pruner = predicate
+ .as_ref()
+ .filter(|p| is_dynamic_physical_expr(p) ||
partitioned_file.has_statistics())
+ .and_then(|p| {
+ FilePruner::try_new(
+ Arc::clone(p),
+ &logical_file_schema,
+ &partitioned_file,
+ predicate_creation_errors.clone(),
+ )
+ });
+
+ Ok(PreparedParquetOpen {
Review Comment:
This is a pretty good example of what this PR is doing -- it marshals the
state into an explicit structure (rather than letting the Rust compiler do
it implicitly across `await` calls)
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -48,27 +49,32 @@ use datafusion_physical_expr_common::physical_expr::{
PhysicalExpr, is_dynamic_physical_expr,
};
use datafusion_physical_plan::metrics::{
- BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
PruningMetrics,
+ Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
};
use datafusion_pruning::{FilePruner, PruningPredicate,
build_pruning_predicate};
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
-use futures::{Stream, StreamExt, ready};
+use futures::{
+ FutureExt, Stream, StreamExt, TryStreamExt, future::BoxFuture, ready,
+ stream::BoxStream,
+};
use log::debug;
-use parquet::DecodeResult;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
};
use parquet::arrow::async_reader::AsyncFileReader;
-use parquet::arrow::push_decoder::{ParquetPushDecoder,
ParquetPushDecoderBuilder};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
-/// Implements [`FileOpener`] for a parquet file
+/// Entry point for opening a Parquet file
+///
+/// Reading a Parquet file is a multi-stage process, with multiple
CPU-intensive
+/// steps interspersed with I/O steps. The code in this module implements the
steps
+/// as an explicit state machine -- see [`ParquetOpenState`] for details.
Review Comment:
This explains the design -- rather than using an implicit state machine inside a
single giant future, this PR splits the opener into an explicit state machine
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -205,524 +393,546 @@ impl FileOpener for ParquetOpener {
.transpose()?;
}
- let reorder_predicates = self.reorder_filters;
- let pushdown_filters = self.pushdown_filters;
- let force_filter_selections = self.force_filter_selections;
- let coerce_int96 = self.coerce_int96;
- let enable_bloom_filter = self.enable_bloom_filter;
- let enable_row_group_stats_pruning =
self.enable_row_group_stats_pruning;
- let limit = self.limit;
- let parquet_file_reader_factory =
Arc::clone(&self.parquet_file_reader_factory);
- let partition_index = self.partition_index;
- let metrics = self.metrics.clone();
-
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
- let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
-
- let enable_page_index = self.enable_page_index;
- #[cfg(feature = "parquet_encryption")]
- let encryption_context = self.get_encryption_context();
- let max_predicate_cache_size = self.max_predicate_cache_size;
+ // Apply literal replacements to projection and predicate
+ let file_pruner = predicate
+ .as_ref()
+ .filter(|p| is_dynamic_physical_expr(p) ||
partitioned_file.has_statistics())
+ .and_then(|p| {
+ FilePruner::try_new(
+ Arc::clone(p),
+ &logical_file_schema,
+ &partitioned_file,
+ predicate_creation_errors.clone(),
+ )
+ });
+
+ Ok(PreparedParquetOpen {
+ partition_index: self.partition_index,
+ partitioned_file,
+ file_range,
+ extensions,
+ file_name,
+ file_metrics,
+ file_pruner,
+ metadata_size_hint,
+ metrics: self.metrics.clone(),
+ parquet_file_reader_factory:
Arc::clone(&self.parquet_file_reader_factory),
+ async_file_reader,
+ batch_size: self.batch_size,
+ logical_file_schema: Arc::clone(&logical_file_schema),
+ physical_file_schema: logical_file_schema,
+ output_schema,
+ projection,
+ predicate,
+ reorder_predicates: self.reorder_filters,
+ pushdown_filters: self.pushdown_filters,
+ force_filter_selections: self.force_filter_selections,
+ enable_page_index: self.enable_page_index,
+ enable_bloom_filter: self.enable_bloom_filter,
+ enable_row_group_stats_pruning:
self.enable_row_group_stats_pruning,
+ limit: self.limit,
+ coerce_int96: self.coerce_int96,
+ expr_adapter_factory: Arc::clone(&self.expr_adapter_factory),
+ predicate_creation_errors,
+ max_predicate_cache_size: self.max_predicate_cache_size,
+ reverse_row_groups: self.reverse_row_groups,
+ preserve_order: self.preserve_order,
+ #[cfg(feature = "parquet_encryption")]
+ file_decryption_properties: None,
+ })
+ }
+}
- let reverse_row_groups = self.reverse_row_groups;
- let preserve_order = self.preserve_order;
+impl PreparedParquetOpen {
+ /// File-level pruning before any parquet metadata is loaded.
+ ///
+ /// Returns `None` if the file can be skipped completely.
+ fn prune_file(mut self) -> Result<Option<Self>> {
+ // Prune this file using the file level statistics and partition
values.
+ // Since dynamic filters may have been updated since planning it is
possible that we are able
+ // to prune files now that we couldn't prune at planning time.
+ // It is assumed that there is no point in doing pruning here if the
predicate is not dynamic,
+ // as it would have been done at planning time.
+ // We'll also check this after every record batch we read,
+ // and if at some point we are able to prove we can prune the file
using just the file level statistics
+ // we can end the stream early.
+ //
+ // Make a FilePruner only if there is either
+ // 1. a dynamic expr in the predicate
+ // 2. the file has file-level statistics.
+ //
+ // File-level statistics may prune the file without loading
+ // any row groups or metadata.
+ //
+ // Dynamic filters may prune the file after initial
+ // planning, as the dynamic filter is updated during
+ // execution.
+ //
+ // The case where there is a dynamic filter but no
+ // statistics corresponds to a dynamic filter that
+ // references partition columns. While rare, this is possible
+ // e.g. `select * from table order by partition_col limit
+ // 10` could hit this condition.
+ if let Some(file_pruner) = &mut self.file_pruner
+ && file_pruner.should_prune()?
+ {
+ self.file_metrics
+ .files_ranges_pruned_statistics
+ .add_pruned(1);
+ return Ok(None);
+ }
- Ok(Box::pin(async move {
- #[cfg(feature = "parquet_encryption")]
- let file_decryption_properties = encryption_context
- .get_file_decryption_properties(&file_location)
- .await?;
+ self.file_metrics
+ .files_ranges_pruned_statistics
+ .add_matched(1);
+ Ok(Some(self))
+ }
- // ---------------------------------------------
- // Step: try to prune the current file partition
- // ---------------------------------------------
-
- // Prune this file using the file level statistics and partition
values.
- // Since dynamic filters may have been updated since planning it
is possible that we are able
- // to prune files now that we couldn't prune at planning time.
- // It is assumed that there is no point in doing pruning here if
the predicate is not dynamic,
- // as it would have been done at planning time.
- // We'll also check this after every record batch we read,
- // and if at some point we are able to prove we can prune the file
using just the file level statistics
- // we can end the stream early.
- let mut file_pruner = predicate
- .as_ref()
- .filter(|p| {
- // Make a FilePruner only if there is either
- // 1. a dynamic expr in the predicate
- // 2. the file has file-level statistics.
- //
- // File-level statistics may prune the file without loading
- // any row groups or metadata.
- //
- // Dynamic filters may prune the file after initial
- // planning, as the dynamic filter is updated during
- // execution.
- //
- // The case where there is a dynamic filter but no
- // statistics corresponds to a dynamic filter that
- // references partition columns. While rare, this is
possible
- // e.g. `select * from table order by partition_col limit
- // 10` could hit this condition.
- is_dynamic_physical_expr(p) ||
partitioned_file.has_statistics()
- })
- .and_then(|p| {
- FilePruner::try_new(
- Arc::clone(p),
- &logical_file_schema,
- &partitioned_file,
- predicate_creation_errors.clone(),
- )
- });
-
- if let Some(file_pruner) = &mut file_pruner
- && file_pruner.should_prune()?
- {
- // Return an empty stream immediately to skip the work of
setting up the actual stream
- file_metrics.files_ranges_pruned_statistics.add_pruned(1);
- return Ok(futures::stream::empty().boxed());
- }
+ /// Load parquet metadata after file-level pruning is complete.
+ async fn load(mut self) -> Result<MetadataLoadedParquetOpen> {
+ // Don't load the page index yet. Since it is not stored inline in
+ // the footer, loading the page index if it is not needed will do
+ // unnecessary I/O. We decide later if it is needed to evaluate the
+ // pruning predicates. Thus default to not requesting it from the
+ // underlying reader.
+ let options =
+
ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
+ #[cfg(feature = "parquet_encryption")]
+ let mut options = options;
+ #[cfg(feature = "parquet_encryption")]
+ if let Some(fd_val) = &self.file_decryption_properties {
+ options =
options.with_file_decryption_properties(Arc::clone(fd_val));
+ }
- file_metrics.files_ranges_pruned_statistics.add_matched(1);
+ let mut metadata_timer = self.file_metrics.metadata_load_time.timer();
+ // Begin by loading the metadata from the underlying reader (note
+ // the returned metadata may actually include page indexes as some
+ // readers may return page indexes even when not requested -- for
+ // example when they are cached)
+ let reader_metadata =
+ ArrowReaderMetadata::load_async(&mut self.async_file_reader,
options.clone())
+ .await?;
+ metadata_timer.stop();
+ drop(metadata_timer);
+
+ Ok(MetadataLoadedParquetOpen {
+ prepared: self,
+ reader_metadata,
+ options,
+ })
+ }
+}
- // --------------------------------------------------------
- // Step: fetch Parquet metadata (and optionally page index)
- // --------------------------------------------------------
+impl MetadataLoadedParquetOpen {
+ /// Prepare file-schema coercions and pruning predicates once metadata is
loaded.
+ fn prepare_filters(self) -> Result<FiltersPreparedParquetOpen> {
+ let MetadataLoadedParquetOpen {
+ mut prepared,
+ mut reader_metadata,
+ mut options,
+ } = self;
+
+ // Note about schemas: we are actually dealing with **3 different
schemas** here:
+ // - The table schema as defined by the TableProvider.
+ // This is what the user sees, what they get when they `SELECT *
FROM table`, etc.
+ // - The logical file schema: this is the table schema minus any hive
partition columns and projections.
+ // This is what the physical file schema is coerced to.
+ // - The physical file schema: this is the schema that the arrow-rs
+ // parquet reader will actually produce.
+ let mut physical_file_schema = Arc::clone(reader_metadata.schema());
+
+ // The schema loaded from the file may not be the same as the
+ // desired schema (for example if we want to instruct the parquet
+ // reader to read strings using Utf8View instead). Update if necessary
+ if let Some(merged) = apply_file_schema_type_coercions(
+ &prepared.logical_file_schema,
+ &physical_file_schema,
+ ) {
+ physical_file_schema = Arc::new(merged);
+ options = options.with_schema(Arc::clone(&physical_file_schema));
+ reader_metadata = ArrowReaderMetadata::try_new(
+ Arc::clone(reader_metadata.metadata()),
+ options.clone(),
+ )?;
+ }
- // Don't load the page index yet. Since it is not stored inline in
- // the footer, loading the page index if it is not needed will do
- // unnecessary I/O. We decide later if it is needed to evaluate the
- // pruning predicates. Thus default to not requesting it from the
- // underlying reader.
- let mut options =
-
ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
- #[cfg(feature = "parquet_encryption")]
- if let Some(fd_val) = file_decryption_properties {
- options =
options.with_file_decryption_properties(Arc::clone(&fd_val));
- }
- let mut metadata_timer = file_metrics.metadata_load_time.timer();
-
- // Begin by loading the metadata from the underlying reader (note
- // the returned metadata may actually include page indexes as some
- // readers may return page indexes even when not requested -- for
- // example when they are cached)
- let mut reader_metadata =
- ArrowReaderMetadata::load_async(&mut async_file_reader,
options.clone())
- .await?;
-
- // Note about schemas: we are actually dealing with **3 different
schemas** here:
- // - The table schema as defined by the TableProvider.
- // This is what the user sees, what they get when they `SELECT *
FROM table`, etc.
- // - The logical file schema: this is the table schema minus any
hive partition columns and projections.
- // This is what the physical file schema is coerced to.
- // - The physical file schema: this is the schema that the arrow-rs
- // parquet reader will actually produce.
- let mut physical_file_schema =
Arc::clone(reader_metadata.schema());
-
- // The schema loaded from the file may not be the same as the
- // desired schema (for example if we want to instruct the parquet
- // reader to read strings using Utf8View instead). Update if
necessary
- if let Some(merged) = apply_file_schema_type_coercions(
- &logical_file_schema,
+ if let Some(ref coerce) = prepared.coerce_int96
+ && let Some(merged) = coerce_int96_to_resolution(
+ reader_metadata.parquet_schema(),
&physical_file_schema,
- ) {
- physical_file_schema = Arc::new(merged);
- options =
options.with_schema(Arc::clone(&physical_file_schema));
- reader_metadata = ArrowReaderMetadata::try_new(
- Arc::clone(reader_metadata.metadata()),
- options.clone(),
- )?;
- }
-
- if let Some(ref coerce) = coerce_int96
- && let Some(merged) = coerce_int96_to_resolution(
- reader_metadata.parquet_schema(),
- &physical_file_schema,
- coerce,
- )
- {
- physical_file_schema = Arc::new(merged);
- options =
options.with_schema(Arc::clone(&physical_file_schema));
- reader_metadata = ArrowReaderMetadata::try_new(
- Arc::clone(reader_metadata.metadata()),
- options.clone(),
- )?;
- }
+ coerce,
+ )
+ {
+ physical_file_schema = Arc::new(merged);
+ options = options.with_schema(Arc::clone(&physical_file_schema));
+ reader_metadata = ArrowReaderMetadata::try_new(
+ Arc::clone(reader_metadata.metadata()),
+ options.clone(),
+ )?;
+ }
- // Adapt the projection & filter predicate to the physical file
schema.
- // This evaluates missing columns and inserts any necessary casts.
- // After rewriting to the file schema, further simplifications may
be possible.
- // For example, if `'a' = col_that_is_missing` becomes `'a' =
NULL` that can then be simplified to `FALSE`
- // and we can avoid doing any more work on the file (bloom
filters, loading the page index, etc.).
- // Additionally, if any casts were inserted we can move casts from
the column to the literal side:
- // `CAST(col AS INT) = 5` can become `col = CAST(5 AS <col
type>)`, which can be evaluated statically.
- //
- // When the schemas are identical and there is no predicate, the
- // rewriter is a no-op: column indices already match (partition
- // columns are appended after file columns in the table schema),
- // types are the same, and there are no missing columns. Skip the
- // tree walk entirely in that case.
- let needs_rewrite =
- predicate.is_some() || logical_file_schema !=
physical_file_schema;
- if needs_rewrite {
- let rewriter = expr_adapter_factory.create(
- Arc::clone(&logical_file_schema),
- Arc::clone(&physical_file_schema),
- )?;
- let simplifier =
PhysicalExprSimplifier::new(&physical_file_schema);
- predicate = predicate
- .map(|p| simplifier.simplify(rewriter.rewrite(p)?))
- .transpose()?;
- // Adapt projections to the physical file schema as well
- projection = projection
- .try_map_exprs(|p|
simplifier.simplify(rewriter.rewrite(p)?))?;
- }
+ // Adapt the projection & filter predicate to the physical file schema.
+ // This evaluates missing columns and inserts any necessary casts.
+ // After rewriting to the file schema, further simplifications may be
possible.
+ // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL`
that can then be simplified to `FALSE`
+ // and we can avoid doing any more work on the file (bloom filters,
loading the page index, etc.).
+ // Additionally, if any casts were inserted we can move casts from the
column to the literal side:
+ // `CAST(col AS INT) = 5` can become `col = CAST(5 AS <col type>)`,
which can be evaluated statically.
+ let rewriter = prepared.expr_adapter_factory.create(
Review Comment:
good call -- restored
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]