This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f0e194e2f Improve `ParquetExec` and related documentation (#10647)
7f0e194e2f is described below

commit 7f0e194e2f009e05d5b0599032d01d8c284e0baf
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon May 27 20:14:52 2024 -0400

    Improve `ParquetExec` and related documentation (#10647)
    
    * Improve ParquetExec and related documentation
    
    * Improve documentation
    
    * Update 
datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
    
    Co-authored-by: Oleks V <[email protected]>
    
    * fix link
    
    ---------
    
    Co-authored-by: Oleks V <[email protected]>
---
 .../src/datasource/physical_plan/parquet/mod.rs    | 127 ++++++++++++++++++---
 datafusion/core/src/datasource/schema_adapter.rs   |  28 +++--
 2 files changed, 131 insertions(+), 24 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 16da9c2d77..6655125ea8 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -76,7 +76,84 @@ use crate::datasource::schema_adapter::{
 pub use metrics::ParquetFileMetrics;
 pub use statistics::{RequestedStatistics, StatisticsConverter};
 
-/// Execution plan for scanning one or more Parquet partitions
+/// Execution plan for reading one or more Parquet files.
+///
+/// ```text
+///             ▲
+///             │
+///             │  Produce a stream of
+///             │  RecordBatches
+///             │
+/// ┌───────────────────────┐
+/// │                       │
+/// │      ParquetExec      │
+/// │                       │
+/// └───────────────────────┘
+///             ▲
+///             │  Asynchronously read from one
+///             │  or more parquet files via
+///             │  ObjectStore interface
+///             │
+///             │
+///   .───────────────────.
+///  │                     )
+///  │`───────────────────'│
+///  │    ObjectStore      │
+///  │.───────────────────.│
+///  │                     )
+///   `───────────────────'
+///
+/// ```
+/// # Features
+///
+/// Supports the following optimizations:
+///
+/// * Concurrent reads: Can read from one or more files in parallel as multiple
+/// partitions, including concurrently reading multiple row groups from a 
single
+/// file.
+///
+/// * Predicate push down: skips row groups and pages based on
+/// min/max/null_counts in the row group metadata, the page index and bloom
+/// filters.
+///
+/// * Projection pushdown: reads and decodes only the columns required.
+///
+/// * Limit pushdown: stop execution early after some number of rows are read.
+///
+/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
+/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
+/// details.
+///
+/// * Schema adapters: read parquet files with different schemas into a unified
+/// table schema. This can be used to implement "schema evolution". See
+/// [`SchemaAdapterFactory`] for more details.
+///
+/// * metadata_size_hint: controls the number of bytes read from the end of the
+/// file in the initial I/O when the default [`ParquetFileReaderFactory`] is used. If a
+/// custom reader is used, it supplies the metadata directly and this parameter
+/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more 
details.
+///
+/// # Execution Overview
+///
+/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
+/// configured to open parquet files with a [`ParquetOpener`].
+///
+/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to 
open
+/// the file.
+///
+/// * Step 3: The `ParquetOpener` gets the file metadata via
+/// [`ParquetFileReaderFactory`] and applies any predicates
+/// and projections to determine what pages must be read.
+///
+/// * Step 4: The stream begins reading data, fetching the required pages
+/// and incrementally decoding them.
+///
+/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
+/// [`SchemaAdapter`] to match the table schema. By default missing columns are
+/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
+///
+/// [`RecordBatch`]: arrow::record_batch::RecordBatch
+/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
@@ -86,9 +163,9 @@ pub struct ParquetExec {
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
     predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional predicate for pruning row groups
+    /// Optional predicate for pruning row groups (derived from `predicate`)
     pruning_predicate: Option<Arc<PruningPredicate>>,
-    /// Optional predicate for pruning pages
+    /// Optional predicate for pruning pages (derived from `predicate`)
     page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
@@ -190,11 +267,13 @@ impl ParquetExec {
 
     /// Optional user defined parquet file reader factory.
     ///
-    /// `ParquetFileReaderFactory` complements `TableProvider`, It enables 
users to provide custom
-    /// implementation for data access operations.
+    /// You can use [`ParquetFileReaderFactory`] to more precisely control how
+    /// data is read from parquet files (e.g. skip re-reading metadata, 
coalesce
+    /// I/O operations, etc).
     ///
-    /// If custom `ParquetFileReaderFactory` is provided, then data access 
operations will be routed
-    /// to this factory instead of `ObjectStore`.
+    /// The default reader factory reads directly from an [`ObjectStore`]
+    /// instance using individual I/O operations for the footer and then for
+    /// each page.
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -643,11 +722,21 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
-/// Factory of parquet file readers.
+/// Interface for reading parquet files.
 ///
-/// Provides means to implement custom data access interface.
+/// The combined implementations of [`ParquetFileReaderFactory`] and
+/// [`AsyncFileReader`] can be used to provide custom data access operations
+/// such as pre-cached data, I/O coalescing, etc.
+///
+/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
 pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
-    /// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
+    /// Provides an `AsyncFileReader` for reading data from the parquet file specified by `file_meta`
+    ///
+    /// # Arguments
+    /// * partition_index - Index of the partition (for reporting metrics)
+    /// * file_meta - The file to be read
+    /// * metadata_size_hint - If specified, the first IO reads this many 
bytes from the footer
+    /// * metrics - Execution metrics
     fn create_reader(
         &self,
         partition_index: usize,
@@ -657,20 +746,32 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 
'static {
     ) -> Result<Box<dyn AsyncFileReader + Send>>;
 }
 
-/// Default parquet reader factory.
+/// Default implementation of [`ParquetFileReaderFactory`]
+///
+/// This implementation:
+/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
+/// 2. Reads the footer and page metadata on demand.
+/// 3. Does not cache metadata or coalesce I/O operations.
 #[derive(Debug)]
 pub struct DefaultParquetFileReaderFactory {
     store: Arc<dyn ObjectStore>,
 }
 
 impl DefaultParquetFileReaderFactory {
-    /// Create a factory.
+    /// Create a new `DefaultParquetFileReaderFactory`.
     pub fn new(store: Arc<dyn ObjectStore>) -> Self {
         Self { store }
     }
 }
 
-/// Implements [`AsyncFileReader`] for a parquet file in object storage
+/// Implements [`AsyncFileReader`] for a parquet file in object storage.
+///
+/// This implementation uses the [`ParquetObjectReader`] to read data from the
+/// object store on demand, as required, tracking the number of bytes read.
+///
+/// This implementation does not coalesce I/O operations or cache bytes. Such
+/// optimizations can be done either at the object store level or by providing 
a
+/// custom implementation of [`ParquetFileReaderFactory`].
 pub(crate) struct ParquetFileReader {
     file_metrics: ParquetFileMetrics,
     inner: ParquetObjectReader,
diff --git a/datafusion/core/src/datasource/schema_adapter.rs 
b/datafusion/core/src/datasource/schema_adapter.rs
index 36d33379b8..1838a3354b 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Schema Adapter provides a method of translating the RecordBatches that 
come out of the
+//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record 
batches to a table schema.
+//!
+//! Adapter provides a method of translating the RecordBatches that come out 
of the
 //! physical format into how they should be used by DataFusion.  For instance, 
a schema
 //! can be stored external to a parquet file that maps parquet logical types 
to arrow types.
 
@@ -26,27 +28,29 @@ use datafusion_common::plan_err;
 use std::fmt::Debug;
 use std::sync::Arc;
 
-/// Factory of schema adapters.
+/// Factory for creating [`SchemaAdapter`]
 ///
-/// Provides means to implement custom schema adaptation.
+/// This interface provides a way to implement custom schema adaptation logic
+/// for ParquetExec (for example, to fill missing columns with a default value
+/// other than null).
 pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
     /// Provides `SchemaAdapter`.
     fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
 }
 
-/// A utility which can adapt file-level record batches to a table schema 
which may have a schema
+/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a 
schema
 /// obtained from merging multiple file-level schemas.
 ///
 /// This is useful for enabling schema evolution in partitioned datasets.
 ///
 /// This has to be done in two stages.
 ///
-/// 1. Before reading the file, we have to map projected column indexes from 
the table schema to
-///    the file schema.
+/// 1. Before reading the file, we have to map projected column indexes from 
the
+///    table schema to the file schema.
 ///
-/// 2. After reading a record batch we need to map the read columns back to 
the expected columns
-///    indexes and insert null-valued columns wherever the file schema was 
missing a colum present
-///    in the table schema.
+/// 2. After reading a record batch, map the read columns back to the expected
+///    column indexes and insert null-valued columns wherever the file schema was
+///    missing a column present in the table schema.
 pub trait SchemaAdapter: Send + Sync {
     /// Map a column index in the table schema to a column index in a 
particular
     /// file schema
@@ -54,7 +58,8 @@ pub trait SchemaAdapter: Send + Sync {
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize>;
 
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns 
from the file schema to the table schema.
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns
+    /// from the file schema to the table schema.
     ///
     /// If the provided `file_schema` contains columns of a different type to 
the expected
     /// `table_schema`, the method will attempt to cast the array data from 
the file schema
@@ -68,7 +73,8 @@ pub trait SchemaAdapter: Send + Sync {
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
 }
 
-/// Transforms a RecordBatch from the physical layer to a RecordBatch that 
meets the table schema.
+/// Transforms a `RecordBatch` read from a file into a `RecordBatch` that matches
+/// the table schema, casting or mapping columns as needed.
 pub trait SchemaMapper: Send + Sync {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored 
mapping and conversions.
     fn map_batch(&self, batch: RecordBatch) -> 
datafusion_common::Result<RecordBatch>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to