This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 77352b2411 Add `ParquetExec::builder()`, deprecate `ParquetExec::new`
(#10636)
77352b2411 is described below
commit 77352b2411b5d9340374c30e21b861b0d0d46f82
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed May 29 16:46:18 2024 -0400
Add `ParquetExec::builder()`, deprecate `ParquetExec::new` (#10636)
* Add `ParquetExec::builder()`, deprecate `ParquetExec::new`
* Add a #[must_use]
---
.../core/src/datasource/file_format/parquet.rs | 22 +-
.../src/datasource/physical_plan/parquet/mod.rs | 239 +++++++++++++++++----
datafusion/core/src/datasource/schema_adapter.rs | 6 +-
.../combine_partial_final_agg.rs | 8 +-
.../src/physical_optimizer/enforce_distribution.rs | 16 +-
.../core/src/physical_optimizer/test_utils.rs | 16 +-
datafusion/core/src/test_util/parquet.rs | 22 +-
datafusion/core/tests/parquet/custom_reader.rs | 6 +-
datafusion/core/tests/parquet/page_pruning.rs | 11 +-
datafusion/core/tests/parquet/schema_coercion.rs | 16 +-
datafusion/proto/src/physical_plan/mod.rs | 11 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 18 +-
datafusion/substrait/src/physical_plan/consumer.rs | 8 +-
.../tests/cases/roundtrip_physical_plan.rs | 8 +-
14 files changed, 265 insertions(+), 142 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index e102cfc372..39e6900ed5 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -30,7 +30,7 @@ use crate::arrow::array::{
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig,
ParquetExec};
+use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
@@ -75,6 +75,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
@@ -253,17 +254,22 @@ impl FileFormat for ParquetFormat {
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let mut builder =
+ ParquetExecBuilder::new_with_options(conf, self.options.clone());
+
// If enable pruning then combine the filters to build the predicate.
// If disable pruning then set the predicate to None, thus readers
// will not prune data based on the statistics.
- let predicate = self.enable_pruning().then(||
filters.cloned()).flatten();
+ if self.enable_pruning() {
+ if let Some(predicate) = filters.cloned() {
+ builder = builder.with_predicate(predicate);
+ }
+ }
+ if let Some(metadata_size_hint) = self.metadata_size_hint() {
+ builder = builder.with_metadata_size_hint(metadata_size_hint);
+ }
- Ok(Arc::new(ParquetExec::new(
- conf,
- predicate,
- self.metadata_size_hint(),
- self.options.clone(),
- )))
+ Ok(builder.build_arc())
}
async fn create_writer_physical_plan(
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 6655125ea8..ac7c39bbdb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -104,6 +104,27 @@ pub use statistics::{RequestedStatistics,
StatisticsConverter};
/// `───────────────────'
///
/// ```
+///
+/// # Example: Create a `ParquetExec`
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::Schema;
+/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+/// # use datafusion::datasource::listing::PartitionedFile;
+/// # let file_schema = Arc::new(Schema::empty());
+/// # let object_store_url = ObjectStoreUrl::local_filesystem();
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # use datafusion_physical_expr::expressions::lit;
+/// # let predicate = lit(true);
+/// // Create a ParquetExec for reading `file1.parquet` with a file size of
100MB
+/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema)
+/// .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024));
+/// let exec = ParquetExec::builder(file_scan_config)
+/// // Provide a predicate for filtering row groups/pages
+/// .with_predicate(predicate)
+/// .build();
+/// ```
+///
/// # Features
///
/// Supports the following optimizations:
@@ -131,7 +152,7 @@ pub use statistics::{RequestedStatistics,
StatisticsConverter};
/// * metadata_size_hint: controls the number of bytes read from the end of the
/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
/// custom reader is used, it supplies the metadata directly and this parameter
-/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more
details.
+/// is ignored. See [`ParquetExecBuilder::with_metadata_size_hint`] for more
details.
///
/// # Execution Overview
///
@@ -141,9 +162,9 @@ pub use statistics::{RequestedStatistics,
StatisticsConverter};
/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to
open
/// the file.
///
-/// * Step 3: The `ParquetOpener` gets the file metadata via
-/// [`ParquetFileReaderFactory`] and applies any predicates
-/// and projections to determine what pages must be read.
+/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
+/// via [`ParquetFileReaderFactory`] and applies any predicates and projections
+/// to determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
@@ -154,6 +175,7 @@ pub use statistics::{RequestedStatistics,
StatisticsConverter};
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
+/// [`ParquetMetaData`]: parquet::file::metadata::ParquetMetaData
#[derive(Debug, Clone)]
pub struct ParquetExec {
/// Base configuration for this scan
@@ -179,14 +201,125 @@ pub struct ParquetExec {
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
-impl ParquetExec {
- /// Create a new Parquet reader execution plan provided file list and
schema.
- pub fn new(
- base_config: FileScanConfig,
- predicate: Option<Arc<dyn PhysicalExpr>>,
- metadata_size_hint: Option<usize>,
+/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
+///
+/// See example on [`ParquetExec`].
+pub struct ParquetExecBuilder {
+ file_scan_config: FileScanConfig,
+ predicate: Option<Arc<dyn PhysicalExpr>>,
+ metadata_size_hint: Option<usize>,
+ table_parquet_options: TableParquetOptions,
+ parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl ParquetExecBuilder {
+ /// Create a new builder to read the provided file scan configuration
+ pub fn new(file_scan_config: FileScanConfig) -> Self {
+ Self::new_with_options(file_scan_config,
TableParquetOptions::default())
+ }
+
+ /// Create a new builder to read the data specified in the file scan
+ /// configuration with the provided `TableParquetOptions`.
+ pub fn new_with_options(
+ file_scan_config: FileScanConfig,
table_parquet_options: TableParquetOptions,
) -> Self {
+ Self {
+ file_scan_config,
+ predicate: None,
+ metadata_size_hint: None,
+ table_parquet_options,
+ parquet_file_reader_factory: None,
+ schema_adapter_factory: None,
+ }
+ }
+
+ /// Set the predicate for the scan.
+ ///
+ /// The ParquetExec uses this predicate to filter row groups and data pages
+ /// using the Parquet statistics and bloom filters.
+ ///
+ /// If the predicate can not be used to prune the scan, it is ignored (no
+ /// error is raised).
+ pub fn with_predicate(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
+ self.predicate = Some(predicate);
+ self
+ }
+
+ /// Set the metadata size hint
+ ///
+ /// This value determines how many bytes at the end of the file the default
+ /// [`ParquetFileReaderFactory`] will request in the initial IO. If this is
+ /// too small, the ParquetExec will need to make additional IO requests to
+ /// read the footer.
+ pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) ->
Self {
+ self.metadata_size_hint = Some(metadata_size_hint);
+ self
+ }
+
+ /// Set the table parquet options that control how the ParquetExec reads.
+ ///
+ /// See also [`Self::new_with_options`]
+ pub fn with_table_parquet_options(
+ mut self,
+ table_parquet_options: TableParquetOptions,
+ ) -> Self {
+ self.table_parquet_options = table_parquet_options;
+ self
+ }
+
+ /// Set optional user defined parquet file reader factory.
+ ///
+ /// You can use [`ParquetFileReaderFactory`] to more precisely control how
+ /// data is read from parquet files (e.g. skip re-reading metadata,
coalesce
+ /// I/O operations, etc).
+ ///
+ /// The default reader factory reads directly from an [`ObjectStore`]
+ /// instance using individual I/O operations for the footer and each page.
+ ///
+ /// If a custom `ParquetFileReaderFactory` is provided, then data access
+ /// operations will be routed to this factory instead of `ObjectStore`.
+ pub fn with_parquet_file_reader_factory(
+ mut self,
+ parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+ ) -> Self {
+ self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
+ self
+ }
+
+ /// Set optional schema adapter factory.
+ ///
+ /// [`SchemaAdapterFactory`] allows user to specify how fields from the
+ /// parquet file get mapped to that of the table schema. The default
schema
+ /// adapter uses arrow's cast library to map the parquet fields to the
table
+ /// schema.
+ pub fn with_schema_adapter_factory(
+ mut self,
+ schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+ ) -> Self {
+ self.schema_adapter_factory = Some(schema_adapter_factory);
+ self
+ }
+
+ /// Convenience: build an `Arc`d `ParquetExec` from this builder
+ pub fn build_arc(self) -> Arc<ParquetExec> {
+ Arc::new(self.build())
+ }
+
+ /// Build a [`ParquetExec`]
+ #[must_use]
+ pub fn build(self) -> ParquetExec {
+ let Self {
+ file_scan_config,
+ predicate,
+ metadata_size_hint,
+ table_parquet_options,
+ parquet_file_reader_factory,
+ schema_adapter_factory,
+ } = self;
+
+ let base_config = file_scan_config;
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate:
{:?}, limit: {:?}",
base_config.file_groups, base_config.projection, predicate,
base_config.limit);
@@ -225,12 +358,12 @@ impl ParquetExec {
let (projected_schema, projected_statistics,
projected_output_ordering) =
base_config.project();
- let cache = Self::compute_properties(
+ let cache = ParquetExec::compute_properties(
projected_schema,
&projected_output_ordering,
&base_config,
);
- Self {
+ ParquetExec {
base_config,
projected_statistics,
metrics,
@@ -238,12 +371,44 @@ impl ParquetExec {
pruning_predicate,
page_pruning_predicate,
metadata_size_hint,
- parquet_file_reader_factory: None,
+ parquet_file_reader_factory,
cache,
table_parquet_options,
- schema_adapter_factory: None,
+ schema_adapter_factory,
}
}
+}
+
+impl ParquetExec {
+ /// Create a new Parquet reader execution plan provided file list and
schema.
+ #[deprecated(
+ since = "39.0.0",
+ note = "use `ParquetExec::builder` or `ParquetExecBuilder`"
+ )]
+ pub fn new(
+ base_config: FileScanConfig,
+ predicate: Option<Arc<dyn PhysicalExpr>>,
+ metadata_size_hint: Option<usize>,
+ table_parquet_options: TableParquetOptions,
+ ) -> Self {
+ let mut builder =
+ ParquetExecBuilder::new_with_options(base_config,
table_parquet_options);
+ if let Some(predicate) = predicate {
+ builder = builder.with_predicate(predicate);
+ }
+ if let Some(metadata_size_hint) = metadata_size_hint {
+ builder = builder.with_metadata_size_hint(metadata_size_hint);
+ }
+ builder.build()
+ }
+
+ /// Return a [`ParquetExecBuilder`].
+ ///
+ /// See example on [`ParquetExec`] and [`ParquetExecBuilder`] for
specifying
+ /// parquet table options.
+ pub fn builder(file_scan_config: FileScanConfig) -> ParquetExecBuilder {
+ ParquetExecBuilder::new(file_scan_config)
+ }
/// [`FileScanConfig`] that controls this scan (such as which files to
read)
pub fn base_config(&self) -> &FileScanConfig {
@@ -267,13 +432,7 @@ impl ParquetExec {
/// Optional user defined parquet file reader factory.
///
- /// You can use [`ParquetFileReaderFactory`] to more precisely control how
- /// data is read from parquet files (e.g. skip re-reading metadata,
coalesce
- /// I/O operations, etc).
- ///
- /// The default reader factory reads directly from an [`ObjectStore`]
- /// instance using individual I/O operations for the footer and then for
- /// each page.
+ /// See documentation on
[`ParquetExecBuilder::with_parquet_file_reader_factory`]
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -284,9 +443,7 @@ impl ParquetExec {
/// Optional schema adapter factory.
///
- /// `SchemaAdapterFactory` allows user to specify how fields from the
parquet file get mapped to
- /// that of the table schema. The default schema adapter uses arrow's
cast library to map
- /// the parquet fields to the table schema.
+ /// See documentation on
[`ParquetExecBuilder::with_schema_adapter_factory`]
pub fn with_schema_adapter_factory(
mut self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
@@ -1033,15 +1190,17 @@ mod tests {
let predicate = predicate.map(|p| logical2physical(&p,
&file_schema));
// prepare the scan
- let mut parquet_exec = ParquetExec::new(
+ let mut builder = ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(),
file_schema)
.with_file_group(file_group)
.with_projection(projection),
- predicate,
- None,
- Default::default(),
);
+ if let Some(predicate) = predicate {
+ builder = builder.with_predicate(predicate);
+ }
+ let mut parquet_exec = builder.build();
+
if pushdown_predicate {
parquet_exec = parquet_exec
.with_pushdown_filters(true)
@@ -1684,13 +1843,11 @@ mod tests {
expected_row_num: Option<usize>,
file_schema: SchemaRef,
) -> Result<()> {
- let parquet_exec = ParquetExec::new(
+ let parquet_exec = ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(),
file_schema)
.with_file_groups(file_groups),
- None,
- None,
- Default::default(),
- );
+ )
+ .build();
assert_eq!(
parquet_exec
.properties()
@@ -1786,7 +1943,7 @@ mod tests {
),
]);
- let parquet_exec = ParquetExec::new(
+ let parquet_exec = ParquetExec::builder(
FileScanConfig::new(object_store_url, schema.clone())
.with_file(partitioned_file)
// file has 10 cols so index 12 should be month and 13 should
be day
@@ -1803,10 +1960,8 @@ mod tests {
false,
),
]),
- None,
- None,
- Default::default(),
- );
+ )
+ .build();
assert_eq!(
parquet_exec.cache.output_partitioning().partition_count(),
1
@@ -1861,13 +2016,11 @@ mod tests {
};
let file_schema = Arc::new(Schema::empty());
- let parquet_exec = ParquetExec::new(
+ let parquet_exec = ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(),
file_schema)
.with_file(partitioned_file),
- None,
- None,
- Default::default(),
- );
+ )
+ .build();
let mut results = parquet_exec.execute(0, state.task_ctx())?;
let batch = results.next().await.unwrap();
diff --git a/datafusion/core/src/datasource/schema_adapter.rs
b/datafusion/core/src/datasource/schema_adapter.rs
index 1838a3354b..77fde608fd 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -258,13 +258,11 @@ mod tests {
let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
// prepare the scan
- let parquet_exec = ParquetExec::new(
+ let parquet_exec = ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
.with_file(partitioned_file),
- None,
- None,
- Default::default(),
)
+ .build()
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
let session_ctx = SessionContext::new();
diff --git
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index b93f4012b0..909c8acdb8 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -245,16 +245,14 @@ mod tests {
}
fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
- Arc::new(ParquetExec::new(
+ ParquetExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("x".to_string(), 100)),
- None,
- None,
- Default::default(),
- ))
+ )
+ .build_arc()
}
fn partial_aggregate_exec(
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 9eb5aafd81..88fa3a978a 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1431,14 +1431,12 @@ pub(crate) mod tests {
pub(crate) fn parquet_exec_with_sort(
output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
- Arc::new(ParquetExec::new(
+ ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema())
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_output_ordering(output_ordering),
- None,
- None,
- Default::default(),
- ))
+ )
+ .build_arc()
}
fn parquet_exec_multiple() -> Arc<ParquetExec> {
@@ -1449,17 +1447,15 @@ pub(crate) mod tests {
fn parquet_exec_multiple_sorted(
output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
- Arc::new(ParquetExec::new(
+ ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema())
.with_file_groups(vec![
vec![PartitionedFile::new("x".to_string(), 100)],
vec![PartitionedFile::new("y".to_string(), 100)],
])
.with_output_ordering(output_ordering),
- None,
- None,
- Default::default(),
- ))
+ )
+ .build_arc()
}
fn csv_exec() -> Arc<CsvExec> {
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs
b/datafusion/core/src/physical_optimizer/test_utils.rs
index 4d926847e4..cfd0312f81 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -274,13 +274,11 @@ pub fn sort_preserving_merge_exec(
/// Create a non sorted parquet exec
pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
- Arc::new(ParquetExec::new(
+ ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone())
.with_file(PartitionedFile::new("x".to_string(), 100)),
- None,
- None,
- Default::default(),
- ))
+ )
+ .build_arc()
}
// Created a sorted parquet exec
@@ -290,14 +288,12 @@ pub fn parquet_exec_sorted(
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
- Arc::new(ParquetExec::new(
+ ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone())
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_output_ordering(vec![sort_exprs]),
- None,
- None,
- Default::default(),
- ))
+ )
+ .build_arc()
}
pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn
ExecutionPlan> {
diff --git a/datafusion/core/src/test_util/parquet.rs
b/datafusion/core/src/test_util/parquet.rs
index ed539d29bd..9f06ad9308 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{Expr, SessionConfig, SessionContext};
+use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
@@ -163,22 +164,19 @@ impl TestParquetFile {
let filter = simplifier.coerce(filter, &df_schema).unwrap();
let physical_filter_expr =
create_physical_expr(&filter, &df_schema,
&ExecutionProps::default())?;
- let parquet_exec = Arc::new(ParquetExec::new(
- scan_config,
- Some(physical_filter_expr.clone()),
- None,
- parquet_options,
- ));
+
+ let parquet_exec =
+ ParquetExecBuilder::new_with_options(scan_config,
parquet_options)
+ .with_predicate(physical_filter_expr.clone())
+ .build_arc();
let exec = Arc::new(FilterExec::try_new(physical_filter_expr,
parquet_exec)?);
Ok(exec)
} else {
- Ok(Arc::new(ParquetExec::new(
- scan_config,
- None,
- None,
- parquet_options,
- )))
+ Ok(
+ ParquetExecBuilder::new_with_options(scan_config,
parquet_options)
+ .build_arc(),
+ )
}
}
diff --git a/datafusion/core/tests/parquet/custom_reader.rs
b/datafusion/core/tests/parquet/custom_reader.rs
index 4f50c55c62..0e515fd464 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -75,17 +75,15 @@ async fn
route_data_access_ops_to_parquet_file_reader_factory() {
.collect();
// prepare the scan
- let parquet_exec = ParquetExec::new(
+ let parquet_exec = ParquetExec::builder(
FileScanConfig::new(
// just any url that doesn't point to in memory object store
ObjectStoreUrl::local_filesystem(),
file_schema,
)
.with_file_group(file_group),
- None,
- None,
- Default::default(),
)
+ .build()
.with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory(
Arc::clone(&in_memory_object_store),
)));
diff --git a/datafusion/core/tests/parquet/page_pruning.rs
b/datafusion/core/tests/parquet/page_pruning.rs
index 2e9cda40c3..15efd4bcd9 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -70,13 +70,12 @@ async fn get_parquet_exec(state: &SessionState, filter:
Expr) -> ParquetExec {
let execution_props = ExecutionProps::new();
let predicate = create_physical_expr(&filter, &df_schema,
&execution_props).unwrap();
- let parquet_exec = ParquetExec::new(
+ ParquetExec::builder(
FileScanConfig::new(object_store_url,
schema).with_file(partitioned_file),
- Some(predicate),
- None,
- Default::default(),
- );
- parquet_exec.with_enable_page_index(true)
+ )
+ .with_predicate(predicate)
+ .build()
+ .with_enable_page_index(true)
}
#[tokio::test]
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs
b/datafusion/core/tests/parquet/schema_coercion.rs
index ac51b4f712..af9411f40e 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -59,13 +59,11 @@ async fn multi_parquet_coercion() {
Field::new("c2", DataType::Int32, true),
Field::new("c3", DataType::Float64, true),
]));
- let parquet_exec = ParquetExec::new(
+ let parquet_exec = ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file_group(file_group),
- None,
- None,
- Default::default(),
- );
+ )
+ .build();
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
@@ -115,14 +113,12 @@ async fn multi_parquet_coercion_projection() {
Field::new("c2", DataType::Int32, true),
Field::new("c3", DataType::Float64, true),
]));
- let parquet_exec = ParquetExec::new(
+ let parquet_exec = ParquetExec::builder(
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
.with_file_group(file_group)
.with_projection(Some(vec![1, 0, 2])),
- None,
- None,
- Default::default(),
- );
+ )
+ .build();
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index a9965e1c81..550176a42e 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -224,12 +224,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
)
})
.transpose()?;
- Ok(Arc::new(ParquetExec::new(
- base_config,
- predicate,
- None,
- Default::default(),
- )))
+ let mut builder = ParquetExec::builder(base_config);
+ if let Some(predicate) = predicate {
+ builder = builder.with_predicate(predicate)
+ }
+ Ok(builder.build_arc())
}
PhysicalPlanType::AvroScan(scan) => {
Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 55b346a482..df1995f465 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -582,12 +582,11 @@ fn roundtrip_parquet_exec_with_pruning_predicate() ->
Result<()> {
Operator::Eq,
lit("1"),
));
- roundtrip_test(Arc::new(ParquetExec::new(
- scan_config,
- Some(predicate),
- None,
- Default::default(),
- )))
+ roundtrip_test(
+ ParquetExec::builder(scan_config)
+ .with_predicate(predicate)
+ .build_arc(),
+ )
}
#[tokio::test]
@@ -613,12 +612,7 @@ async fn
roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
output_ordering: vec![],
};
- roundtrip_test(Arc::new(ParquetExec::new(
- scan_config,
- None,
- None,
- Default::default(),
- )))
+ roundtrip_test(ParquetExec::builder(scan_config).build_arc())
}
#[test]
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs
b/datafusion/substrait/src/physical_plan/consumer.rs
index 68f8b02b0f..39b38c94ec 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -121,12 +121,8 @@ pub async fn from_substrait_rel(
}
}
- Ok(Arc::new(ParquetExec::new(
- base_config,
- None,
- None,
- Default::default(),
- )) as Arc<dyn ExecutionPlan>)
+ Ok(ParquetExec::builder(base_config).build_arc()
+ as Arc<dyn ExecutionPlan>)
}
_ => not_impl_err!(
"Only LocalFile reads are supported when parsing physical"
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index aca0443194..4014670a7c 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -45,12 +45,8 @@ async fn parquet_exec() -> Result<()> {
123,
)],
]);
- let parquet_exec: Arc<dyn ExecutionPlan> = Arc::new(ParquetExec::new(
- scan_config,
- None,
- None,
- Default::default(),
- ));
+ let parquet_exec: Arc<dyn ExecutionPlan> =
+ ParquetExec::builder(scan_config).build_arc();
let mut extension_info: (
Vec<extensions::SimpleExtensionDeclaration>,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]