This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7f0e194e2f Improve `ParquetExec` and related documentation (#10647)
7f0e194e2f is described below
commit 7f0e194e2f009e05d5b0599032d01d8c284e0baf
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon May 27 20:14:52 2024 -0400
Improve `ParquetExec` and related documentation (#10647)
* Improve ParquetExec and related documentation
* Improve documentation
* Update datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
Co-authored-by: Oleks V <[email protected]>
* fix link
---------
Co-authored-by: Oleks V <[email protected]>
---
.../src/datasource/physical_plan/parquet/mod.rs | 127 ++++++++++++++++++---
datafusion/core/src/datasource/schema_adapter.rs | 28 +++--
2 files changed, 131 insertions(+), 24 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 16da9c2d77..6655125ea8 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -76,7 +76,84 @@ use crate::datasource::schema_adapter::{
pub use metrics::ParquetFileMetrics;
pub use statistics::{RequestedStatistics, StatisticsConverter};
-/// Execution plan for scanning one or more Parquet partitions
+/// Execution plan for reading one or more Parquet files.
+///
+/// ```text
+/// ▲
+/// │
+/// │ Produce a stream of
+/// │ RecordBatches
+/// │
+/// ┌───────────────────────┐
+/// │ │
+/// │ ParquetExec │
+/// │ │
+/// └───────────────────────┘
+/// ▲
+/// │ Asynchronously read from one
+/// │ or more parquet files via
+/// │ ObjectStore interface
+/// │
+/// │
+/// .───────────────────.
+/// │ )
+/// │`───────────────────'│
+/// │ ObjectStore │
+/// │.───────────────────.│
+/// │ )
+/// `───────────────────'
+///
+/// ```
+/// # Features
+///
+/// Supports the following optimizations:
+///
+/// * Concurrent reads: Can read from one or more files in parallel as multiple
+/// partitions, including concurrently reading multiple row groups from a single
+/// file.
+///
+/// * Predicate push down: skips row groups and pages based on
+/// min/max/null_counts in the row group metadata, the page index and bloom
+/// filters.
+///
+/// * Projection pushdown: reads and decodes only the columns required.
+///
+/// * Limit pushdown: stop execution early after some number of rows are read.
+///
+/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
+/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
+/// details.
+///
+/// * Schema adapters: read parquet files with different schemas into a unified
+/// table schema. This can be used to implement "schema evolution". See
+/// [`SchemaAdapterFactory`] for more details.
+///
+/// * metadata_size_hint: controls the number of bytes read from the end of the
+/// file in the initial I/O when the default [`ParquetFileReaderFactory`] is used. If a
+/// custom reader is used, it supplies the metadata directly and this parameter
+/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more details.
+///
+/// # Execution Overview
+///
+/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
+/// configured to open parquet files with a [`ParquetOpener`].
+///
+/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
+/// the file.
+///
+/// * Step 3: The `ParquetOpener` gets the file metadata via
+/// [`ParquetFileReaderFactory`] and applies any predicates
+/// and projections to determine what pages must be read.
+///
+/// * Step 4: The stream begins reading data, fetching the required pages
+/// and incrementally decoding them.
+///
+/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
+/// [`SchemaAdapter`] to match the table schema. By default missing columns are
+/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
+///
+/// [`RecordBatch`]: arrow::record_batch::RecordBatch
+/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
#[derive(Debug, Clone)]
pub struct ParquetExec {
/// Base configuration for this scan
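To see these features from the user's side, here is a minimal, hedged sketch (not part of this commit). It assumes a tokio runtime and a local file named `data.parquet` with columns `a` and `b`; `ParquetExec` itself is normally created for you when a parquet-backed table is scanned, and the projection, the predicate on `b`, and the `LIMIT` below are the kinds of operations the pushdowns listed above can exploit.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Registering a parquet file creates a listing table; scanning it is
    // planned as a ParquetExec node ("data.parquet" is a placeholder path).
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;
    // The projection (a, b), the predicate on b, and the LIMIT can all be
    // pushed down into the parquet scan as described above.
    let df = ctx.sql("SELECT a, b FROM t WHERE b > 10 LIMIT 100").await?;
    df.show().await?;
    Ok(())
}
```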
@@ -86,9 +163,9 @@ pub struct ParquetExec {
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
predicate: Option<Arc<dyn PhysicalExpr>>,
- /// Optional predicate for pruning row groups
+ /// Optional predicate for pruning row groups (derived from `predicate`)
pruning_predicate: Option<Arc<PruningPredicate>>,
- /// Optional predicate for pruning pages
+ /// Optional predicate for pruning pages (derived from `predicate`)
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
@@ -190,11 +267,13 @@ impl ParquetExec {
/// Optional user defined parquet file reader factory.
///
- /// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom
- /// implementation for data access operations.
+ /// You can use [`ParquetFileReaderFactory`] to more precisely control how
+ /// data is read from parquet files (e.g. skip re-reading metadata, coalesce
+ /// I/O operations, etc).
///
- /// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
- /// to this factory instead of `ObjectStore`.
+ /// The default reader factory reads directly from an [`ObjectStore`]
+ /// instance using individual I/O operations for the footer and then for
+ /// each page.
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
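As a rough usage sketch (the import paths are from roughly this version of DataFusion and may need adjusting), attaching a factory is a builder-style call on an existing `ParquetExec`:

```rust
use std::sync::Arc;
use datafusion::datasource::physical_plan::parquet::{ParquetExec, ParquetFileReaderFactory};

/// Route all object store reads for this scan through `factory`.
/// `exec` is assumed to have been built elsewhere (e.g. by a TableProvider).
fn with_custom_reads(
    exec: ParquetExec,
    factory: Arc<dyn ParquetFileReaderFactory>,
) -> ParquetExec {
    exec.with_parquet_file_reader_factory(factory)
}
```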
@@ -643,11 +722,21 @@ fn should_enable_page_index(
.unwrap_or(false)
}
-/// Factory of parquet file readers.
+/// Interface for reading parquet files.
///
-/// Provides means to implement custom data access interface.
+/// The combined implementations of [`ParquetFileReaderFactory`] and
+/// [`AsyncFileReader`] can be used to provide custom data access operations
+/// such as pre-cached data, I/O coalescing, etc.
+///
+/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
- /// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
+ /// Provides an `AsyncFileReader` for reading data from a parquet file specified in `FileMeta`
+ ///
+ /// # Arguments
+ /// * partition_index - Index of the partition (for reporting metrics)
+ /// * file_meta - The file to be read
+ /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
+ /// * metrics - Execution metrics
fn create_reader(
&self,
partition_index: usize,
@@ -657,20 +746,32 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
) -> Result<Box<dyn AsyncFileReader + Send>>;
}
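Below is a hedged sketch of a custom factory that simply delegates to the default implementation. The parameter types not visible in the wrapped hunk above (`FileMeta`, `Option<usize>`, `&ExecutionPlanMetricsSet`) are filled in from memory and may differ slightly between versions; a real implementation would add its own logic in `create_reader`, for example consulting a metadata cache before delegating.

```rust
use std::sync::Arc;

use datafusion::datasource::physical_plan::parquet::{
    DefaultParquetFileReaderFactory, ParquetFileReaderFactory,
};
use datafusion::datasource::physical_plan::FileMeta;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::ObjectStore;
use parquet::arrow::async_reader::AsyncFileReader;

/// A factory that currently just delegates to the default implementation.
#[derive(Debug)]
struct DelegatingReaderFactory {
    inner: DefaultParquetFileReaderFactory,
}

impl DelegatingReaderFactory {
    fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self { inner: DefaultParquetFileReaderFactory::new(store) }
    }
}

impl ParquetFileReaderFactory for DelegatingReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        // Custom behavior (metadata caching, I/O coalescing, ...) would go here.
        self.inner
            .create_reader(partition_index, file_meta, metadata_size_hint, metrics)
    }
}
```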
-/// Default parquet reader factory.
+/// Default implementation of [`ParquetFileReaderFactory`]
+///
+/// This implementation:
+/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
+/// 2. Reads the footer and page metadata on demand.
+/// 3. Does not cache metadata or coalesce I/O operations.
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
}
impl DefaultParquetFileReaderFactory {
- /// Create a factory.
+ /// Create a new `DefaultParquetFileReaderFactory`.
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self { store }
}
}
-/// Implements [`AsyncFileReader`] for a parquet file in object storage
+/// Implements [`AsyncFileReader`] for a parquet file in object storage.
+///
+/// This implementation uses the [`ParquetObjectReader`] to read data from the
+/// object store on demand, as required, tracking the number of bytes read.
+///
+/// This implementation does not coalesce I/O operations or cache bytes. Such
+/// optimizations can be done either at the object store level or by providing a
+/// custom implementation of [`ParquetFileReaderFactory`].
pub(crate) struct ParquetFileReader {
file_metrics: ParquetFileMetrics,
inner: ParquetObjectReader,
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index 36d33379b8..1838a3354b 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-//! Schema Adapter provides a method of translating the RecordBatches that come out of the
+//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
+//!
+//! An adapter provides a method of translating the RecordBatches that come out of the
//! physical format into how they should be used by DataFusion. For instance, a schema
//! can be stored external to a parquet file that maps parquet logical types to arrow types.
@@ -26,27 +28,29 @@ use datafusion_common::plan_err;
use std::fmt::Debug;
use std::sync::Arc;
-/// Factory of schema adapters.
+/// Factory for creating [`SchemaAdapter`]
///
-/// Provides means to implement custom schema adaptation.
+/// This interface provides a way to implement custom schema adaptation logic
+/// for ParquetExec (for example, to fill missing columns with a default value other than null)
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter`.
fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
}
-/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
-/// 1. Before reading the file, we have to map projected column indexes from the table schema to
-/// the file schema.
+/// 1. Before reading the file, we have to map projected column indexes from the
+/// table schema to the file schema.
///
-/// 2. After reading a record batch we need to map the read columns back to the expected columns
-/// indexes and insert null-valued columns wherever the file schema was missing a colum present
-/// in the table schema.
+/// 2. After reading a record batch, map the read columns back to the expected
+/// column indexes and insert null-valued columns wherever the file schema was
+/// missing a column present in the table schema.
pub trait SchemaAdapter: Send + Sync {
/// Map a column index in the table schema to a column index in a particular
/// file schema
@@ -54,7 +58,8 @@ pub trait SchemaAdapter: Send + Sync {
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
- /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+ /// Creates a `SchemaMapping` that can be used to cast or map the columns
+ /// from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
@@ -68,7 +73,8 @@ pub trait SchemaAdapter: Send + Sync {
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}
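To make the two stages concrete, here is a hedged sketch of a factory and adapter pair (not code from this commit). The `map_schema` method name and the exact `Result` paths are recalled from this era of DataFusion and may differ in other versions; the adapter only handles files whose columns already line up with the table schema by name, and it returns the `PassThroughMapper` defined after the `SchemaMapper` excerpt below.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::common::Result;
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};

/// Factory that hands out the same kind of adapter for every file.
#[derive(Debug)]
struct ByNameAdapterFactory;

impl SchemaAdapterFactory for ByNameAdapterFactory {
    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Box::new(ByNameAdapter { table_schema: schema })
    }
}

/// Stage 1: decide which file columns to read for the projected table columns.
struct ByNameAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for ByNameAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        // Look the projected table column up by name in the file schema.
        file_schema
            .index_of(self.table_schema.field(index).name())
            .ok()
    }

    fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        // Read every file column and pass batches through unchanged; a real
        // adapter would also plan casts and null-filling for missing columns.
        let projection: Vec<usize> = (0..file_schema.fields().len()).collect();
        let mapper: Arc<dyn SchemaMapper> = Arc::new(PassThroughMapper);
        Ok((mapper, projection))
    }
}
```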
-/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema.
+/// Transforms a `RecordBatch` read from a file into one that matches the
+/// table schema.
pub trait SchemaMapper: Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
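The mapper returned by the adapter above is stage 2. A minimal sketch, assuming `map_batch` is the only required method of `SchemaMapper` at this version:

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::schema_adapter::SchemaMapper;

/// Stage 2: rewrite each batch so it matches the table schema (here: a no-op).
struct PassThroughMapper;

impl SchemaMapper for PassThroughMapper {
    fn map_batch(&self, batch: RecordBatch) -> datafusion::common::Result<RecordBatch> {
        Ok(batch)
    }
}
```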
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]