This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0f83079e33 refactor: move InlineTableScan into Analyzer. (#5683)
0f83079e33 is described below
commit 0f83079e33e3dc53ed1191c8fa284991d4e6531e
Author: jakevin <[email protected]>
AuthorDate: Fri Mar 24 10:51:31 2023 +0800
refactor: move InlineTableScan into Analyzer. (#5683)
---
.../optimizer/src/analyzer/count_wildcard_rule.rs | 7 +-
.../src/{ => analyzer}/inline_table_scan.rs | 147 +++++++++++++--------
datafusion/optimizer/src/analyzer/mod.rs | 8 +-
datafusion/optimizer/src/lib.rs | 1 -
datafusion/optimizer/src/optimizer.rs | 2 -
datafusion/optimizer/src/test/mod.rs | 16 +++
6 files changed, 117 insertions(+), 64 deletions(-)
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index 4b4c603bcf..2772090e02 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -28,17 +28,12 @@ use crate::rewrite::TreeNodeRewritable;
/// Resolve issue: https://github.com/apache/arrow-datafusion/issues/5473.
pub struct CountWildcardRule {}
-impl Default for CountWildcardRule {
- fn default() -> Self {
- CountWildcardRule::new()
- }
-}
-
impl CountWildcardRule {
pub fn new() -> Self {
CountWildcardRule {}
}
}
+
impl AnalyzerRule for CountWildcardRule {
fn analyze(&self, plan: &LogicalPlan, _: &ConfigOptions) ->
Result<LogicalPlan> {
plan.clone().transform_down(&analyze_internal)
diff --git a/datafusion/optimizer/src/inline_table_scan.rs
b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
similarity index 53%
rename from datafusion/optimizer/src/inline_table_scan.rs
rename to datafusion/optimizer/src/analyzer/inline_table_scan.rs
index 722a70cb38..8307238951 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
@@ -15,73 +15,113 @@
// specific language governing permissions and limitations
// under the License.
-//! Optimizer rule to replace TableScan references
-//! such as DataFrames and Views and inlines the LogicalPlan
-//! to support further optimization
-use crate::optimizer::ApplyOrder;
-use crate::{OptimizerConfig, OptimizerRule};
+//! Analyzer rule that replaces TableScan references
+//! (such as DataFrames and Views) with their inlined LogicalPlan.
+use std::sync::Arc;
+
+use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
-use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder,
TableScan};
+use datafusion_expr::expr_rewriter::rewrite_expr;
+use datafusion_expr::{
+ logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan,
+};
+
+use crate::analyzer::AnalyzerRule;
+use crate::rewrite::TreeNodeRewritable;
-/// Optimization rule that inlines TableScan that provide a [LogicalPlan]
+/// Analyzer rule that inlines TableScans that provide a [LogicalPlan]
/// (DataFrame / ViewTable)
#[derive(Default)]
pub struct InlineTableScan;
impl InlineTableScan {
- #[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
-impl OptimizerRule for InlineTableScan {
- fn try_optimize(
- &self,
- plan: &LogicalPlan,
- _config: &dyn OptimizerConfig,
- ) -> Result<Option<LogicalPlan>> {
- match plan {
- // Match only on scans without filter / projection / fetch
- // Views and DataFrames won't have those added
- // during the early stage of planning
- LogicalPlan::TableScan(TableScan {
- source,
- table_name,
- filters,
- projection,
- ..
- }) if filters.is_empty() => {
- if let Some(sub_plan) = source.get_logical_plan() {
- let projection_exprs =
- generate_projection_expr(projection, sub_plan)?;
- let plan = LogicalPlanBuilder::from(sub_plan.clone())
- .project(projection_exprs)?
- // Since this This is creating a subquery like:
- //```sql
- // ...
- // FROM <view definition> as "table_name"
- // ```
- //
- // it doesn't make sense to have a qualified
- // reference (e.g. "foo"."bar") -- this convert to
- // string
- .alias(table_name.to_string())?;
- Ok(Some(plan.build()?))
- } else {
- Ok(None)
- }
- }
- _ => Ok(None),
- }
+impl AnalyzerRule for InlineTableScan {
+ fn analyze(&self, plan: &LogicalPlan, _: &ConfigOptions) ->
Result<LogicalPlan> {
+ plan.clone().transform_up(&analyze_internal)
}
fn name(&self) -> &str {
"inline_table_scan"
}
+}
- fn apply_order(&self) -> Option<ApplyOrder> {
- Some(ApplyOrder::BottomUp)
+fn analyze_internal(plan: LogicalPlan) -> Result<Option<LogicalPlan>> {
+ match plan {
+ // Match only on scans without filter / projection / fetch
+ // Views and DataFrames won't have those added
+ // during the early stage of planning
+ LogicalPlan::TableScan(TableScan {
+ source,
+ table_name,
+ filters,
+ projection,
+ ..
+ }) if filters.is_empty() => {
+ if let Some(sub_plan) = source.get_logical_plan() {
+ let projection_exprs = generate_projection_expr(&projection,
sub_plan)?;
+ let plan = LogicalPlanBuilder::from(sub_plan.clone())
+ .project(projection_exprs)?
+ // Since this is creating a subquery like:
+ //```sql
+ // ...
+ // FROM <view definition> as "table_name"
+ // ```
+ //
+ // it doesn't make sense to have a qualified
+ // reference (e.g. "foo"."bar") -- so convert the name to a
+ // plain string
+ .alias(table_name.to_string())?
+ .build()?;
+ Ok(Some(plan))
+ } else {
+ Ok(None)
+ }
+ }
+ LogicalPlan::Filter(filter) => {
+ let new_expr = rewrite_expr(filter.predicate.clone(),
rewrite_subquery)?;
+ Ok(Some(LogicalPlan::Filter(Filter::try_new(
+ new_expr,
+ filter.input,
+ )?)))
+ }
+ _ => Ok(None),
+ }
+}
+
+fn rewrite_subquery(expr: Expr) -> Result<Expr> {
+ match expr {
+ Expr::Exists { subquery, negated } => {
+ let plan = subquery.subquery.as_ref().clone();
+ let new_plan = plan.transform_up(&analyze_internal)?;
+ let subquery = subquery.with_plan(Arc::new(new_plan));
+ Ok(Expr::Exists { subquery, negated })
+ }
+ Expr::InSubquery {
+ expr,
+ subquery,
+ negated,
+ } => {
+ let plan = subquery.subquery.as_ref().clone();
+ let new_plan = plan.transform_up(&analyze_internal)?;
+ let subquery = subquery.with_plan(Arc::new(new_plan));
+ Ok(Expr::InSubquery {
+ expr,
+ subquery,
+ negated,
+ })
+ }
+ Expr::ScalarSubquery(subquery) => {
+ let plan = subquery.subquery.as_ref().clone();
+ let new_plan = plan.transform_up(&analyze_internal)?;
+ let subquery = subquery.with_plan(Arc::new(new_plan));
+ Ok(Expr::ScalarSubquery(subquery))
+ }
+ _ => Ok(expr),
}
}
@@ -107,10 +147,11 @@ mod tests {
use std::{sync::Arc, vec};
use arrow::datatypes::{DataType, Field, Schema};
+
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder,
TableSource};
- use crate::inline_table_scan::InlineTableScan;
- use crate::test::assert_optimized_plan_eq;
+ use crate::analyzer::inline_table_scan::InlineTableScan;
+ use crate::test::assert_analyzed_plan_eq;
pub struct RawTableSource {}
@@ -185,7 +226,7 @@ mod tests {
\n Projection: y.a, y.b\
\n TableScan: y";
- assert_optimized_plan_eq(Arc::new(InlineTableScan::new()), &plan,
expected)
+ assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), &plan,
expected)
}
#[test]
@@ -201,6 +242,6 @@ mod tests {
\n Projection: y.a\
\n TableScan: y";
- assert_optimized_plan_eq(Arc::new(InlineTableScan::new()), &plan,
expected)
+ assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), &plan,
expected)
}
}
diff --git a/datafusion/optimizer/src/analyzer/mod.rs
b/datafusion/optimizer/src/analyzer/mod.rs
index 0982198bb8..e4b3d22cde 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -16,8 +16,10 @@
// under the License.
mod count_wildcard_rule;
+mod inline_table_scan;
use crate::analyzer::count_wildcard_rule::CountWildcardRule;
+use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::rewrite::TreeNodeRewritable;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
@@ -52,8 +54,10 @@ impl Default for Analyzer {
impl Analyzer {
/// Create a new analyzer using the recommended list of rules
pub fn new() -> Self {
- let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> =
- vec![Arc::new(CountWildcardRule::new())];
+ let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
+ Arc::new(CountWildcardRule::new()),
+ Arc::new(InlineTableScan::new()),
+ ];
Self::with_rules(rules)
}
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 4be7bb3706..fa43f685f3 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -28,7 +28,6 @@ pub mod eliminate_outer_join;
pub mod eliminate_project;
pub mod extract_equijoin_predicate;
pub mod filter_null_join_keys;
-pub mod inline_table_scan;
pub mod merge_projection;
pub mod optimizer;
pub mod propagate_empty_relation;
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index 35557b1252..bda4c47d11 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -29,7 +29,6 @@ use crate::eliminate_outer_join::EliminateOuterJoin;
use crate::eliminate_project::EliminateProjection;
use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
use crate::filter_null_join_keys::FilterNullJoinKeys;
-use crate::inline_table_scan::InlineTableScan;
use crate::merge_projection::MergeProjection;
use crate::plan_signature::LogicalPlanSignature;
use crate::propagate_empty_relation::PropagateEmptyRelation;
@@ -210,7 +209,6 @@ impl Optimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new() -> Self {
let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
- Arc::new(InlineTableScan::new()),
Arc::new(TypeCoercion::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
diff --git a/datafusion/optimizer/src/test/mod.rs
b/datafusion/optimizer/src/test/mod.rs
index cec21da313..439f44151e 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use crate::analyzer::{Analyzer, AnalyzerRule};
use crate::optimizer::Optimizer;
use crate::{OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
+use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_expr::{col, logical_plan::table_scan, LogicalPlan,
LogicalPlanBuilder};
use std::sync::Arc;
@@ -106,6 +108,20 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
}
}
+pub fn assert_analyzed_plan_eq(
+ rule: Arc<dyn AnalyzerRule + Send + Sync>,
+ plan: &LogicalPlan,
+ expected: &str,
+) -> Result<()> {
+ let options = ConfigOptions::default();
+ let analyzed_plan =
+ Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options)?;
+ let formatted_plan = format!("{analyzed_plan:?}");
+ assert_eq!(formatted_plan, expected);
+
+ Ok(())
+}
+
pub fn assert_optimized_plan_eq(
rule: Arc<dyn OptimizerRule + Send + Sync>,
plan: &LogicalPlan,