This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bfede0e1 Add a utility function to get all of the PartitionedFile for 
an ExecutionPlan (#5572)
6bfede0e1 is described below

commit 6bfede0e1e1a09bd06ea0166e01c0ad4834a1721
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Mar 15 14:26:07 2023 +0800

    Add a utility function to get all of the PartitionedFile for an 
ExecutionPlan (#5572)
    
    * Add a utility function to collect all of the PartitionedFile for an 
ExecutionPlan
    
    * Reorganize the tree node related files
    
    * Move TreeNodeRewritable for ExecutionPlan to the tree_node
    
    * Change self to &mut self for TreeNodeVisitor
    
    * Refine the usage of TreeNodeRewriter
    
    ---------
    
    Co-authored-by: yangzhong <[email protected]>
---
 .../core/src/datasource/file_format/parquet.rs     |  20 +++
 .../src/physical_optimizer/coalesce_batches.rs     |   2 +-
 .../src/physical_optimizer/dist_enforcement.rs     |   2 +-
 .../physical_optimizer/global_sort_selection.rs    |   2 +-
 .../core/src/physical_optimizer/join_selection.rs  |   2 +-
 .../src/physical_optimizer/pipeline_checker.rs     |   2 +-
 .../core/src/physical_optimizer/pipeline_fixer.rs  |   2 +-
 datafusion/core/src/physical_optimizer/pruning.rs  |  25 +--
 .../src/physical_optimizer/sort_enforcement.rs     |   2 +-
 .../core/src/physical_plan/file_format/json.rs     |   5 +
 .../core/src/physical_plan/file_format/mod.rs      |  48 ++++++
 .../file_format/parquet/row_filter.rs              |   3 +
 datafusion/core/src/physical_plan/mod.rs           |   2 +-
 .../physical_plan/{rewrite.rs => tree_node/mod.rs} | 182 +++++++++++++--------
 .../core/src/physical_plan/tree_node/rewritable.rs |  39 +++++
 .../core/src/physical_plan/tree_node/visitable.rs  |  28 ++++
 datafusion/physical-expr/src/utils.rs              |  29 +---
 17 files changed, 278 insertions(+), 117 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 53e94167d..ba18e9f62 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -620,6 +620,7 @@ mod tests {
     use super::*;
 
     use crate::datasource::file_format::parquet::test_util::store_parquet;
+    use crate::physical_plan::file_format::get_scan_files;
     use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
@@ -1215,6 +1216,25 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_get_scan_files() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let projection = Some(vec![9]);
+        let exec = get_exec(&state, "alltypes_plain.parquet", projection, 
None).await?;
+        let scan_files = get_scan_files(exec)?;
+        assert_eq!(scan_files.len(), 1);
+        assert_eq!(scan_files[0].len(), 1);
+        assert_eq!(scan_files[0][0].len(), 1);
+        assert!(scan_files[0][0][0]
+            .object_meta
+            .location
+            .to_string()
+            .contains("alltypes_plain.parquet"));
+
+        Ok(())
+    }
+
     fn check_page_index_validation(
         page_index: Option<&ParquetColumnIndex>,
         offset_index: Option<&ParquetOffsetIndex>,
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs 
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index a1566c37c..9fd29cf89 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -24,7 +24,7 @@ use crate::{
     physical_optimizer::PhysicalOptimizerRule,
     physical_plan::{
         coalesce_batches::CoalesceBatchesExec, filter::FilterExec, 
joins::HashJoinExec,
-        repartition::RepartitionExec, rewrite::TreeNodeRewritable, 
Partitioning,
+        repartition::RepartitionExec, tree_node::TreeNodeRewritable, 
Partitioning,
     },
 };
 use std::sync::Arc;
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs 
b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 919273af7..95d4427e6 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -28,8 +28,8 @@ use crate::physical_plan::joins::{
 };
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, 
ExecutionPlan};
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs 
b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index 81b4b59e3..647558fbf 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -22,9 +22,9 @@ use std::sync::Arc;
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
 use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::ExecutionPlan;
 
 /// Currently for a sort operator, if
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs 
b/datafusion/core/src/physical_optimizer/join_selection.rs
index d9787881f..2308b2c85 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -32,7 +32,7 @@ use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
 
 use super::optimizer::PhysicalOptimizerRule;
 use crate::error::Result;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 
 /// For hash join with the partition mode [PartitionMode::Auto], JoinSelection 
rule will make
 /// a cost based decision to select which PartitionMode 
mode(Partitioned/CollectLeft) is optimal
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs 
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 8a6b0e003..f097196cd 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -22,7 +22,7 @@
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 use std::sync::Arc;
 
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs 
b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
index 7e85f0c0d..7532914c1 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -34,7 +34,7 @@ use crate::physical_plan::joins::{
     convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode,
     SymmetricHashJoinExec,
 };
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::ExecutionPlan;
 use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::JoinType;
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs 
b/datafusion/core/src/physical_optimizer/pruning.rs
index 80b72e68f..9185bf04d 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -46,7 +46,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_common::{downcast_value, ScalarValue};
-use datafusion_physical_expr::rewrite::{TreeNodeRewritable, TreeNodeRewriter};
+use datafusion_physical_expr::rewrite::TreeNodeRewritable;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
 use log::trace;
@@ -643,28 +643,15 @@ fn rewrite_column_expr(
     column_old: &phys_expr::Column,
     column_new: &phys_expr::Column,
 ) -> Result<Arc<dyn PhysicalExpr>> {
-    let mut rewriter = RewriteColumnExpr {
-        column_old,
-        column_new,
-    };
-    e.transform_using(&mut rewriter)
-}
-
-struct RewriteColumnExpr<'a> {
-    column_old: &'a phys_expr::Column,
-    column_new: &'a phys_expr::Column,
-}
-
-impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for RewriteColumnExpr<'a> {
-    fn mutate(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn 
PhysicalExpr>> {
+    e.transform(&|expr| {
         if let Some(column) = 
expr.as_any().downcast_ref::<phys_expr::Column>() {
-            if column == self.column_old {
-                return Ok(Arc::new(self.column_new.clone()));
+            if column == column_old {
+                return Ok(Some(Arc::new(column_new.clone())));
             }
         }
 
-        Ok(expr)
-    }
+        Ok(None)
+    })
 }
 
 fn reverse_operator(op: Operator) -> Result<Operator> {
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 70880b750..261c19600 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -39,9 +39,9 @@ use crate::physical_optimizer::utils::add_sort_above;
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
 use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
 use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, 
ExecutionPlan};
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs 
b/datafusion/core/src/physical_plan/file_format/json.rs
index 6f2608012..146227335 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -73,6 +73,11 @@ impl NdJsonExec {
             file_compression_type,
         }
     }
+
+    /// Ref to the base configs
+    pub fn base_config(&self) -> &FileScanConfig {
+        &self.base_config
+    }
 }
 
 impl ExecutionPlan for NdJsonExec {
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs 
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 97b091ced..d7616a3c2 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -45,6 +45,10 @@ use crate::datasource::{
     listing::{FileRange, PartitionedFile},
     object_store::ObjectStoreUrl,
 };
+use crate::physical_plan::tree_node::{
+    TreeNodeVisitable, TreeNodeVisitor, VisitRecursion,
+};
+use crate::physical_plan::ExecutionPlan;
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
@@ -68,6 +72,50 @@ pub fn partition_type_wrap(val_type: DataType) -> DataType {
     DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
 }
 
+/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
+pub fn get_scan_files(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
+    let mut collector = FileScanCollector::new();
+    plan.accept(&mut collector)?;
+    Ok(collector.file_groups)
+}
+
+struct FileScanCollector {
+    file_groups: Vec<Vec<Vec<PartitionedFile>>>,
+}
+
+impl FileScanCollector {
+    fn new() -> Self {
+        Self {
+            file_groups: vec![],
+        }
+    }
+}
+
+impl TreeNodeVisitor for FileScanCollector {
+    type N = Arc<dyn ExecutionPlan>;
+
+    fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
+        let plan_any = node.as_any();
+        let file_groups =
+            if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() 
{
+                parquet_exec.base_config().file_groups.clone()
+            } else if let Some(avro_exec) = 
plan_any.downcast_ref::<AvroExec>() {
+                avro_exec.base_config().file_groups.clone()
+            } else if let Some(json_exec) = 
plan_any.downcast_ref::<NdJsonExec>() {
+                json_exec.base_config().file_groups.clone()
+            } else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
+                csv_exec.base_config().file_groups.clone()
+            } else {
+                return Ok(VisitRecursion::Continue);
+            };
+
+        self.file_groups.push(file_groups);
+        Ok(VisitRecursion::Stop)
+    }
+}
+
 /// The base configurations to provide when creating a physical plan for
 /// any given file format.
 #[derive(Debug, Clone)]
diff --git 
a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs 
b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index e1feafec1..54478edc7 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -219,13 +219,16 @@ impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for 
FilterCandidateBuilder<'a>
 
                 if 
DataType::is_nested(self.file_schema.field(idx).data_type()) {
                     self.non_primitive_columns = true;
+                    return Ok(RewriteRecursion::Stop);
                 }
             } else if self.table_schema.index_of(column.name()).is_err() {
                 // If the column does not exist in the (un-projected) table 
schema then
                 // it must be a projected column.
                 self.projected_columns = true;
+                return Ok(RewriteRecursion::Stop);
             }
         }
+
         Ok(RewriteRecursion::Continue)
     }
 
diff --git a/datafusion/core/src/physical_plan/mod.rs 
b/datafusion/core/src/physical_plan/mod.rs
index dbd1024ae..9e0e03a77 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -653,10 +653,10 @@ pub mod metrics;
 pub mod planner;
 pub mod projection;
 pub mod repartition;
-pub mod rewrite;
 pub mod sorts;
 pub mod stream;
 pub mod streaming;
+pub mod tree_node;
 pub mod udaf;
 pub mod union;
 pub mod unnest;
diff --git a/datafusion/core/src/physical_plan/rewrite.rs 
b/datafusion/core/src/physical_plan/tree_node/mod.rs
similarity index 66%
rename from datafusion/core/src/physical_plan/rewrite.rs
rename to datafusion/core/src/physical_plan/tree_node/mod.rs
index 2972b546b..327d938d4 100644
--- a/datafusion/core/src/physical_plan/rewrite.rs
+++ b/datafusion/core/src/physical_plan/tree_node/mod.rs
@@ -15,16 +15,119 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Trait to make Executionplan rewritable
+//! This module provides common traits for visiting or rewriting tree nodes 
easily.
+
+pub mod rewritable;
+pub mod visitable;
 
-use crate::physical_plan::with_new_children_if_necessary;
-use crate::physical_plan::ExecutionPlan;
 use datafusion_common::Result;
 
-use std::sync::Arc;
+/// Implements the [visitor
+/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for recursively 
walking [`TreeNodeVisitable`]s.
+///
+/// [`TreeNodeVisitor`] allows keeping the algorithms
+/// separate from the code to traverse the structure of the `TreeNodeVisitable`
+/// tree and makes it easier to add new types of tree node and
+/// algorithms.
+///
+/// When passed to [`TreeNodeVisitable::accept`], [`TreeNodeVisitor::pre_visit`]
+/// and [`TreeNodeVisitor::post_visit`] are invoked recursively
+/// on a node tree.
+///
+/// If an [`Err`] result is returned, recursion is stopped
+/// immediately.
+///
+/// If [`VisitRecursion::Stop`] is returned on a call to `pre_visit`, no
+/// children of that tree node are visited, nor is `post_visit`
+/// called on that tree node.
+pub trait TreeNodeVisitor: Sized {
+    /// The node type which is visitable.
+    type N: TreeNodeVisitable;
+
+    /// Invoked before any children of `node` are visited.
+    fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion>;
+
+    /// Invoked after all children of `node` are visited. Default
+    /// implementation does nothing.
+    fn post_visit(&mut self, _node: &Self::N) -> Result<()> {
+        Ok(())
+    }
+}
+
+/// Trait for types that can be visited by [`TreeNodeVisitor`]
+pub trait TreeNodeVisitable: Sized {
+    /// Return the children of this tree node
+    fn get_children(&self) -> Vec<Self>;
+
+    /// Accept a visitor, calling `visit` on all children of this node
+    fn accept<V: TreeNodeVisitor<N = Self>>(&self, visitor: &mut V) -> 
Result<()> {
+        match visitor.pre_visit(self)? {
+            VisitRecursion::Continue => {}
+            // If the recursion should stop, do not visit children
+            VisitRecursion::Stop => return Ok(()),
+        };
+
+        for child in self.get_children() {
+            child.accept(visitor)?;
+        }
+
+        visitor.post_visit(self)
+    }
+}
+
+/// Controls how the visitor recursion should proceed.
+pub enum VisitRecursion {
+    /// Attempt to visit all the children, recursively.
+    Continue,
+    /// Do not visit the children of this tree node, though the walk
+    /// of parents of this tree node will not be affected
+    Stop,
+}
 
-/// a Trait for marking tree node types that are rewritable
+/// Trait for marking a tree node as rewritable
 pub trait TreeNodeRewritable: Clone {
+    /// Convenience utils for writing optimizers rule: recursively apply the 
given `op` to the node tree.
+    /// When `op` does not apply to a given node, it is left unchanged.
+    /// The default tree traversal direction is transform_up(Postorder 
Traversal).
+    fn transform<F>(self, op: &F) -> Result<Self>
+    where
+        F: Fn(Self) -> Result<Option<Self>>,
+    {
+        self.transform_up(op)
+    }
+
+    /// Convenience utils for writing optimizers rule: recursively apply the 
given 'op' to the node and all of its
+    /// children(Preorder Traversal).
+    /// When the `op` does not apply to a given node, it is left unchanged.
+    fn transform_down<F>(self, op: &F) -> Result<Self>
+    where
+        F: Fn(Self) -> Result<Option<Self>>,
+    {
+        let node_cloned = self.clone();
+        let after_op = match op(node_cloned)? {
+            Some(value) => value,
+            None => self,
+        };
+        after_op.map_children(|node| node.transform_down(op))
+    }
+
+    /// Convenience utils for writing optimizers rule: recursively apply the 
given 'op' first to all of its
+    /// children and then itself(Postorder Traversal).
+    /// When the `op` does not apply to a given node, it is left unchanged.
+    fn transform_up<F>(self, op: &F) -> Result<Self>
+    where
+        F: Fn(Self) -> Result<Option<Self>>,
+    {
+        let after_op_children = self.map_children(|node| 
node.transform_up(op))?;
+
+        let after_op_children_clone = after_op_children.clone();
+        let new_node = match op(after_op_children)? {
+            Some(value) => value,
+            None => after_op_children_clone,
+        };
+        Ok(new_node)
+    }
+
     /// Transform the tree node using the given [TreeNodeRewriter]
     /// It performs a depth first walk of an node and its children.
     ///
@@ -51,7 +154,7 @@ pub trait TreeNodeRewritable: Clone {
     /// children of that node are visited, nor is mutate
     /// called on that node
     ///
-    fn transform_using<R: TreeNodeRewriter<Self>>(
+    fn transform_using<R: TreeNodeRewriter<N = Self>>(
         self,
         rewriter: &mut R,
     ) -> Result<Self> {
@@ -73,48 +176,6 @@ pub trait TreeNodeRewritable: Clone {
         }
     }
 
-    /// Convenience utils for writing optimizers rule: recursively apply the 
given `op` to the node tree.
-    /// When `op` does not apply to a given node, it is left unchanged.
-    /// The default tree traversal direction is transform_up(Postorder 
Traversal).
-    fn transform<F>(self, op: &F) -> Result<Self>
-    where
-        F: Fn(Self) -> Result<Option<Self>>,
-    {
-        self.transform_up(op)
-    }
-
-    /// Convenience utils for writing optimizers rule: recursively apply the 
given 'op' to the node and all of its
-    /// children(Preorder Traversal).
-    /// When the `op` does not apply to a given node, it is left unchanged.
-    fn transform_down<F>(self, op: &F) -> Result<Self>
-    where
-        F: Fn(Self) -> Result<Option<Self>>,
-    {
-        let node_cloned = self.clone();
-        let after_op = match op(node_cloned)? {
-            Some(value) => value,
-            None => self,
-        };
-        after_op.map_children(|node| node.transform_down(op))
-    }
-
-    /// Convenience utils for writing optimizers rule: recursively apply the 
given 'op' first to all of its
-    /// children and then itself(Postorder Traversal).
-    /// When the `op` does not apply to a given node, it is left unchanged.
-    fn transform_up<F>(self, op: &F) -> Result<Self>
-    where
-        F: Fn(Self) -> Result<Option<Self>>,
-    {
-        let after_op_children = self.map_children(|node| 
node.transform_up(op))?;
-
-        let after_op_children_clone = after_op_children.clone();
-        let new_node = match op(after_op_children)? {
-            Some(value) => value,
-            None => after_op_children_clone,
-        };
-        Ok(new_node)
-    }
-
     /// Apply transform `F` to the node's children, the transform `F` might 
have a direction(Preorder or Postorder)
     fn map_children<F>(self, transform: F) -> Result<Self>
     where
@@ -124,16 +185,19 @@ pub trait TreeNodeRewritable: Clone {
 /// Trait for potentially recursively transform an [`TreeNodeRewritable`] node
 /// tree. When passed to `TreeNodeRewritable::transform_using`, 
`TreeNodeRewriter::mutate` is
 /// invoked recursively on all nodes of a tree.
-pub trait TreeNodeRewriter<N: TreeNodeRewritable>: Sized {
+pub trait TreeNodeRewriter: Sized {
+    /// The node type which is rewritable.
+    type N: TreeNodeRewritable;
+
     /// Invoked before (Preorder) any children of `node` are rewritten /
     /// visited. Default implementation returns 
`Ok(RewriteRecursion::Continue)`
-    fn pre_visit(&mut self, _node: &N) -> Result<RewriteRecursion> {
+    fn pre_visit(&mut self, _node: &Self::N) -> Result<RewriteRecursion> {
         Ok(RewriteRecursion::Continue)
     }
 
     /// Invoked after (Postorder) all children of `node` have been mutated and
     /// returns a potentially modified node.
-    fn mutate(&mut self, node: N) -> Result<N>;
+    fn mutate(&mut self, node: Self::N) -> Result<Self::N>;
 }
 
 /// Controls how the [TreeNodeRewriter] recursion should proceed.
@@ -148,19 +212,3 @@ pub enum RewriteRecursion {
     /// Keep recursive but skip apply op on this node
     Skip,
 }
-
-impl TreeNodeRewritable for Arc<dyn ExecutionPlan> {
-    fn map_children<F>(self, transform: F) -> Result<Self>
-    where
-        F: FnMut(Self) -> Result<Self>,
-    {
-        let children = self.children();
-        if !children.is_empty() {
-            let new_children: Result<Vec<_>> =
-                children.into_iter().map(transform).collect();
-            with_new_children_if_necessary(self, new_children?)
-        } else {
-            Ok(self)
-        }
-    }
-}
diff --git a/datafusion/core/src/physical_plan/tree_node/rewritable.rs 
b/datafusion/core/src/physical_plan/tree_node/rewritable.rs
new file mode 100644
index 000000000..004fc47fd
--- /dev/null
+++ b/datafusion/core/src/physical_plan/tree_node/rewritable.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tree node rewritable implementations
+
+use crate::physical_plan::tree_node::TreeNodeRewritable;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use datafusion_common::Result;
+use std::sync::Arc;
+
+impl TreeNodeRewritable for Arc<dyn ExecutionPlan> {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if !children.is_empty() {
+            let new_children: Result<Vec<_>> =
+                children.into_iter().map(transform).collect();
+            with_new_children_if_necessary(self, new_children?)
+        } else {
+            Ok(self)
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_plan/tree_node/visitable.rs 
b/datafusion/core/src/physical_plan/tree_node/visitable.rs
new file mode 100644
index 000000000..935c8adb7
--- /dev/null
+++ b/datafusion/core/src/physical_plan/tree_node/visitable.rs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tree node visitable implementations
+
+use crate::physical_plan::tree_node::TreeNodeVisitable;
+use crate::physical_plan::ExecutionPlan;
+use std::sync::Arc;
+
+impl TreeNodeVisitable for Arc<dyn ExecutionPlan> {
+    fn get_children(&self) -> Vec<Self> {
+        self.children()
+    }
+}
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index a80a92bc5..5b357d931 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -377,35 +377,18 @@ pub fn reassign_predicate_columns(
     schema: &SchemaRef,
     ignore_not_found: bool,
 ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
-    let mut rewriter = ColumnAssigner {
-        schema,
-        ignore_not_found,
-    };
-    pred.transform_using(&mut rewriter)
-}
-
-#[derive(Debug)]
-struct ColumnAssigner<'a> {
-    schema: &'a SchemaRef,
-    ignore_not_found: bool,
-}
-
-impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for ColumnAssigner<'a> {
-    fn mutate(
-        &mut self,
-        expr: Arc<dyn PhysicalExpr>,
-    ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+    pred.transform(&|expr| {
         if let Some(column) = expr.as_any().downcast_ref::<Column>() {
-            let index = match self.schema.index_of(column.name()) {
+            let index = match schema.index_of(column.name()) {
                 Ok(idx) => idx,
-                Err(_) if self.ignore_not_found => usize::MAX,
+                Err(_) if ignore_not_found => usize::MAX,
                 Err(e) => return Err(e.into()),
             };
-            return Ok(Arc::new(Column::new(column.name(), index)));
+            return Ok(Some(Arc::new(Column::new(column.name(), index))));
         }
 
-        Ok(expr)
-    }
+        Ok(None)
+    })
 }
 
 #[cfg(test)]

Reply via email to