This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a2d4a3a25 refactor: add ExecutionPlan::file_scan_config to avoid downcasting (#7175)
6a2d4a3a25 is described below

commit 6a2d4a3a254c0495a398608d178496a191450750
Author: Martin Fischer <[email protected]>
AuthorDate: Thu Aug 3 14:49:02 2023 +0200

    refactor: add ExecutionPlan::file_scan_config to avoid downcasting (#7175)
    
    I want to feature-gate AvroExec in a follow-up commit, so
    that you cannot get bitten by AvroExec::execute returning
    a NotImplemented error if the "avro" feature isn't enabled.
    (Idiomatic Rust code should work if it compiles.)
    
    As preparation for that, this commit gets rid of a
    `plan_any.downcast_ref::<AvroExec>()` call that couldn't easily be put
    behind a `cfg(feature = "avro")` without complicating the control flow.
---
 .../core/src/datasource/physical_plan/avro.rs      |  4 ++++
 .../core/src/datasource/physical_plan/csv.rs       |  4 ++++
 .../core/src/datasource/physical_plan/json.rs      |  4 ++++
 .../core/src/datasource/physical_plan/mod.rs       | 22 ++++++----------------
 .../core/src/datasource/physical_plan/parquet.rs   |  4 ++++
 datafusion/core/src/physical_plan/mod.rs           |  6 ++++++
 6 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index ec2f4db263..a221300cc7 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -159,6 +159,10 @@ impl ExecutionPlan for AvroExec {
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
+
+    fn file_scan_config(&self) -> Option<&FileScanConfig> {
+        Some(&self.base_config)
+    }
 }
 
 #[cfg(feature = "avro")]
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 7b763855a6..7a9cd979e4 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -240,6 +240,10 @@ impl ExecutionPlan for CsvExec {
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
+
+    fn file_scan_config(&self) -> Option<&FileScanConfig> {
+        Some(&self.base_config)
+    }
 }
 
 /// A Config for [`CsvOpener`]
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 987bb83687..cbae85f6c8 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -168,6 +168,10 @@ impl ExecutionPlan for NdJsonExec {
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
+
+    fn file_scan_config(&self) -> Option<&FileScanConfig> {
+        Some(&self.base_config)
+    }
 }
 
 /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 35fb789aaf..ac660770b1 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -113,22 +113,12 @@ pub fn get_scan_files(
 ) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
     let mut collector: Vec<Vec<Vec<PartitionedFile>>> = vec![];
     plan.apply(&mut |plan| {
-        let plan_any = plan.as_any();
-        let file_groups =
-            if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() 
{
-                parquet_exec.base_config().file_groups.clone()
-            } else if let Some(avro_exec) = 
plan_any.downcast_ref::<AvroExec>() {
-                avro_exec.base_config().file_groups.clone()
-            } else if let Some(json_exec) = 
plan_any.downcast_ref::<NdJsonExec>() {
-                json_exec.base_config().file_groups.clone()
-            } else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
-                csv_exec.base_config().file_groups.clone()
-            } else {
-                return Ok(VisitRecursion::Continue);
-            };
-
-        collector.push(file_groups);
-        Ok(VisitRecursion::Skip)
+        if let Some(file_scan_config) = plan.file_scan_config() {
+            collector.push(file_scan_config.file_groups.clone());
+            Ok(VisitRecursion::Skip)
+        } else {
+            Ok(VisitRecursion::Continue)
+        }
     })?;
     Ok(collector)
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 03e5f9ebdc..941d7fd20c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -388,6 +388,10 @@ impl ExecutionPlan for ParquetExec {
     fn statistics(&self) -> Statistics {
         self.projected_statistics.clone()
     }
+
+    fn file_scan_config(&self) -> Option<&FileScanConfig> {
+        Some(&self.base_config)
+    }
 }
 
 /// Implements [`FileOpener`] for a parquet file
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 66ecaeb6be..66254ee6f5 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -22,6 +22,7 @@ use self::metrics::MetricsSet;
 use self::{
     coalesce_partitions::CoalescePartitionsExec, 
display::DisplayableExecutionPlan,
 };
+use crate::datasource::physical_plan::FileScanConfig;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use datafusion_common::Result;
 pub use datafusion_common::{ColumnStatistics, Statistics};
@@ -226,6 +227,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
 
     /// Returns the global output statistics for this `ExecutionPlan` node.
     fn statistics(&self) -> Statistics;
+
+    /// Returns the [`FileScanConfig`] in case this is a data source scanning 
execution plan or `None` otherwise.
+    fn file_scan_config(&self) -> Option<&FileScanConfig> {
+        None
+    }
 }
 
 /// Indicate whether a data exchange is needed for the input of `plan`, which 
will be very helpful

Reply via email to