This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f83079e33 refactor: move InlineTableScan into Analyzer. (#5683)
0f83079e33 is described below

commit 0f83079e33e3dc53ed1191c8fa284991d4e6531e
Author: jakevin <[email protected]>
AuthorDate: Fri Mar 24 10:51:31 2023 +0800

    refactor: move InlineTableScan into Analyzer. (#5683)
---
 .../optimizer/src/analyzer/count_wildcard_rule.rs  |   7 +-
 .../src/{ => analyzer}/inline_table_scan.rs        | 147 +++++++++++++--------
 datafusion/optimizer/src/analyzer/mod.rs           |   8 +-
 datafusion/optimizer/src/lib.rs                    |   1 -
 datafusion/optimizer/src/optimizer.rs              |   2 -
 datafusion/optimizer/src/test/mod.rs               |  16 +++
 6 files changed, 117 insertions(+), 64 deletions(-)

diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index 4b4c603bcf..2772090e02 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -28,17 +28,12 @@ use crate::rewrite::TreeNodeRewritable;
 /// Resolve issue: https://github.com/apache/arrow-datafusion/issues/5473.
 pub struct CountWildcardRule {}
 
-impl Default for CountWildcardRule {
-    fn default() -> Self {
-        CountWildcardRule::new()
-    }
-}
-
 impl CountWildcardRule {
     pub fn new() -> Self {
         CountWildcardRule {}
     }
 }
+
 impl AnalyzerRule for CountWildcardRule {
     fn analyze(&self, plan: &LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
         plan.clone().transform_down(&analyze_internal)
diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
similarity index 53%
rename from datafusion/optimizer/src/inline_table_scan.rs
rename to datafusion/optimizer/src/analyzer/inline_table_scan.rs
index 722a70cb38..8307238951 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
@@ -15,73 +15,113 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Optimizer rule to replace TableScan references
-//! such as DataFrames and Views and inlines the LogicalPlan
-//! to support further optimization
-use crate::optimizer::ApplyOrder;
-use crate::{OptimizerConfig, OptimizerRule};
+//! Analyzer rule that replaces TableScan references
+//! (such as DataFrames and Views) by inlining their LogicalPlan.
+use std::sync::Arc;
+
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::Result;
-use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan};
+use datafusion_expr::expr_rewriter::rewrite_expr;
+use datafusion_expr::{
+    logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan,
+};
+
+use crate::analyzer::AnalyzerRule;
+use crate::rewrite::TreeNodeRewritable;
 
-/// Optimization rule that inlines TableScan that provide a [LogicalPlan]
+/// Analyzer rule that inlines TableScans that provide a [LogicalPlan]
 /// (DataFrame / ViewTable)
 #[derive(Default)]
 pub struct InlineTableScan;
 
 impl InlineTableScan {
-    #[allow(missing_docs)]
     pub fn new() -> Self {
         Self {}
     }
 }
 
-impl OptimizerRule for InlineTableScan {
-    fn try_optimize(
-        &self,
-        plan: &LogicalPlan,
-        _config: &dyn OptimizerConfig,
-    ) -> Result<Option<LogicalPlan>> {
-        match plan {
-            // Match only on scans without filter / projection / fetch
-            // Views and DataFrames won't have those added
-            // during the early stage of planning
-            LogicalPlan::TableScan(TableScan {
-                source,
-                table_name,
-                filters,
-                projection,
-                ..
-            }) if filters.is_empty() => {
-                if let Some(sub_plan) = source.get_logical_plan() {
-                    let projection_exprs =
-                        generate_projection_expr(projection, sub_plan)?;
-                    let plan = LogicalPlanBuilder::from(sub_plan.clone())
-                        .project(projection_exprs)?
-                        // Since this This is creating a subquery like:
-                        //```sql
-                        // ...
-                        // FROM <view definition> as "table_name"
-                        // ```
-                        //
-                        // it doesn't make sense to have a qualified
-                        // reference (e.g. "foo"."bar") -- this convert to
-                        // string
-                        .alias(table_name.to_string())?;
-                    Ok(Some(plan.build()?))
-                } else {
-                    Ok(None)
-                }
-            }
-            _ => Ok(None),
-        }
+impl AnalyzerRule for InlineTableScan {
+    fn analyze(&self, plan: &LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
+        plan.clone().transform_up(&analyze_internal)
     }
 
     fn name(&self) -> &str {
         "inline_table_scan"
     }
+}
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::BottomUp)
+fn analyze_internal(plan: LogicalPlan) -> Result<Option<LogicalPlan>> {
+    match plan {
+        // Match only on scans without filter / projection / fetch
+        // Views and DataFrames won't have those added
+        // during the early stage of planning
+        LogicalPlan::TableScan(TableScan {
+            source,
+            table_name,
+            filters,
+            projection,
+            ..
+        }) if filters.is_empty() => {
+            if let Some(sub_plan) = source.get_logical_plan() {
+                let projection_exprs = generate_projection_expr(&projection, sub_plan)?;
+                let plan = LogicalPlanBuilder::from(sub_plan.clone())
+                    .project(projection_exprs)?
+                    // Since this is creating a subquery like:
+                    //```sql
+                    // ...
+                    // FROM <view definition> as "table_name"
+                    // ```
+                    //
+                    // it doesn't make sense to have a qualified
+                    // reference (e.g. "foo"."bar") -- so convert it to a
+                    // string
+                    .alias(table_name.to_string())?
+                    .build()?;
+                Ok(Some(plan))
+            } else {
+                Ok(None)
+            }
+        }
+        LogicalPlan::Filter(filter) => {
+            let new_expr = rewrite_expr(filter.predicate.clone(), rewrite_subquery)?;
+            Ok(Some(LogicalPlan::Filter(Filter::try_new(
+                new_expr,
+                filter.input,
+            )?)))
+        }
+        _ => Ok(None),
+    }
+}
+
+fn rewrite_subquery(expr: Expr) -> Result<Expr> {
+    match expr {
+        Expr::Exists { subquery, negated } => {
+            let plan = subquery.subquery.as_ref().clone();
+            let new_plan = plan.transform_up(&analyze_internal)?;
+            let subquery = subquery.with_plan(Arc::new(new_plan));
+            Ok(Expr::Exists { subquery, negated })
+        }
+        Expr::InSubquery {
+            expr,
+            subquery,
+            negated,
+        } => {
+            let plan = subquery.subquery.as_ref().clone();
+            let new_plan = plan.transform_up(&analyze_internal)?;
+            let subquery = subquery.with_plan(Arc::new(new_plan));
+            Ok(Expr::InSubquery {
+                expr,
+                subquery,
+                negated,
+            })
+        }
+        Expr::ScalarSubquery(subquery) => {
+            let plan = subquery.subquery.as_ref().clone();
+            let new_plan = plan.transform_up(&analyze_internal)?;
+            let subquery = subquery.with_plan(Arc::new(new_plan));
+            Ok(Expr::ScalarSubquery(subquery))
+        }
+        _ => Ok(expr),
     }
 }
 
@@ -107,10 +147,11 @@ mod tests {
     use std::{sync::Arc, vec};
 
     use arrow::datatypes::{DataType, Field, Schema};
+
     use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};
 
-    use crate::inline_table_scan::InlineTableScan;
-    use crate::test::assert_optimized_plan_eq;
+    use crate::analyzer::inline_table_scan::InlineTableScan;
+    use crate::test::assert_analyzed_plan_eq;
 
     pub struct RawTableSource {}
 
@@ -185,7 +226,7 @@ mod tests {
         \n    Projection: y.a, y.b\
         \n      TableScan: y";
 
-        assert_optimized_plan_eq(Arc::new(InlineTableScan::new()), &plan, expected)
+        assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), &plan, expected)
     }
 
     #[test]
@@ -201,6 +242,6 @@ mod tests {
         \n  Projection: y.a\
         \n    TableScan: y";
 
-        assert_optimized_plan_eq(Arc::new(InlineTableScan::new()), &plan, expected)
+        assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), &plan, expected)
     }
 }
diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index 0982198bb8..e4b3d22cde 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -16,8 +16,10 @@
 // under the License.
 
 mod count_wildcard_rule;
+mod inline_table_scan;
 
 use crate::analyzer::count_wildcard_rule::CountWildcardRule;
+use crate::analyzer::inline_table_scan::InlineTableScan;
 use crate::rewrite::TreeNodeRewritable;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{DataFusionError, Result};
@@ -52,8 +54,10 @@ impl Default for Analyzer {
 impl Analyzer {
     /// Create a new analyzer using the recommended list of rules
     pub fn new() -> Self {
-        let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> =
-            vec![Arc::new(CountWildcardRule::new())];
+        let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
+            Arc::new(CountWildcardRule::new()),
+            Arc::new(InlineTableScan::new()),
+        ];
         Self::with_rules(rules)
     }
 
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 4be7bb3706..fa43f685f3 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -28,7 +28,6 @@ pub mod eliminate_outer_join;
 pub mod eliminate_project;
 pub mod extract_equijoin_predicate;
 pub mod filter_null_join_keys;
-pub mod inline_table_scan;
 pub mod merge_projection;
 pub mod optimizer;
 pub mod propagate_empty_relation;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 35557b1252..bda4c47d11 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -29,7 +29,6 @@ use crate::eliminate_outer_join::EliminateOuterJoin;
 use crate::eliminate_project::EliminateProjection;
 use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
 use crate::filter_null_join_keys::FilterNullJoinKeys;
-use crate::inline_table_scan::InlineTableScan;
 use crate::merge_projection::MergeProjection;
 use crate::plan_signature::LogicalPlanSignature;
 use crate::propagate_empty_relation::PropagateEmptyRelation;
@@ -210,7 +209,6 @@ impl Optimizer {
     /// Create a new optimizer using the recommended list of rules
     pub fn new() -> Self {
         let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
-            Arc::new(InlineTableScan::new()),
             Arc::new(TypeCoercion::new()),
             Arc::new(SimplifyExpressions::new()),
             Arc::new(UnwrapCastInComparison::new()),
diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs
index cec21da313..439f44151e 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::analyzer::{Analyzer, AnalyzerRule};
 use crate::optimizer::Optimizer;
 use crate::{OptimizerContext, OptimizerRule};
 use arrow::datatypes::{DataType, Field, Schema};
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::Result;
 use datafusion_expr::{col, logical_plan::table_scan, LogicalPlan, LogicalPlanBuilder};
 use std::sync::Arc;
@@ -106,6 +108,20 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
     }
 }
 
+pub fn assert_analyzed_plan_eq(
+    rule: Arc<dyn AnalyzerRule + Send + Sync>,
+    plan: &LogicalPlan,
+    expected: &str,
+) -> Result<()> {
+    let options = ConfigOptions::default();
+    let analyzed_plan =
+        Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options)?;
+    let formatted_plan = format!("{analyzed_plan:?}");
+    assert_eq!(formatted_plan, expected);
+
+    Ok(())
+}
+
 pub fn assert_optimized_plan_eq(
     rule: Arc<dyn OptimizerRule + Send + Sync>,
     plan: &LogicalPlan,

Reply via email to