This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6a2d4a3a25 refactor: add ExecutionPlan::file_scan_config to avoid downcasting (#7175)
6a2d4a3a25 is described below
commit 6a2d4a3a254c0495a398608d178496a191450750
Author: Martin Fischer <[email protected]>
AuthorDate: Thu Aug 3 14:49:02 2023 +0200
refactor: add ExecutionPlan::file_scan_config to avoid downcasting (#7175)
I want to feature-gate AvroExec in a follow-up commit, so
that you cannot get bitten by AvroExec::execute returning
a NotImplemented error if the "avro" feature isn't enabled
(idiomatic Rust code should work if it compiles).
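In preparation for that, this commit removes a
`plan_any.downcast_ref::<AvroExec>()` call that could not easily be put
behind a `cfg(feature = "avro")` without complicating the control flow.
It is replaced by a new `ExecutionPlan::file_scan_config` method, which
AvroExec, CsvExec, NdJsonExec and ParquetExec override to return their
`FileScanConfig`; all other plans return `None` by default.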
---
.../core/src/datasource/physical_plan/avro.rs | 4 ++++
.../core/src/datasource/physical_plan/csv.rs | 4 ++++
.../core/src/datasource/physical_plan/json.rs | 4 ++++
.../core/src/datasource/physical_plan/mod.rs | 22 ++++++----------------
.../core/src/datasource/physical_plan/parquet.rs | 4 ++++
datafusion/core/src/physical_plan/mod.rs | 6 ++++++
6 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index ec2f4db263..a221300cc7 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -159,6 +159,10 @@ impl ExecutionPlan for AvroExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
+
+ fn file_scan_config(&self) -> Option<&FileScanConfig> {
+ Some(&self.base_config)
+ }
}
#[cfg(feature = "avro")]
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 7b763855a6..7a9cd979e4 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -240,6 +240,10 @@ impl ExecutionPlan for CsvExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
+
+ fn file_scan_config(&self) -> Option<&FileScanConfig> {
+ Some(&self.base_config)
+ }
}
/// A Config for [`CsvOpener`]
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 987bb83687..cbae85f6c8 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -168,6 +168,10 @@ impl ExecutionPlan for NdJsonExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
+
+ fn file_scan_config(&self) -> Option<&FileScanConfig> {
+ Some(&self.base_config)
+ }
}
/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 35fb789aaf..ac660770b1 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -113,22 +113,12 @@ pub fn get_scan_files(
) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
let mut collector: Vec<Vec<Vec<PartitionedFile>>> = vec![];
plan.apply(&mut |plan| {
- let plan_any = plan.as_any();
- let file_groups =
- if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>()
{
- parquet_exec.base_config().file_groups.clone()
- } else if let Some(avro_exec) =
plan_any.downcast_ref::<AvroExec>() {
- avro_exec.base_config().file_groups.clone()
- } else if let Some(json_exec) =
plan_any.downcast_ref::<NdJsonExec>() {
- json_exec.base_config().file_groups.clone()
- } else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
- csv_exec.base_config().file_groups.clone()
- } else {
- return Ok(VisitRecursion::Continue);
- };
-
- collector.push(file_groups);
- Ok(VisitRecursion::Skip)
+ if let Some(file_scan_config) = plan.file_scan_config() {
+ collector.push(file_scan_config.file_groups.clone());
+ Ok(VisitRecursion::Skip)
+ } else {
+ Ok(VisitRecursion::Continue)
+ }
})?;
Ok(collector)
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 03e5f9ebdc..941d7fd20c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -388,6 +388,10 @@ impl ExecutionPlan for ParquetExec {
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}
+
+ fn file_scan_config(&self) -> Option<&FileScanConfig> {
+ Some(&self.base_config)
+ }
}
/// Implements [`FileOpener`] for a parquet file
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 66ecaeb6be..66254ee6f5 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -22,6 +22,7 @@ use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec,
display::DisplayableExecutionPlan,
};
+use crate::datasource::physical_plan::FileScanConfig;
use crate::physical_plan::expressions::PhysicalSortExpr;
use datafusion_common::Result;
pub use datafusion_common::{ColumnStatistics, Statistics};
@@ -226,6 +227,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// Returns the global output statistics for this `ExecutionPlan` node.
fn statistics(&self) -> Statistics;
+
+ /// Returns the [`FileScanConfig`] in case this is a data source scanning
execution plan or `None` otherwise.
+ fn file_scan_config(&self) -> Option<&FileScanConfig> {
+ None
+ }
}
/// Indicate whether a data exchange is needed for the input of `plan`, which
will be very helpful