This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a5cf0b8902 Avoid copies in `InlineTableScan` via TreeNode API (#10038)
a5cf0b8902 is described below
commit a5cf0b8902ae55b81ac86b875c7e94cf1bdc205d
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Apr 12 07:37:06 2024 -0400
Avoid copies in `InlineTableScan` via TreeNode API (#10038)
* Avoid copies in `InlineTableScan` via TreeNode API
* Improve variable name
---
.../optimizer/src/analyzer/inline_table_scan.rs | 93 ++++++++--------------
1 file changed, 31 insertions(+), 62 deletions(-)
diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
index 88202ffd21..cc5f870a9c 100644
--- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
+++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
@@ -17,17 +17,13 @@
//! Analyzed rule to replace TableScan references
//! such as DataFrames and Views and inlines the LogicalPlan.
-use std::sync::Arc;
use crate::analyzer::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, Result};
-use datafusion_expr::expr::{Exists, InSubquery};
-use datafusion_expr::{
- logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan,
-};
+use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder,
TableScan};
/// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`]
/// (DataFrame / ViewTable)
@@ -51,65 +47,38 @@ impl AnalyzerRule for InlineTableScan {
}
fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
- match plan {
- // Match only on scans without filter / projection / fetch
- // Views and DataFrames won't have those added
- // during the early stage of planning
- LogicalPlan::TableScan(TableScan {
- table_name,
- source,
- projection,
- filters,
- ..
- }) if filters.is_empty() && source.get_logical_plan().is_some() => {
- let sub_plan = source.get_logical_plan().unwrap();
- let projection_exprs = generate_projection_expr(&projection,
sub_plan)?;
- LogicalPlanBuilder::from(sub_plan.clone())
- .project(projection_exprs)?
- // Ensures that the reference to the inlined table remains the
- // same, meaning we don't have to change any of the parent
nodes
- // that reference this table.
- .alias(table_name)?
- .build()
- .map(Transformed::yes)
- }
- LogicalPlan::Filter(filter) => {
- let new_expr =
filter.predicate.transform(&rewrite_subquery).data()?;
- Filter::try_new(new_expr, filter.input)
- .map(|e| Transformed::yes(LogicalPlan::Filter(e)))
+ // rewrite any subqueries in the plan first
+ let transformed_plan =
+ plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?;
+
+ let transformed_plan = transformed_plan.transform_data(|plan| {
+ match plan {
+ // Match only on scans without filter / projection / fetch
+ // Views and DataFrames won't have those added
+ // during the early stage of planning
+ LogicalPlan::TableScan(TableScan {
+ table_name,
+ source,
+ projection,
+ filters,
+ ..
+ }) if filters.is_empty() && source.get_logical_plan().is_some() =>
{
+ let sub_plan = source.get_logical_plan().unwrap();
+ let projection_exprs = generate_projection_expr(&projection,
sub_plan)?;
+ LogicalPlanBuilder::from(sub_plan.clone())
+ .project(projection_exprs)?
+ // Ensures that the reference to the inlined table remains
the
+ // same, meaning we don't have to change any of the parent
nodes
+ // that reference this table.
+ .alias(table_name)?
+ .build()
+ .map(Transformed::yes)
+ }
+ _ => Ok(Transformed::no(plan)),
}
- _ => Ok(Transformed::no(plan)),
- }
-}
+ })?;
-fn rewrite_subquery(expr: Expr) -> Result<Transformed<Expr>> {
- match expr {
- Expr::Exists(Exists { subquery, negated }) => {
- let plan = subquery.subquery.as_ref().clone();
- let new_plan = plan.transform_up(&analyze_internal).data()?;
- let subquery = subquery.with_plan(Arc::new(new_plan));
- Ok(Transformed::yes(Expr::Exists(Exists { subquery, negated })))
- }
- Expr::InSubquery(InSubquery {
- expr,
- subquery,
- negated,
- }) => {
- let plan = subquery.subquery.as_ref().clone();
- let new_plan = plan.transform_up(&analyze_internal).data()?;
- let subquery = subquery.with_plan(Arc::new(new_plan));
- Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
- expr, subquery, negated,
- ))))
- }
- Expr::ScalarSubquery(subquery) => {
- let plan = subquery.subquery.as_ref().clone();
- let new_plan = plan.transform_up(&analyze_internal).data()?;
- let subquery = subquery.with_plan(Arc::new(new_plan));
- Ok(Transformed::yes(Expr::ScalarSubquery(subquery)))
- }
- _ => Ok(Transformed::no(expr)),
- }
+ Ok(transformed_plan)
}
fn generate_projection_expr(