This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 74a778ca60 feat: allow the customization of analyzer rules (#5963)
74a778ca60 is described below

commit 74a778ca6016a853a3c3add3fa8c6f12f4fe4561
Author: Ruihang Xia <[email protected]>
AuthorDate: Fri Apr 14 02:05:12 2023 +0800

    feat: allow the customization of analyzer rules (#5963)
    
    * feat: allow the customization of analyzer rules
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * move analyzer out of optimizer
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * add example analyzer rule
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * Update datafusion/optimizer/src/optimizer.rs
    
    Co-authored-by: jakevin <[email protected]>
    
    * apply CR sugg.
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    ---------
    
    Signed-off-by: Ruihang Xia <[email protected]>
    Co-authored-by: jakevin <[email protected]>
---
 datafusion-examples/examples/rewrite_expr.rs       | 69 +++++++++++++++++++---
 datafusion/core/src/execution/context.rs           | 34 ++++++++++-
 .../optimizer/src/analyzer/count_wildcard_rule.rs  |  1 +
 datafusion/optimizer/src/analyzer/mod.rs           |  6 +-
 datafusion/optimizer/src/optimizer.rs              |  6 +-
 datafusion/optimizer/tests/integration-test.rs     |  7 ++-
 6 files changed, 104 insertions(+), 19 deletions(-)

diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs
index 7a752e5c00..451205e4cb 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -18,10 +18,11 @@
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::{
     AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource,
 };
+use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
 use datafusion_optimizer::optimizer::Optimizer;
 use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule};
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
@@ -46,10 +47,18 @@ pub fn main() -> Result<()> {
         logical_plan.display_indent()
     );
 
-    // now run the optimizer with our custom rule
-    let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]);
+    // run the analyzer with our custom rule
     let config = OptimizerContext::default().with_skip_failing_rules(false);
-    let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?;
+    let analyzer = Analyzer::with_rules(vec![Arc::new(MyAnalyzerRule {})]);
+    let analyzed_plan = analyzer.execute_and_check(&logical_plan, config.options())?;
+    println!(
+        "Analyzed Logical Plan:\n\n{}\n",
+        analyzed_plan.display_indent()
+    );
+
+    // then run the optimizer with our custom rule
+    let optimizer = Optimizer::with_rules(vec![Arc::new(MyOptimizerRule {})]);
+    let optimized_plan = optimizer.optimize(&analyzed_plan, &config, observe)?;
     println!(
         "Optimized Logical Plan:\n\n{}\n",
         optimized_plan.display_indent()
@@ -66,11 +75,57 @@ fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
     )
 }
 
-struct MyRule {}
+/// An example analyzer rule that changes Int64 literals to UInt64
+struct MyAnalyzerRule {}
+
+impl AnalyzerRule for MyAnalyzerRule {
+    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
+        Self::analyze_plan(plan)
+    }
+
+    fn name(&self) -> &str {
+        "my_analyzer_rule"
+    }
+}
+
+impl MyAnalyzerRule {
+    fn analyze_plan(plan: LogicalPlan) -> Result<LogicalPlan> {
+        plan.transform(&|plan| {
+            Ok(match plan {
+                LogicalPlan::Filter(filter) => {
+                    let predicate = Self::analyze_expr(filter.predicate.clone())?;
+                    Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
+                        predicate,
+                        filter.input,
+                    )?))
+                }
+                _ => Transformed::No(plan),
+            })
+        })
+    }
+
+    fn analyze_expr(expr: Expr) -> Result<Expr> {
+        expr.transform(&|expr| {
+            // closure is invoked for all sub expressions
+            Ok(match expr {
+                Expr::Literal(ScalarValue::Int64(i)) => {
+                    // transform to UInt64
+                    Transformed::Yes(Expr::Literal(ScalarValue::UInt64(
+                        i.map(|i| i as u64),
+                    )))
+                }
+                _ => Transformed::No(expr),
+            })
+        })
+    }
+}
+
+/// An example optimizer rule that rewrite BETWEEN expression to binary compare expressions
+struct MyOptimizerRule {}
 
-impl OptimizerRule for MyRule {
+impl OptimizerRule for MyOptimizerRule {
     fn name(&self) -> &str {
-        "my_rule"
+        "my_optimizer_rule"
     }
 
     fn try_optimize(
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 41076b69de..7c2255eaf3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -105,7 +105,10 @@ use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
 use crate::physical_optimizer::pipeline_checker::PipelineChecker;
 use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
 use crate::physical_optimizer::sort_enforcement::EnforceSorting;
-use datafusion_optimizer::OptimizerConfig;
+use datafusion_optimizer::{
+    analyzer::{Analyzer, AnalyzerRule},
+    OptimizerConfig,
+};
 use datafusion_sql::planner::object_name_to_table_reference;
 use uuid::Uuid;
 
@@ -1204,6 +1207,8 @@ impl QueryPlanner for DefaultQueryPlanner {
 pub struct SessionState {
     /// UUID for the session
     session_id: String,
+    /// Responsible for analyzing and rewrite a logical plan before optimization
+    analyzer: Analyzer,
     /// Responsible for optimizing a logical plan
     optimizer: Optimizer,
     /// Responsible for optimizing a physical execution plan
@@ -1345,6 +1350,7 @@ impl SessionState {
 
         SessionState {
             session_id,
+            analyzer: Analyzer::new(),
             optimizer: Optimizer::new(),
             physical_optimizers,
             query_planner: Arc::new(DefaultQueryPlanner {}),
@@ -1457,6 +1463,15 @@ impl SessionState {
         self
     }
 
+    /// Replace the analyzer rules
+    pub fn with_analyzer_rules(
+        mut self,
+        rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
+    ) -> Self {
+        self.analyzer = Analyzer::with_rules(rules);
+        self
+    }
+
     /// Replace the optimizer rules
     pub fn with_optimizer_rules(
         mut self,
@@ -1475,6 +1490,15 @@ impl SessionState {
         self
     }
 
+    /// Adds a new [`AnalyzerRule`]
+    pub fn add_analyzer_rule(
+        mut self,
+        analyzer_rule: Arc<dyn AnalyzerRule + Send + Sync>,
+    ) -> Self {
+        self.analyzer.rules.push(analyzer_rule);
+        self
+    }
+
     /// Adds a new [`OptimizerRule`]
     pub fn add_optimizer_rule(
         mut self,
@@ -1651,9 +1675,12 @@ impl SessionState {
         if let LogicalPlan::Explain(e) = plan {
             let mut stringified_plans = e.stringified_plans.clone();
 
+            let analyzed_plan = self
+                .analyzer
+                .execute_and_check(e.plan.as_ref(), self.options())?;
             // optimize the child plan, capturing the output of each optimizer
             let (plan, logical_optimization_succeeded) = match self.optimizer.optimize(
-                e.plan.as_ref(),
+                &analyzed_plan,
                 self,
                 |optimized_plan, optimizer| {
                     let optimizer_name = optimizer.name().to_string();
@@ -1679,7 +1706,8 @@ impl SessionState {
                 logical_optimization_succeeded,
             }))
         } else {
-            self.optimizer.optimize(plan, self, |_, _| {})
+            let analyzed_plan = self.analyzer.execute_and_check(plan, self.options())?;
+            self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
         }
     }
 
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index ba19108ceb..ecd00d7ac1 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -26,6 +26,7 @@ use crate::analyzer::AnalyzerRule;
 
 /// Rewrite `Count(Expr:Wildcard)` to `Count(Expr:Literal)`.
 /// Resolve issue: https://github.com/apache/arrow-datafusion/issues/5473.
+#[derive(Default)]
 pub struct CountWildcardRule {}
 
 impl CountWildcardRule {
diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index bb9b01c859..b5a29a2876 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod count_wildcard_rule;
-mod inline_table_scan;
-pub(crate) mod type_coercion;
+pub mod count_wildcard_rule;
+pub mod inline_table_scan;
+pub mod type_coercion;
 
 use crate::analyzer::count_wildcard_rule::CountWildcardRule;
 use crate::analyzer::inline_table_scan::InlineTableScan;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 2d3c1e816f..eb0b971f82 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -17,7 +17,6 @@
 
 //! Query optimizer traits
 
-use crate::analyzer::Analyzer;
 use crate::common_subexpr_eliminate::CommonSubexprEliminate;
 use crate::decorrelate_where_exists::DecorrelateWhereExists;
 use crate::decorrelate_where_in::DecorrelateWhereIn;
@@ -156,7 +155,7 @@ impl OptimizerConfig for OptimizerContext {
 /// A rule-based optimizer.
 #[derive(Clone)]
 pub struct Optimizer {
-    /// All rules to apply
+    /// All optimizer rules to apply
     pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
 }
 
@@ -264,8 +263,7 @@ impl Optimizer {
         F: FnMut(&LogicalPlan, &dyn OptimizerRule),
     {
         let options = config.options();
-        // execute_and_check has it's own timer
-        let mut new_plan = Analyzer::default().execute_and_check(plan, options)?;
+        let mut new_plan = plan.clone();
 
         let start_time = Instant::now();
 
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index 0b9134c8b8..e58a2aaa00 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -20,8 +20,9 @@ use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
+use datafusion_optimizer::analyzer::Analyzer;
 use datafusion_optimizer::optimizer::Optimizer;
-use datafusion_optimizer::{OptimizerContext, OptimizerRule};
+use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use datafusion_sql::sqlparser::ast::Statement;
 use datafusion_sql::sqlparser::dialect::GenericDialect;
@@ -347,8 +348,10 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
     let config = OptimizerContext::new()
         .with_skip_failing_rules(false)
         .with_query_execution_start_time(now_time);
+    let analyzer = Analyzer::new();
     let optimizer = Optimizer::new();
-    // optimize the logical plan
+    // analyze and optimize the logical plan
+    let plan = analyzer.execute_and_check(&plan, config.options())?;
     optimizer.optimize(&plan, &config, &observe)
 }
 

Reply via email to