This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9df957b63e API from `ParquetExec` to `ParquetExecBuilder` (#12799)
9df957b63e is described below
commit 9df957b63ea4c9fb787c6f9a3c15d1dcd394ac72
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Oct 8 16:58:08 2024 -0400
API from `ParquetExec` to `ParquetExecBuilder` (#12799)
* API to go from `ParquetExec` to `ParquetExecBuilder`
* fix potential regression
* Apply suggestions from code review
Co-authored-by: Nga Tran <[email protected]>
* add note about fields being re-created
---------
Co-authored-by: Nga Tran <[email protected]>
---
.../src/datasource/physical_plan/parquet/mod.rs | 92 +++++++++++++++++++++-
1 file changed, 89 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 6afb66cc7c..743dd58969 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -166,6 +166,33 @@ pub use writer::plan_to_parquet;
/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
///
+/// # Example: rewriting `ParquetExec`
+///
+/// You can modify a `ParquetExec` using [`ParquetExecBuilder`], for example
+/// to change files or add a predicate.
+///
+/// ```no_run
+/// # use std::sync::Arc;
+/// # use arrow::datatypes::Schema;
+/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+/// # use datafusion::datasource::listing::PartitionedFile;
+/// # fn parquet_exec() -> ParquetExec { unimplemented!() }
+/// // Split a single ParquetExec into multiple ParquetExecs, one for each file
+/// let exec = parquet_exec();
+/// let existing_file_groups = &exec.base_config().file_groups;
+/// let new_execs = existing_file_groups
+/// .iter()
+/// .map(|file_group| {
+/// // create a new exec by copying the existing exec into a builder
+/// let new_exec = exec.clone()
+/// .into_builder()
+/// .with_file_groups(vec![file_group.clone()])
+/// .build();
+/// new_exec
+/// })
+/// .collect::<Vec<_>>();
+/// ```
+///
/// # Implementing External Indexes
///
/// It is possible to restrict the row groups and selections within those row
@@ -257,6 +284,12 @@ pub struct ParquetExec {
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
+impl From<ParquetExec> for ParquetExecBuilder {
+ fn from(exec: ParquetExec) -> Self {
+ exec.into_builder()
+ }
+}
+
/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
///
/// See example on [`ParquetExec`].
@@ -291,6 +324,12 @@ impl ParquetExecBuilder {
}
}
+ /// Update the list of files groups to read
+ pub fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>)
-> Self {
+ self.file_scan_config.file_groups = file_groups;
+ self
+ }
+
/// Set the filter predicate when reading.
///
/// See the "Predicate Pushdown" section of the [`ParquetExec`] documentation
@@ -459,6 +498,34 @@ impl ParquetExec {
ParquetExecBuilder::new(file_scan_config)
}
+ /// Convert this `ParquetExec` into a builder for modification
+ pub fn into_builder(self) -> ParquetExecBuilder {
+ // list out fields so it is clear what is being dropped
+ // (note the fields which are dropped are re-created as part of calling
+ // `build` on the builder)
+ let Self {
+ base_config,
+ projected_statistics: _,
+ metrics: _,
+ predicate,
+ pruning_predicate: _,
+ page_pruning_predicate: _,
+ metadata_size_hint,
+ parquet_file_reader_factory,
+ cache: _,
+ table_parquet_options,
+ schema_adapter_factory,
+ } = self;
+ ParquetExecBuilder {
+ file_scan_config: base_config,
+ predicate,
+ metadata_size_hint,
+ table_parquet_options,
+ parquet_file_reader_factory,
+ schema_adapter_factory,
+ }
+ }
+
/// [`FileScanConfig`] that controls this scan (such as which files to read)
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
@@ -479,9 +546,15 @@ impl ParquetExec {
self.pruning_predicate.as_ref()
}
+ /// return the optional file reader factory
+ pub fn parquet_file_reader_factory(
+ &self,
+ ) -> Option<&Arc<dyn ParquetFileReaderFactory>> {
+ self.parquet_file_reader_factory.as_ref()
+ }
+
/// Optional user defined parquet file reader factory.
///
- /// See documentation on [`ParquetExecBuilder::with_parquet_file_reader_factory`]
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -490,6 +563,11 @@ impl ParquetExec {
self
}
+ /// return the optional schema adapter factory
+ pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
+ self.schema_adapter_factory.as_ref()
+ }
+
/// Optional schema adapter factory.
///
/// See documentation on [`ParquetExecBuilder::with_schema_adapter_factory`]
@@ -586,7 +664,14 @@ impl ParquetExec {
)
}
- fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
+ /// Updates the file groups to read and recalculates the output partitioning
+ ///
+ /// Note this function does not update statistics or other properties
+ /// that depend on the file groups.
+ fn with_file_groups_and_update_partitioning(
+ mut self,
+ file_groups: Vec<Vec<PartitionedFile>>,
+ ) -> Self {
self.base_config.file_groups = file_groups;
// Changing file groups may invalidate output partitioning. Update it also
let output_partitioning = Self::output_partitioning_helper(&self.base_config);
@@ -679,7 +764,8 @@ impl ExecutionPlan for ParquetExec {
let mut new_plan = self.clone();
if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
- new_plan = new_plan.with_file_groups(repartitioned_file_groups);
+ new_plan = new_plan
+ .with_file_groups_and_update_partitioning(repartitioned_file_groups);
}
Ok(Some(Arc::new(new_plan)))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]