This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 77352b2411 Add `ParquetExec::builder()`, deprecate `ParquetExec::new` 
(#10636)
77352b2411 is described below

commit 77352b2411b5d9340374c30e21b861b0d0d46f82
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed May 29 16:46:18 2024 -0400

    Add `ParquetExec::builder()`, deprecate `ParquetExec::new` (#10636)
    
    * Add `ParquetExec::builder()`, deprecate `ParquetExec::new`
    
    * Add a #[must_use]
---
 .../core/src/datasource/file_format/parquet.rs     |  22 +-
 .../src/datasource/physical_plan/parquet/mod.rs    | 239 +++++++++++++++++----
 datafusion/core/src/datasource/schema_adapter.rs   |   6 +-
 .../combine_partial_final_agg.rs                   |   8 +-
 .../src/physical_optimizer/enforce_distribution.rs |  16 +-
 .../core/src/physical_optimizer/test_utils.rs      |  16 +-
 datafusion/core/src/test_util/parquet.rs           |  22 +-
 datafusion/core/tests/parquet/custom_reader.rs     |   6 +-
 datafusion/core/tests/parquet/page_pruning.rs      |  11 +-
 datafusion/core/tests/parquet/schema_coercion.rs   |  16 +-
 datafusion/proto/src/physical_plan/mod.rs          |  11 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  18 +-
 datafusion/substrait/src/physical_plan/consumer.rs |   8 +-
 .../tests/cases/roundtrip_physical_plan.rs         |   8 +-
 14 files changed, 265 insertions(+), 142 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index e102cfc372..39e6900ed5 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -30,7 +30,7 @@ use crate::arrow::array::{
 };
 use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, 
ParquetExec};
+use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
 use crate::datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
@@ -75,6 +75,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::path::Path;
@@ -253,17 +254,22 @@ impl FileFormat for ParquetFormat {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut builder =
+            ParquetExecBuilder::new_with_options(conf, self.options.clone());
+
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
-        let predicate = self.enable_pruning().then(|| 
filters.cloned()).flatten();
+        if self.enable_pruning() {
+            if let Some(predicate) = filters.cloned() {
+                builder = builder.with_predicate(predicate);
+            }
+        }
+        if let Some(metadata_size_hint) = self.metadata_size_hint() {
+            builder = builder.with_metadata_size_hint(metadata_size_hint);
+        }
 
-        Ok(Arc::new(ParquetExec::new(
-            conf,
-            predicate,
-            self.metadata_size_hint(),
-            self.options.clone(),
-        )))
+        Ok(builder.build_arc())
     }
 
     async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 6655125ea8..ac7c39bbdb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -104,6 +104,27 @@ pub use statistics::{RequestedStatistics, 
StatisticsConverter};
 ///   `───────────────────'
 ///
 /// ```
+///
+/// # Example: Create a `ParquetExec`
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::Schema;
+/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+/// # use datafusion::datasource::listing::PartitionedFile;
+/// # let file_schema = Arc::new(Schema::empty());
+/// # let object_store_url = ObjectStoreUrl::local_filesystem();
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # use datafusion_physical_expr::expressions::lit;
+/// # let predicate = lit(true);
+/// // Create a ParquetExec for reading `file1.parquet` with a file size of 
100MB
+/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema)
+///    .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024));
+/// let exec = ParquetExec::builder(file_scan_config)
+///   // Provide a predicate for filtering row groups/pages
+///   .with_predicate(predicate)
+///   .build();
+/// ```
+///
 /// # Features
 ///
 /// Supports the following optimizations:
@@ -131,7 +152,7 @@ pub use statistics::{RequestedStatistics, 
StatisticsConverter};
 /// * metadata_size_hint: controls the number of bytes read from the end of the
 /// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
 /// custom reader is used, it supplies the metadata directly and this parameter
-/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more 
details.
+/// is ignored. See [`ParquetExecBuilder::with_metadata_size_hint`] for more 
details.
 ///
 /// # Execution Overview
 ///
@@ -141,9 +162,9 @@ pub use statistics::{RequestedStatistics, 
StatisticsConverter};
 /// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to 
open
 /// the file.
 ///
-/// * Step 3: The `ParquetOpener` gets the file metadata via
-/// [`ParquetFileReaderFactory`] and applies any predicates
-/// and projections to determine what pages must be read.
+/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
+/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
+/// to determine what pages must be read.
 ///
 /// * Step 4: The stream begins reading data, fetching the required pages
 /// and incrementally decoding them.
@@ -154,6 +175,7 @@ pub use statistics::{RequestedStatistics, 
StatisticsConverter};
 ///
 /// [`RecordBatch`]: arrow::record_batch::RecordBatch
 /// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
+/// [`ParquetMetaData`]: parquet::file::metadata::ParquetMetaData
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Base configuration for this scan
@@ -179,14 +201,125 @@ pub struct ParquetExec {
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
-impl ParquetExec {
-    /// Create a new Parquet reader execution plan provided file list and 
schema.
-    pub fn new(
-        base_config: FileScanConfig,
-        predicate: Option<Arc<dyn PhysicalExpr>>,
-        metadata_size_hint: Option<usize>,
+/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
+///
+/// See example on [`ParquetExec`].
+pub struct ParquetExecBuilder {
+    file_scan_config: FileScanConfig,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    metadata_size_hint: Option<usize>,
+    table_parquet_options: TableParquetOptions,
+    parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl ParquetExecBuilder {
+    /// Create a new builder to read the provided file scan configuration
+    pub fn new(file_scan_config: FileScanConfig) -> Self {
+        Self::new_with_options(file_scan_config, 
TableParquetOptions::default())
+    }
+
+    /// Create a new builder to read the data specified in the file scan
+    /// configuration with the provided `TableParquetOptions`.
+    pub fn new_with_options(
+        file_scan_config: FileScanConfig,
         table_parquet_options: TableParquetOptions,
     ) -> Self {
+        Self {
+            file_scan_config,
+            predicate: None,
+            metadata_size_hint: None,
+            table_parquet_options,
+            parquet_file_reader_factory: None,
+            schema_adapter_factory: None,
+        }
+    }
+
+    /// Set the predicate for the scan.
+    ///
+    /// The ParquetExec uses this predicate to filter row groups and data pages
+    /// using the Parquet statistics and bloom filters.
+    ///
+    /// If the predicate can not be used to prune the scan, it is ignored (no
+    /// error is raised).
+    pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
+        self.predicate = Some(predicate);
+        self
+    }
+
+    /// Set the metadata size hint
+    ///
+    /// This value determines how many bytes at the end of the file the default
+    /// [`ParquetFileReaderFactory`] will request in the initial IO. If this is
+    /// too small, the ParquetExec will need to make additional IO requests to
+    /// read the footer.
+    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> 
Self {
+        self.metadata_size_hint = Some(metadata_size_hint);
+        self
+    }
+
+    /// Set the table parquet options that control how the ParquetExec reads.
+    ///
+    /// See also [`Self::new_with_options`]
+    pub fn with_table_parquet_options(
+        mut self,
+        table_parquet_options: TableParquetOptions,
+    ) -> Self {
+        self.table_parquet_options = table_parquet_options;
+        self
+    }
+
+    /// Set optional user defined parquet file reader factory.
+    ///
+    /// You can use [`ParquetFileReaderFactory`] to more precisely control how
+    /// data is read from parquet files (e.g. skip re-reading metadata, 
coalesce
+    /// I/O operations, etc).
+    ///
+    /// The default reader factory reads directly from an [`ObjectStore`]
+    /// instance using individual I/O operations for the footer and each page.
+    ///
+    /// If a custom `ParquetFileReaderFactory` is provided, then data access
+    /// operations will be routed to this factory instead of `ObjectStore`.
+    pub fn with_parquet_file_reader_factory(
+        mut self,
+        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    ) -> Self {
+        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+        self
+    }
+
+    /// Set optional schema adapter factory.
+    ///
+    /// [`SchemaAdapterFactory`] allows user to specify how fields from the
+    /// parquet file get mapped to that of the table schema.  The default 
schema
+    /// adapter uses arrow's cast library to map the parquet fields to the 
table
+    /// schema.
+    pub fn with_schema_adapter_factory(
+        mut self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        self.schema_adapter_factory = Some(schema_adapter_factory);
+        self
+    }
+
+    /// Convenience: build an `Arc`d `ParquetExec` from this builder
+    pub fn build_arc(self) -> Arc<ParquetExec> {
+        Arc::new(self.build())
+    }
+
+    /// Build a [`ParquetExec`]
+    #[must_use]
+    pub fn build(self) -> ParquetExec {
+        let Self {
+            file_scan_config,
+            predicate,
+            metadata_size_hint,
+            table_parquet_options,
+            parquet_file_reader_factory,
+            schema_adapter_factory,
+        } = self;
+
+        let base_config = file_scan_config;
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: 
{:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, 
base_config.limit);
 
@@ -225,12 +358,12 @@ impl ParquetExec {
 
         let (projected_schema, projected_statistics, 
projected_output_ordering) =
             base_config.project();
-        let cache = Self::compute_properties(
+        let cache = ParquetExec::compute_properties(
             projected_schema,
             &projected_output_ordering,
             &base_config,
         );
-        Self {
+        ParquetExec {
             base_config,
             projected_statistics,
             metrics,
@@ -238,12 +371,44 @@ impl ParquetExec {
             pruning_predicate,
             page_pruning_predicate,
             metadata_size_hint,
-            parquet_file_reader_factory: None,
+            parquet_file_reader_factory,
             cache,
             table_parquet_options,
-            schema_adapter_factory: None,
+            schema_adapter_factory,
         }
     }
+}
+
+impl ParquetExec {
+    /// Create a new Parquet reader execution plan provided file list and 
schema.
+    #[deprecated(
+        since = "39.0.0",
+        note = "use `ParquetExec::builder` or `ParquetExecBuilder`"
+    )]
+    pub fn new(
+        base_config: FileScanConfig,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+        metadata_size_hint: Option<usize>,
+        table_parquet_options: TableParquetOptions,
+    ) -> Self {
+        let mut builder =
+            ParquetExecBuilder::new_with_options(base_config, 
table_parquet_options);
+        if let Some(predicate) = predicate {
+            builder = builder.with_predicate(predicate);
+        }
+        if let Some(metadata_size_hint) = metadata_size_hint {
+            builder = builder.with_metadata_size_hint(metadata_size_hint);
+        }
+        builder.build()
+    }
+
+    /// Return a [`ParquetExecBuilder`].
+    ///
+    /// See example on [`ParquetExec`] and [`ParquetExecBuilder`] for 
specifying
+    /// parquet table options.
+    pub fn builder(file_scan_config: FileScanConfig) -> ParquetExecBuilder {
+        ParquetExecBuilder::new(file_scan_config)
+    }
 
     /// [`FileScanConfig`] that controls this scan (such as which files to 
read)
     pub fn base_config(&self) -> &FileScanConfig {
@@ -267,13 +432,7 @@ impl ParquetExec {
 
     /// Optional user defined parquet file reader factory.
     ///
-    /// You can use [`ParquetFileReaderFactory`] to more precisely control how
-    /// data is read from parquet files (e.g. skip re-reading metadata, 
coalesce
-    /// I/O operations, etc).
-    ///
-    /// The default reader factory reads directly from an [`ObjectStore`]
-    /// instance using individual I/O operations for the footer and then  for
-    /// each page.
+    /// See documentation on 
[`ParquetExecBuilder::with_parquet_file_reader_factory`]
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -284,9 +443,7 @@ impl ParquetExec {
 
     /// Optional schema adapter factory.
     ///
-    /// `SchemaAdapterFactory` allows user to specify how fields from the 
parquet file get mapped to
-    ///  that of the table schema.  The default schema adapter uses arrow's 
cast library to map
-    ///  the parquet fields to the table schema.
+    /// See documentation on 
[`ParquetExecBuilder::with_schema_adapter_factory`]
     pub fn with_schema_adapter_factory(
         mut self,
         schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
@@ -1033,15 +1190,17 @@ mod tests {
             let predicate = predicate.map(|p| logical2physical(&p, 
&file_schema));
 
             // prepare the scan
-            let mut parquet_exec = ParquetExec::new(
+            let mut builder = ParquetExec::builder(
                 FileScanConfig::new(ObjectStoreUrl::local_filesystem(), 
file_schema)
                     .with_file_group(file_group)
                     .with_projection(projection),
-                predicate,
-                None,
-                Default::default(),
             );
 
+            if let Some(predicate) = predicate {
+                builder = builder.with_predicate(predicate);
+            }
+            let mut parquet_exec = builder.build();
+
             if pushdown_predicate {
                 parquet_exec = parquet_exec
                     .with_pushdown_filters(true)
@@ -1684,13 +1843,11 @@ mod tests {
             expected_row_num: Option<usize>,
             file_schema: SchemaRef,
         ) -> Result<()> {
-            let parquet_exec = ParquetExec::new(
+            let parquet_exec = ParquetExec::builder(
                 FileScanConfig::new(ObjectStoreUrl::local_filesystem(), 
file_schema)
                     .with_file_groups(file_groups),
-                None,
-                None,
-                Default::default(),
-            );
+            )
+            .build();
             assert_eq!(
                 parquet_exec
                     .properties()
@@ -1786,7 +1943,7 @@ mod tests {
             ),
         ]);
 
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(object_store_url, schema.clone())
                 .with_file(partitioned_file)
                 // file has 10 cols so index 12 should be month and 13 should 
be day
@@ -1803,10 +1960,8 @@ mod tests {
                         false,
                     ),
                 ]),
-            None,
-            None,
-            Default::default(),
-        );
+        )
+        .build();
         assert_eq!(
             parquet_exec.cache.output_partitioning().partition_count(),
             1
@@ -1861,13 +2016,11 @@ mod tests {
         };
 
         let file_schema = Arc::new(Schema::empty());
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), 
file_schema)
                 .with_file(partitioned_file),
-            None,
-            None,
-            Default::default(),
-        );
+        )
+        .build();
 
         let mut results = parquet_exec.execute(0, state.task_ctx())?;
         let batch = results.next().await.unwrap();
diff --git a/datafusion/core/src/datasource/schema_adapter.rs 
b/datafusion/core/src/datasource/schema_adapter.rs
index 1838a3354b..77fde608fd 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -258,13 +258,11 @@ mod tests {
         let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
 
         // prepare the scan
-        let parquet_exec = ParquetExec::new(
+        let parquet_exec = ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
                 .with_file(partitioned_file),
-            None,
-            None,
-            Default::default(),
         )
+        .build()
         .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
 
         let session_ctx = SessionContext::new();
diff --git 
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs 
b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index b93f4012b0..909c8acdb8 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -245,16 +245,14 @@ mod tests {
     }
 
     fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(
                 ObjectStoreUrl::parse("test:///").unwrap(),
                 schema.clone(),
             )
             .with_file(PartitionedFile::new("x".to_string(), 100)),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn partial_aggregate_exec(
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 9eb5aafd81..88fa3a978a 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1431,14 +1431,12 @@ pub(crate) mod tests {
     pub(crate) fn parquet_exec_with_sort(
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), 
schema())
                 .with_file(PartitionedFile::new("x".to_string(), 100))
                 .with_output_ordering(output_ordering),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn parquet_exec_multiple() -> Arc<ParquetExec> {
@@ -1449,17 +1447,15 @@ pub(crate) mod tests {
     fn parquet_exec_multiple_sorted(
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
+        ParquetExec::builder(
             FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), 
schema())
                 .with_file_groups(vec![
                     vec![PartitionedFile::new("x".to_string(), 100)],
                     vec![PartitionedFile::new("y".to_string(), 100)],
                 ])
                 .with_output_ordering(output_ordering),
-            None,
-            None,
-            Default::default(),
-        ))
+        )
+        .build_arc()
     }
 
     fn csv_exec() -> Arc<CsvExec> {
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs 
b/datafusion/core/src/physical_optimizer/test_utils.rs
index 4d926847e4..cfd0312f81 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -274,13 +274,11 @@ pub fn sort_preserving_merge_exec(
 
 /// Create a non sorted parquet exec
 pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-    Arc::new(ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), 
schema.clone())
             .with_file(PartitionedFile::new("x".to_string(), 100)),
-        None,
-        None,
-        Default::default(),
-    ))
+    )
+    .build_arc()
 }
 
 // Created a sorted parquet exec
@@ -290,14 +288,12 @@ pub fn parquet_exec_sorted(
 ) -> Arc<dyn ExecutionPlan> {
     let sort_exprs = sort_exprs.into_iter().collect();
 
-    Arc::new(ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), 
schema.clone())
             .with_file(PartitionedFile::new("x".to_string(), 100))
             .with_output_ordering(vec![sort_exprs]),
-        None,
-        None,
-        Default::default(),
-    ))
+    )
+    .build_arc()
 }
 
 pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn 
ExecutionPlan> {
diff --git a/datafusion/core/src/test_util/parquet.rs 
b/datafusion/core/src/test_util/parquet.rs
index ed539d29bd..9f06ad9308 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{Expr, SessionConfig, SessionContext};
 
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -163,22 +164,19 @@ impl TestParquetFile {
             let filter = simplifier.coerce(filter, &df_schema).unwrap();
             let physical_filter_expr =
                 create_physical_expr(&filter, &df_schema, 
&ExecutionProps::default())?;
-            let parquet_exec = Arc::new(ParquetExec::new(
-                scan_config,
-                Some(physical_filter_expr.clone()),
-                None,
-                parquet_options,
-            ));
+
+            let parquet_exec =
+                ParquetExecBuilder::new_with_options(scan_config, 
parquet_options)
+                    .with_predicate(physical_filter_expr.clone())
+                    .build_arc();
 
             let exec = Arc::new(FilterExec::try_new(physical_filter_expr, 
parquet_exec)?);
             Ok(exec)
         } else {
-            Ok(Arc::new(ParquetExec::new(
-                scan_config,
-                None,
-                None,
-                parquet_options,
-            )))
+            Ok(
+                ParquetExecBuilder::new_with_options(scan_config, 
parquet_options)
+                    .build_arc(),
+            )
         }
     }
 
diff --git a/datafusion/core/tests/parquet/custom_reader.rs 
b/datafusion/core/tests/parquet/custom_reader.rs
index 4f50c55c62..0e515fd464 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -75,17 +75,15 @@ async fn 
route_data_access_ops_to_parquet_file_reader_factory() {
         .collect();
 
     // prepare the scan
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(
             // just any url that doesn't point to in memory object store
             ObjectStoreUrl::local_filesystem(),
             file_schema,
         )
         .with_file_group(file_group),
-        None,
-        None,
-        Default::default(),
     )
+    .build()
     
.with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory(
         Arc::clone(&in_memory_object_store),
     )));
diff --git a/datafusion/core/tests/parquet/page_pruning.rs 
b/datafusion/core/tests/parquet/page_pruning.rs
index 2e9cda40c3..15efd4bcd9 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -70,13 +70,12 @@ async fn get_parquet_exec(state: &SessionState, filter: 
Expr) -> ParquetExec {
     let execution_props = ExecutionProps::new();
     let predicate = create_physical_expr(&filter, &df_schema, 
&execution_props).unwrap();
 
-    let parquet_exec = ParquetExec::new(
+    ParquetExec::builder(
         FileScanConfig::new(object_store_url, 
schema).with_file(partitioned_file),
-        Some(predicate),
-        None,
-        Default::default(),
-    );
-    parquet_exec.with_enable_page_index(true)
+    )
+    .with_predicate(predicate)
+    .build()
+    .with_enable_page_index(true)
 }
 
 #[tokio::test]
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs 
b/datafusion/core/tests/parquet/schema_coercion.rs
index ac51b4f712..af9411f40e 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -59,13 +59,11 @@ async fn multi_parquet_coercion() {
         Field::new("c2", DataType::Int32, true),
         Field::new("c3", DataType::Float64, true),
     ]));
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
             .with_file_group(file_group),
-        None,
-        None,
-        Default::default(),
-    );
+    )
+    .build();
 
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
@@ -115,14 +113,12 @@ async fn multi_parquet_coercion_projection() {
         Field::new("c2", DataType::Int32, true),
         Field::new("c3", DataType::Float64, true),
     ]));
-    let parquet_exec = ParquetExec::new(
+    let parquet_exec = ParquetExec::builder(
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
             .with_file_group(file_group)
             .with_projection(Some(vec![1, 0, 2])),
-        None,
-        None,
-        Default::default(),
-    );
+    )
+    .build();
 
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index a9965e1c81..550176a42e 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -224,12 +224,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                         )
                     })
                     .transpose()?;
-                Ok(Arc::new(ParquetExec::new(
-                    base_config,
-                    predicate,
-                    None,
-                    Default::default(),
-                )))
+                let mut builder = ParquetExec::builder(base_config);
+                if let Some(predicate) = predicate {
+                    builder = builder.with_predicate(predicate)
+                }
+                Ok(builder.build_arc())
             }
             PhysicalPlanType::AvroScan(scan) => {
                 Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 55b346a482..df1995f465 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -582,12 +582,11 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> 
Result<()> {
         Operator::Eq,
         lit("1"),
     ));
-    roundtrip_test(Arc::new(ParquetExec::new(
-        scan_config,
-        Some(predicate),
-        None,
-        Default::default(),
-    )))
+    roundtrip_test(
+        ParquetExec::builder(scan_config)
+            .with_predicate(predicate)
+            .build_arc(),
+    )
 }
 
 #[tokio::test]
@@ -613,12 +612,7 @@ async fn 
roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
         output_ordering: vec![],
     };
 
-    roundtrip_test(Arc::new(ParquetExec::new(
-        scan_config,
-        None,
-        None,
-        Default::default(),
-    )))
+    roundtrip_test(ParquetExec::builder(scan_config).build_arc())
 }
 
 #[test]
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs 
b/datafusion/substrait/src/physical_plan/consumer.rs
index 68f8b02b0f..39b38c94ec 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -121,12 +121,8 @@ pub async fn from_substrait_rel(
                         }
                     }
 
-                    Ok(Arc::new(ParquetExec::new(
-                        base_config,
-                        None,
-                        None,
-                        Default::default(),
-                    )) as Arc<dyn ExecutionPlan>)
+                    Ok(ParquetExec::builder(base_config).build_arc()
+                        as Arc<dyn ExecutionPlan>)
                 }
                 _ => not_impl_err!(
                     "Only LocalFile reads are supported when parsing physical"
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index aca0443194..4014670a7c 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -45,12 +45,8 @@ async fn parquet_exec() -> Result<()> {
             123,
         )],
     ]);
-    let parquet_exec: Arc<dyn ExecutionPlan> = Arc::new(ParquetExec::new(
-        scan_config,
-        None,
-        None,
-        Default::default(),
-    ));
+    let parquet_exec: Arc<dyn ExecutionPlan> =
+        ParquetExec::builder(scan_config).build_arc();
 
     let mut extension_info: (
         Vec<extensions::SimpleExtensionDeclaration>,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to