This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6bfede0e1 Add a utility function to get all of the PartitionedFile for
an ExecutionPlan (#5572)
6bfede0e1 is described below
commit 6bfede0e1e1a09bd06ea0166e01c0ad4834a1721
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Mar 15 14:26:07 2023 +0800
Add a utility function to get all of the PartitionedFile for an
ExecutionPlan (#5572)
* Add a utility function to collect all of the PartitionedFile for an
ExecutionPlan
* Reorganize the tree node related files
* Move TreeNodeRewritable for ExecutionPlan to the tree_node
* Change self to &mut self for TreeNodeVisitor
* Refine the usage of TreeNodeRewriter
---------
Co-authored-by: yangzhong <[email protected]>
---
.../core/src/datasource/file_format/parquet.rs | 20 +++
.../src/physical_optimizer/coalesce_batches.rs | 2 +-
.../src/physical_optimizer/dist_enforcement.rs | 2 +-
.../physical_optimizer/global_sort_selection.rs | 2 +-
.../core/src/physical_optimizer/join_selection.rs | 2 +-
.../src/physical_optimizer/pipeline_checker.rs | 2 +-
.../core/src/physical_optimizer/pipeline_fixer.rs | 2 +-
datafusion/core/src/physical_optimizer/pruning.rs | 25 +--
.../src/physical_optimizer/sort_enforcement.rs | 2 +-
.../core/src/physical_plan/file_format/json.rs | 5 +
.../core/src/physical_plan/file_format/mod.rs | 48 ++++++
.../file_format/parquet/row_filter.rs | 3 +
datafusion/core/src/physical_plan/mod.rs | 2 +-
.../physical_plan/{rewrite.rs => tree_node/mod.rs} | 182 +++++++++++++--------
.../core/src/physical_plan/tree_node/rewritable.rs | 39 +++++
.../core/src/physical_plan/tree_node/visitable.rs | 28 ++++
datafusion/physical-expr/src/utils.rs | 29 +---
17 files changed, 278 insertions(+), 117 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 53e94167d..ba18e9f62 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -620,6 +620,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::parquet::test_util::store_parquet;
+ use crate::physical_plan::file_format::get_scan_files;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
@@ -1215,6 +1216,25 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_get_scan_files() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+ let projection = Some(vec![9]);
+ let exec = get_exec(&state, "alltypes_plain.parquet", projection,
None).await?;
+ let scan_files = get_scan_files(exec)?;
+ assert_eq!(scan_files.len(), 1);
+ assert_eq!(scan_files[0].len(), 1);
+ assert_eq!(scan_files[0][0].len(), 1);
+ assert!(scan_files[0][0][0]
+ .object_meta
+ .location
+ .to_string()
+ .contains("alltypes_plain.parquet"));
+
+ Ok(())
+ }
+
fn check_page_index_validation(
page_index: Option<&ParquetColumnIndex>,
offset_index: Option<&ParquetOffsetIndex>,
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index a1566c37c..9fd29cf89 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -24,7 +24,7 @@ use crate::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
joins::HashJoinExec,
- repartition::RepartitionExec, rewrite::TreeNodeRewritable,
Partitioning,
+ repartition::RepartitionExec, tree_node::TreeNodeRewritable,
Partitioning,
},
};
use std::sync::Arc;
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index 919273af7..95d4427e6 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -28,8 +28,8 @@ use crate::physical_plan::joins::{
};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution,
ExecutionPlan};
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index 81b4b59e3..647558fbf 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -22,9 +22,9 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::ExecutionPlan;
/// Currently for a sort operator, if
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs
b/datafusion/core/src/physical_optimizer/join_selection.rs
index d9787881f..2308b2c85 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -32,7 +32,7 @@ use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
use super::optimizer::PhysicalOptimizerRule;
use crate::error::Result;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection
rule will make
/// a cost based decision to select which PartitionMode
mode(Partitioned/CollectLeft) is optimal
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 8a6b0e003..f097196cd 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -22,7 +22,7 @@
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use std::sync::Arc;
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
index 7e85f0c0d..7532914c1 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -34,7 +34,7 @@ use crate::physical_plan::joins::{
convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode,
SymmetricHashJoinExec,
};
-use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::logical_plan::JoinType;
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index 80b72e68f..9185bf04d 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -46,7 +46,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::{downcast_value, ScalarValue};
-use datafusion_physical_expr::rewrite::{TreeNodeRewritable, TreeNodeRewriter};
+use datafusion_physical_expr::rewrite::TreeNodeRewritable;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
use log::trace;
@@ -643,28 +643,15 @@ fn rewrite_column_expr(
column_old: &phys_expr::Column,
column_new: &phys_expr::Column,
) -> Result<Arc<dyn PhysicalExpr>> {
- let mut rewriter = RewriteColumnExpr {
- column_old,
- column_new,
- };
- e.transform_using(&mut rewriter)
-}
-
-struct RewriteColumnExpr<'a> {
- column_old: &'a phys_expr::Column,
- column_new: &'a phys_expr::Column,
-}
-
-impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for RewriteColumnExpr<'a> {
- fn mutate(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn
PhysicalExpr>> {
+ e.transform(&|expr| {
if let Some(column) =
expr.as_any().downcast_ref::<phys_expr::Column>() {
- if column == self.column_old {
- return Ok(Arc::new(self.column_new.clone()));
+ if column == column_old {
+ return Ok(Some(Arc::new(column_new.clone())));
}
}
- Ok(expr)
- }
+ Ok(None)
+ })
}
fn reverse_operator(op: Operator) -> Result<Operator> {
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 70880b750..261c19600 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -39,9 +39,9 @@ use crate::physical_optimizer::utils::add_sort_above;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, Distribution,
ExecutionPlan};
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs
b/datafusion/core/src/physical_plan/file_format/json.rs
index 6f2608012..146227335 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -73,6 +73,11 @@ impl NdJsonExec {
file_compression_type,
}
}
+
+ /// Ref to the base configs
+ pub fn base_config(&self) -> &FileScanConfig {
+ &self.base_config
+ }
}
impl ExecutionPlan for NdJsonExec {
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 97b091ced..d7616a3c2 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -45,6 +45,10 @@ use crate::datasource::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
};
+use crate::physical_plan::tree_node::{
+ TreeNodeVisitable, TreeNodeVisitor, VisitRecursion,
+};
+use crate::physical_plan::ExecutionPlan;
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
@@ -68,6 +72,50 @@ pub fn partition_type_wrap(val_type: DataType) -> DataType {
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}
+/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
+pub fn get_scan_files(
+ plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
+ let mut collector = FileScanCollector::new();
+ plan.accept(&mut collector)?;
+ Ok(collector.file_groups)
+}
+
+struct FileScanCollector {
+ file_groups: Vec<Vec<Vec<PartitionedFile>>>,
+}
+
+impl FileScanCollector {
+ fn new() -> Self {
+ Self {
+ file_groups: vec![],
+ }
+ }
+}
+
+impl TreeNodeVisitor for FileScanCollector {
+ type N = Arc<dyn ExecutionPlan>;
+
+ fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
+ let plan_any = node.as_any();
+ let file_groups =
+ if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>()
{
+ parquet_exec.base_config().file_groups.clone()
+ } else if let Some(avro_exec) =
plan_any.downcast_ref::<AvroExec>() {
+ avro_exec.base_config().file_groups.clone()
+ } else if let Some(json_exec) =
plan_any.downcast_ref::<NdJsonExec>() {
+ json_exec.base_config().file_groups.clone()
+ } else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
+ csv_exec.base_config().file_groups.clone()
+ } else {
+ return Ok(VisitRecursion::Continue);
+ };
+
+ self.file_groups.push(file_groups);
+ Ok(VisitRecursion::Stop)
+ }
+}
+
/// The base configurations to provide when creating a physical plan for
/// any given file format.
#[derive(Debug, Clone)]
diff --git
a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index e1feafec1..54478edc7 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -219,13 +219,16 @@ impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for
FilterCandidateBuilder<'a>
if
DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
+ return Ok(RewriteRecursion::Stop);
}
} else if self.table_schema.index_of(column.name()).is_err() {
// If the column does not exist in the (un-projected) table
schema then
// it must be a projected column.
self.projected_columns = true;
+ return Ok(RewriteRecursion::Stop);
}
}
+
Ok(RewriteRecursion::Continue)
}
diff --git a/datafusion/core/src/physical_plan/mod.rs
b/datafusion/core/src/physical_plan/mod.rs
index dbd1024ae..9e0e03a77 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -653,10 +653,10 @@ pub mod metrics;
pub mod planner;
pub mod projection;
pub mod repartition;
-pub mod rewrite;
pub mod sorts;
pub mod stream;
pub mod streaming;
+pub mod tree_node;
pub mod udaf;
pub mod union;
pub mod unnest;
diff --git a/datafusion/core/src/physical_plan/rewrite.rs
b/datafusion/core/src/physical_plan/tree_node/mod.rs
similarity index 66%
rename from datafusion/core/src/physical_plan/rewrite.rs
rename to datafusion/core/src/physical_plan/tree_node/mod.rs
index 2972b546b..327d938d4 100644
--- a/datafusion/core/src/physical_plan/rewrite.rs
+++ b/datafusion/core/src/physical_plan/tree_node/mod.rs
@@ -15,16 +15,119 @@
// specific language governing permissions and limitations
// under the License.
-//! Trait to make Executionplan rewritable
+//! This module provides common traits for visiting or rewriting tree nodes
easily.
+
+pub mod rewritable;
+pub mod visitable;
-use crate::physical_plan::with_new_children_if_necessary;
-use crate::physical_plan::ExecutionPlan;
use datafusion_common::Result;
-use std::sync::Arc;
+/// Implements the [visitor
+/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for recursively
walking [`TreeNodeVisitable`]s.
+///
+/// [`TreeNodeVisitor`] allows keeping the algorithms
+/// separate from the code to traverse the structure of the `TreeNodeVisitable`
+/// tree and makes it easier to add new types of tree nodes and
+/// algorithms.
+///
+/// When passed to [`TreeNodeVisitable::accept`], [`TreeNodeVisitor::pre_visit`]
+/// and [`TreeNodeVisitor::post_visit`] are invoked recursively
+/// on a node tree.
+///
+/// If an [`Err`] result is returned, recursion is stopped
+/// immediately.
+///
+/// If [`VisitRecursion::Stop`] is returned on a call to pre_visit, no
+/// children of that tree node are visited, nor is post_visit
+/// called on that tree node.
+pub trait TreeNodeVisitor: Sized {
+ /// The node type which is visitable.
+ type N: TreeNodeVisitable;
+
+ /// Invoked before any children of `node` are visited.
+ fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion>;
+
+ /// Invoked after all children of `node` are visited. Default
+ /// implementation does nothing.
+ fn post_visit(&mut self, _node: &Self::N) -> Result<()> {
+ Ok(())
+ }
+}
+
+/// Trait for types that can be visited by [`TreeNodeVisitor`]
+pub trait TreeNodeVisitable: Sized {
+ /// Return the children of this tree node
+ fn get_children(&self) -> Vec<Self>;
+
+ /// Accept a visitor, calling `pre_visit` and `post_visit` on this node and, recursively, on all of its children
+ fn accept<V: TreeNodeVisitor<N = Self>>(&self, visitor: &mut V) ->
Result<()> {
+ match visitor.pre_visit(self)? {
+ VisitRecursion::Continue => {}
+ // If the recursion should stop, do not visit children
+ VisitRecursion::Stop => return Ok(()),
+ };
+
+ for child in self.get_children() {
+ child.accept(visitor)?;
+ }
+
+ visitor.post_visit(self)
+ }
+}
+
+/// Controls how the visitor recursion should proceed.
+pub enum VisitRecursion {
+ /// Attempt to visit all the children, recursively.
+ Continue,
+ /// Do not visit the children of this tree node, though the walk
+ /// of parents of this tree node will not be affected
+ Stop,
+}
-/// a Trait for marking tree node types that are rewritable
+/// Trait for marking tree node as rewritable
pub trait TreeNodeRewritable: Clone {
+ /// Convenience utils for writing optimizers rule: recursively apply the
given `op` to the node tree.
+ /// When `op` does not apply to a given node, it is left unchanged.
+ /// The default tree traversal direction is transform_up(Postorder
Traversal).
+ fn transform<F>(self, op: &F) -> Result<Self>
+ where
+ F: Fn(Self) -> Result<Option<Self>>,
+ {
+ self.transform_up(op)
+ }
+
+ /// Convenience utils for writing optimizers rule: recursively apply the
given 'op' to the node and all of its
+ /// children(Preorder Traversal).
+ /// When the `op` does not apply to a given node, it is left unchanged.
+ fn transform_down<F>(self, op: &F) -> Result<Self>
+ where
+ F: Fn(Self) -> Result<Option<Self>>,
+ {
+ let node_cloned = self.clone();
+ let after_op = match op(node_cloned)? {
+ Some(value) => value,
+ None => self,
+ };
+ after_op.map_children(|node| node.transform_down(op))
+ }
+
+ /// Convenience utils for writing optimizers rule: recursively apply the
given 'op' first to all of its
+ /// children and then itself(Postorder Traversal).
+ /// When the `op` does not apply to a given node, it is left unchanged.
+ fn transform_up<F>(self, op: &F) -> Result<Self>
+ where
+ F: Fn(Self) -> Result<Option<Self>>,
+ {
+ let after_op_children = self.map_children(|node|
node.transform_up(op))?;
+
+ let after_op_children_clone = after_op_children.clone();
+ let new_node = match op(after_op_children)? {
+ Some(value) => value,
+ None => after_op_children_clone,
+ };
+ Ok(new_node)
+ }
+
/// Transform the tree node using the given [TreeNodeRewriter]
/// It performs a depth first walk of an node and its children.
///
@@ -51,7 +154,7 @@ pub trait TreeNodeRewritable: Clone {
/// children of that node are visited, nor is mutate
/// called on that node
///
- fn transform_using<R: TreeNodeRewriter<Self>>(
+ fn transform_using<R: TreeNodeRewriter<N = Self>>(
self,
rewriter: &mut R,
) -> Result<Self> {
@@ -73,48 +176,6 @@ pub trait TreeNodeRewritable: Clone {
}
}
- /// Convenience utils for writing optimizers rule: recursively apply the
given `op` to the node tree.
- /// When `op` does not apply to a given node, it is left unchanged.
- /// The default tree traversal direction is transform_up(Postorder
Traversal).
- fn transform<F>(self, op: &F) -> Result<Self>
- where
- F: Fn(Self) -> Result<Option<Self>>,
- {
- self.transform_up(op)
- }
-
- /// Convenience utils for writing optimizers rule: recursively apply the
given 'op' to the node and all of its
- /// children(Preorder Traversal).
- /// When the `op` does not apply to a given node, it is left unchanged.
- fn transform_down<F>(self, op: &F) -> Result<Self>
- where
- F: Fn(Self) -> Result<Option<Self>>,
- {
- let node_cloned = self.clone();
- let after_op = match op(node_cloned)? {
- Some(value) => value,
- None => self,
- };
- after_op.map_children(|node| node.transform_down(op))
- }
-
- /// Convenience utils for writing optimizers rule: recursively apply the
given 'op' first to all of its
- /// children and then itself(Postorder Traversal).
- /// When the `op` does not apply to a given node, it is left unchanged.
- fn transform_up<F>(self, op: &F) -> Result<Self>
- where
- F: Fn(Self) -> Result<Option<Self>>,
- {
- let after_op_children = self.map_children(|node|
node.transform_up(op))?;
-
- let after_op_children_clone = after_op_children.clone();
- let new_node = match op(after_op_children)? {
- Some(value) => value,
- None => after_op_children_clone,
- };
- Ok(new_node)
- }
-
/// Apply transform `F` to the node's children, the transform `F` might
have a direction(Preorder or Postorder)
fn map_children<F>(self, transform: F) -> Result<Self>
where
@@ -124,16 +185,19 @@ pub trait TreeNodeRewritable: Clone {
/// Trait for potentially recursively transform an [`TreeNodeRewritable`] node
/// tree. When passed to `TreeNodeRewritable::transform_using`,
`TreeNodeRewriter::mutate` is
/// invoked recursively on all nodes of a tree.
-pub trait TreeNodeRewriter<N: TreeNodeRewritable>: Sized {
+pub trait TreeNodeRewriter: Sized {
+ /// The node type which is rewritable.
+ type N: TreeNodeRewritable;
+
/// Invoked before (Preorder) any children of `node` are rewritten /
/// visited. Default implementation returns
`Ok(RewriteRecursion::Continue)`
- fn pre_visit(&mut self, _node: &N) -> Result<RewriteRecursion> {
+ fn pre_visit(&mut self, _node: &Self::N) -> Result<RewriteRecursion> {
Ok(RewriteRecursion::Continue)
}
/// Invoked after (Postorder) all children of `node` have been mutated and
/// returns a potentially modified node.
- fn mutate(&mut self, node: N) -> Result<N>;
+ fn mutate(&mut self, node: Self::N) -> Result<Self::N>;
}
/// Controls how the [TreeNodeRewriter] recursion should proceed.
@@ -148,19 +212,3 @@ pub enum RewriteRecursion {
/// Keep recursive but skip apply op on this node
Skip,
}
-
-impl TreeNodeRewritable for Arc<dyn ExecutionPlan> {
- fn map_children<F>(self, transform: F) -> Result<Self>
- where
- F: FnMut(Self) -> Result<Self>,
- {
- let children = self.children();
- if !children.is_empty() {
- let new_children: Result<Vec<_>> =
- children.into_iter().map(transform).collect();
- with_new_children_if_necessary(self, new_children?)
- } else {
- Ok(self)
- }
- }
-}
diff --git a/datafusion/core/src/physical_plan/tree_node/rewritable.rs
b/datafusion/core/src/physical_plan/tree_node/rewritable.rs
new file mode 100644
index 000000000..004fc47fd
--- /dev/null
+++ b/datafusion/core/src/physical_plan/tree_node/rewritable.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tree node rewritable implementations
+
+use crate::physical_plan::tree_node::TreeNodeRewritable;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use datafusion_common::Result;
+use std::sync::Arc;
+
+impl TreeNodeRewritable for Arc<dyn ExecutionPlan> {
+ fn map_children<F>(self, transform: F) -> Result<Self>
+ where
+ F: FnMut(Self) -> Result<Self>,
+ {
+ let children = self.children();
+ if !children.is_empty() {
+ let new_children: Result<Vec<_>> =
+ children.into_iter().map(transform).collect();
+ with_new_children_if_necessary(self, new_children?)
+ } else {
+ Ok(self)
+ }
+ }
+}
diff --git a/datafusion/core/src/physical_plan/tree_node/visitable.rs
b/datafusion/core/src/physical_plan/tree_node/visitable.rs
new file mode 100644
index 000000000..935c8adb7
--- /dev/null
+++ b/datafusion/core/src/physical_plan/tree_node/visitable.rs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tree node visitable implementations
+
+use crate::physical_plan::tree_node::TreeNodeVisitable;
+use crate::physical_plan::ExecutionPlan;
+use std::sync::Arc;
+
+impl TreeNodeVisitable for Arc<dyn ExecutionPlan> {
+ fn get_children(&self) -> Vec<Self> {
+ self.children()
+ }
+}
diff --git a/datafusion/physical-expr/src/utils.rs
b/datafusion/physical-expr/src/utils.rs
index a80a92bc5..5b357d931 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -377,35 +377,18 @@ pub fn reassign_predicate_columns(
schema: &SchemaRef,
ignore_not_found: bool,
) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
- let mut rewriter = ColumnAssigner {
- schema,
- ignore_not_found,
- };
- pred.transform_using(&mut rewriter)
-}
-
-#[derive(Debug)]
-struct ColumnAssigner<'a> {
- schema: &'a SchemaRef,
- ignore_not_found: bool,
-}
-
-impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for ColumnAssigner<'a> {
- fn mutate(
- &mut self,
- expr: Arc<dyn PhysicalExpr>,
- ) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+ pred.transform(&|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
- let index = match self.schema.index_of(column.name()) {
+ let index = match schema.index_of(column.name()) {
Ok(idx) => idx,
- Err(_) if self.ignore_not_found => usize::MAX,
+ Err(_) if ignore_not_found => usize::MAX,
Err(e) => return Err(e.into()),
};
- return Ok(Arc::new(Column::new(column.name(), index)));
+ return Ok(Some(Arc::new(Column::new(column.name(), index))));
}
- Ok(expr)
- }
+ Ok(None)
+ })
}
#[cfg(test)]