This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9df957b63e API  from `ParquetExec` to `ParquetExecBuilder` (#12799)
9df957b63e is described below

commit 9df957b63ea4c9fb787c6f9a3c15d1dcd394ac72
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Oct 8 16:58:08 2024 -0400

    API  from `ParquetExec` to `ParquetExecBuilder` (#12799)
    
    * API to go from `ParquetExec` to `ParquetExecBuilder`
    
    * fix potential regression
    
    * Apply suggestions from code review
    
    Co-authored-by: Nga Tran <[email protected]>
    
    * add note about fields being re-created
    
    ---------
    
    Co-authored-by: Nga Tran <[email protected]>
---
 .../src/datasource/physical_plan/parquet/mod.rs    | 92 +++++++++++++++++++++-
 1 file changed, 89 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 6afb66cc7c..743dd58969 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -166,6 +166,33 @@ pub use writer::plan_to_parquet;
 /// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
 /// [Parquet PageIndex]: 
https://github.com/apache/parquet-format/blob/master/PageIndex.md
 ///
+/// # Example: rewriting `ParquetExec`
+///
+/// You can modify a `ParquetExec` using [`ParquetExecBuilder`], for example
+/// to change files or add a predicate.
+///
+/// ```no_run
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::Schema;
+/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+/// # use datafusion::datasource::listing::PartitionedFile;
+/// # fn parquet_exec() -> ParquetExec { unimplemented!() }
+/// // Split a single ParquetExec into multiple ParquetExecs, one for each file
+/// let exec = parquet_exec();
+/// let existing_file_groups = &exec.base_config().file_groups;
+/// let new_execs = existing_file_groups
+///   .iter()
+///   .map(|file_group| {
+///     // create a new exec by copying the existing exec into a builder
+///     let new_exec = exec.clone()
+///        .into_builder()
+///        .with_file_groups(vec![file_group.clone()])
+///       .build();
+///     new_exec
+///   })
+///   .collect::<Vec<_>>();
+/// ```
+///
 /// # Implementing External Indexes
 ///
 /// It is possible to restrict the row groups and selections within those row
@@ -257,6 +284,12 @@ pub struct ParquetExec {
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
+impl From<ParquetExec> for ParquetExecBuilder {
+    fn from(exec: ParquetExec) -> Self {
+        exec.into_builder()
+    }
+}
+
 /// [`ParquetExecBuilder`], builder for [`ParquetExec`].
 ///
 /// See example on [`ParquetExec`].
@@ -291,6 +324,12 @@ impl ParquetExecBuilder {
         }
     }
 
+    /// Update the list of files groups to read
+    pub fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) 
-> Self {
+        self.file_scan_config.file_groups = file_groups;
+        self
+    }
+
     /// Set the filter predicate when reading.
     ///
     /// See the "Predicate Pushdown" section of the [`ParquetExec`] 
documenation
@@ -459,6 +498,34 @@ impl ParquetExec {
         ParquetExecBuilder::new(file_scan_config)
     }
 
+    /// Convert this `ParquetExec` into a builder for modification
+    pub fn into_builder(self) -> ParquetExecBuilder {
+        // list out fields so it is clear what is being dropped
+        // (note the fields which are dropped are re-created as part of calling
+        // `build` on the builder)
+        let Self {
+            base_config,
+            projected_statistics: _,
+            metrics: _,
+            predicate,
+            pruning_predicate: _,
+            page_pruning_predicate: _,
+            metadata_size_hint,
+            parquet_file_reader_factory,
+            cache: _,
+            table_parquet_options,
+            schema_adapter_factory,
+        } = self;
+        ParquetExecBuilder {
+            file_scan_config: base_config,
+            predicate,
+            metadata_size_hint,
+            table_parquet_options,
+            parquet_file_reader_factory,
+            schema_adapter_factory,
+        }
+    }
+
     /// [`FileScanConfig`] that controls this scan (such as which files to 
read)
     pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
@@ -479,9 +546,15 @@ impl ParquetExec {
         self.pruning_predicate.as_ref()
     }
 
+    /// return the optional file reader factory
+    pub fn parquet_file_reader_factory(
+        &self,
+    ) -> Option<&Arc<dyn ParquetFileReaderFactory>> {
+        self.parquet_file_reader_factory.as_ref()
+    }
+
     /// Optional user defined parquet file reader factory.
     ///
-    /// See documentation on 
[`ParquetExecBuilder::with_parquet_file_reader_factory`]
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -490,6 +563,11 @@ impl ParquetExec {
         self
     }
 
+    /// return the optional schema adapter factory
+    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn 
SchemaAdapterFactory>> {
+        self.schema_adapter_factory.as_ref()
+    }
+
     /// Optional schema adapter factory.
     ///
     /// See documentation on 
[`ParquetExecBuilder::with_schema_adapter_factory`]
@@ -586,7 +664,14 @@ impl ParquetExec {
         )
     }
 
-    fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> 
Self {
+    /// Updates the file groups to read and recalculates the output 
partitioning
+    ///
+    /// Note this function does not update statistics or other properties
+    /// that depend on the file groups.
+    fn with_file_groups_and_update_partitioning(
+        mut self,
+        file_groups: Vec<Vec<PartitionedFile>>,
+    ) -> Self {
         self.base_config.file_groups = file_groups;
         // Changing file groups may invalidate output partitioning. Update it 
also
         let output_partitioning = 
Self::output_partitioning_helper(&self.base_config);
@@ -679,7 +764,8 @@ impl ExecutionPlan for ParquetExec {
 
         let mut new_plan = self.clone();
         if let Some(repartitioned_file_groups) = 
repartitioned_file_groups_option {
-            new_plan = new_plan.with_file_groups(repartitioned_file_groups);
+            new_plan = new_plan
+                
.with_file_groups_and_update_partitioning(repartitioned_file_groups);
         }
         Ok(Some(Arc::new(new_plan)))
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to