This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ca8985ef1 Make OptimizerConfig a trait (#4631) (#4638) (#4645)
ca8985ef1 is described below

commit ca8985ef17739c2d342ac987b3de30a29a5fc76d
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 16 18:21:53 2022 +0000

    Make OptimizerConfig a trait (#4631) (#4638) (#4645)
    
    * Make OptimizerConfig a trait (#4631) (#4638)
    
    * Format
    
    * Review feedback
---
 datafusion-examples/examples/rewrite_expr.rs       |  11 +-
 datafusion/core/src/execution/context.rs           |  58 ++++---
 datafusion/core/tests/user_defined_plan.rs         |   6 +-
 datafusion/optimizer/README.md                     |   6 +-
 .../optimizer/src/common_subexpr_eliminate.rs      |  60 ++++----
 .../optimizer/src/decorrelate_where_exists.rs      |  16 +-
 datafusion/optimizer/src/decorrelate_where_in.rs   |  32 ++--
 datafusion/optimizer/src/eliminate_cross_join.rs   |  33 ++--
 datafusion/optimizer/src/eliminate_filter.rs       |  22 +--
 datafusion/optimizer/src/eliminate_limit.rs        |  32 ++--
 datafusion/optimizer/src/eliminate_outer_join.rs   |  23 +--
 datafusion/optimizer/src/filter_null_join_keys.rs  |  36 +++--
 datafusion/optimizer/src/inline_table_scan.rs      |  16 +-
 datafusion/optimizer/src/lib.rs                    |   2 +-
 datafusion/optimizer/src/optimizer.rs              | 166 +++++++++++++--------
 .../optimizer/src/propagate_empty_relation.rs      |  12 +-
 datafusion/optimizer/src/push_down_filter.rs       |  33 ++--
 datafusion/optimizer/src/push_down_limit.rs        |  29 +---
 datafusion/optimizer/src/push_down_projection.rs   |  38 ++---
 .../optimizer/src/rewrite_disjunctive_predicate.rs |  10 +-
 .../optimizer/src/scalar_subquery_to_join.rs       |  29 ++--
 .../src/simplify_expressions/simplify_exprs.rs     |  18 +--
 .../optimizer/src/single_distinct_to_groupby.rs    |  19 +--
 .../optimizer/src/subquery_filter_to_join.rs       |  15 +-
 datafusion/optimizer/src/test/mod.rs               |   8 +-
 datafusion/optimizer/src/type_coercion.rs          |  24 +--
 .../optimizer/src/unwrap_cast_in_comparison.rs     |   4 +-
 datafusion/optimizer/src/utils.rs                  |   4 +-
 datafusion/optimizer/tests/integration-test.rs     |   8 +-
 29 files changed, 359 insertions(+), 411 deletions(-)

diff --git a/datafusion-examples/examples/rewrite_expr.rs 
b/datafusion-examples/examples/rewrite_expr.rs
index 216a6932c..a0be8c196 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -22,7 +22,7 @@ use datafusion_expr::{
     AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource,
 };
 use datafusion_optimizer::optimizer::Optimizer;
-use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, 
OptimizerRule};
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
 use datafusion_sql::sqlparser::parser::Parser;
@@ -47,9 +47,8 @@ pub fn main() -> Result<()> {
 
     // now run the optimizer with our custom rule
     let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]);
-    let mut optimizer_config = 
OptimizerConfig::default().with_skip_failing_rules(false);
-    let optimized_plan =
-        optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?;
+    let config = OptimizerContext::default().with_skip_failing_rules(false);
+    let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?;
     println!(
         "Optimized Logical Plan:\n\n{}\n",
         optimized_plan.display_indent()
@@ -76,10 +75,10 @@ impl OptimizerRule for MyRule {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        _config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         // recurse down and optimize children first
-        let plan = utils::optimize_children(self, plan, _config)?;
+        let plan = utils::optimize_children(self, plan, config)?;
 
         match plan {
             LogicalPlan::Filter(filter) => {
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index f98e21dd1..16f2a29f3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::logical_expr::{
     CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
     SetVariable, TableSource, TableType, UNNAMED_TABLE,
 };
-use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
+use crate::optimizer::{OptimizerContext, OptimizerRule};
 use datafusion_sql::{ResolvedTableReference, TableReference};
 
 use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
@@ -1557,14 +1557,6 @@ impl SessionState {
                 .register_catalog(config.default_catalog.clone(), 
default_catalog);
         }
 
-        let optimizer_config = OptimizerConfig::new().filter_null_keys(
-            config
-                .config_options
-                .read()
-                .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
-                .unwrap_or_default(),
-        );
-
         let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync 
+ Send>> = vec![
             Arc::new(AggregateStatistics::new()),
             Arc::new(JoinSelection::new()),
@@ -1593,7 +1585,7 @@ impl SessionState {
 
         SessionState {
             session_id,
-            optimizer: Optimizer::new(&optimizer_config),
+            optimizer: Optimizer::new(),
             physical_optimizers,
             query_planner: Arc::new(DefaultQueryPlanner {}),
             catalog_list,
@@ -1741,24 +1733,29 @@ impl SessionState {
 
     /// Optimizes the logical plan by applying optimizer rules.
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        let mut optimizer_config = OptimizerConfig::new()
-            .with_skip_failing_rules(
-                self.config
-                    .config_options
-                    .read()
-                    .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES)
-                    .unwrap_or_default(),
-            )
-            .with_max_passes(
-                self.config
-                    .config_options
-                    .read()
-                    .get_u64(OPT_OPTIMIZER_MAX_PASSES)
-                    .unwrap_or_default() as u8,
-            )
-            .with_query_execution_start_time(
-                self.execution_props.query_execution_start_time,
-            );
+        // TODO: Implement OptimizerContext directly on DataFrame (#4631) 
(#4626)
+        let config = {
+            let config_options = self.config.config_options.read();
+            OptimizerContext::new()
+                .with_skip_failing_rules(
+                    config_options
+                        .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES)
+                        .unwrap_or_default(),
+                )
+                .with_max_passes(
+                    config_options
+                        .get_u64(OPT_OPTIMIZER_MAX_PASSES)
+                        .unwrap_or_default() as u8,
+                )
+                .with_query_execution_start_time(
+                    self.execution_props.query_execution_start_time,
+                )
+                .filter_null_keys(
+                    config_options
+                        .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
+                        .unwrap_or_default(),
+                )
+        };
 
         if let LogicalPlan::Explain(e) = plan {
             let mut stringified_plans = e.stringified_plans.clone();
@@ -1766,7 +1763,7 @@ impl SessionState {
             // optimize the child plan, capturing the output of each optimizer
             let plan = self.optimizer.optimize(
                 e.plan.as_ref(),
-                &mut optimizer_config,
+                &config,
                 |optimized_plan, optimizer| {
                     let optimizer_name = optimizer.name().to_string();
                     let plan_type = PlanType::OptimizedLogicalPlan { 
optimizer_name };
@@ -1781,8 +1778,7 @@ impl SessionState {
                 schema: e.schema.clone(),
             }))
         } else {
-            self.optimizer
-                .optimize(plan, &mut optimizer_config, |_, _| {})
+            self.optimizer.optimize(plan, &config, |_, _| {})
         }
     }
 
diff --git a/datafusion/core/tests/user_defined_plan.rs 
b/datafusion/core/tests/user_defined_plan.rs
index 4b5a7e66f..89cfd48e6 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -285,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         // Note: this code simply looks for the pattern of a Limit followed by 
a
         // Sort and replaces it by a TopK node. It does not handle many
@@ -308,7 +308,7 @@ impl OptimizerRule for TopKOptimizerRule {
                         node: Arc::new(TopKPlanNode {
                             k: *fetch,
                             input: self
-                                .try_optimize(input.as_ref(), 
optimizer_config)?
+                                .try_optimize(input.as_ref(), config)?
                                 .unwrap_or_else(|| input.as_ref().clone()),
                             expr: expr[0].clone(),
                         }),
@@ -319,7 +319,7 @@ impl OptimizerRule for TopKOptimizerRule {
 
         // If we didn't find the Limit/Sort combination, recurse as
         // normal and build the result.
-        Ok(Some(optimize_children(self, plan, optimizer_config)?))
+        Ok(Some(optimize_children(self, plan, config)?))
     }
 
     fn name(&self) -> &str {
diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md
index 51bc8e3ee..01c6f6dd2 100644
--- a/datafusion/optimizer/README.md
+++ b/datafusion/optimizer/README.md
@@ -42,9 +42,9 @@ and applying it to a logical plan to produce an optimized 
logical plan.
 // The `datafusion` crate provides a DataFrame API that can create a 
LogicalPlan
 let logical_plan = ...
 
-let mut config = OptimizerConfig::default();
+let mut config = OptimizerContext::default();
 let optimizer = Optimizer::new(&config);
-let optimized_plan = optimizer.optimize(&logical_plan, &mut config, observe)?;
+let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?;
 
 fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
     println!(
@@ -82,7 +82,7 @@ pub trait OptimizerRule {
     fn optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<LogicalPlan>;
 
     /// A human readable name for this optimizer rule
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 8236f20f3..8b1639cfc 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -17,8 +17,11 @@
 
 //! Eliminate common sub-expression.
 
-use crate::{utils, OptimizerConfig, OptimizerRule};
+use std::collections::{BTreeSet, HashMap};
+use std::sync::Arc;
+
 use arrow::datatypes::DataType;
+
 use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, 
Result};
 use datafusion_expr::{
     col,
@@ -27,8 +30,8 @@ use datafusion_expr::{
     logical_plan::{Aggregate, Filter, LogicalPlan, Projection, Sort, Window},
     Expr, ExprSchemable,
 };
-use std::collections::{BTreeSet, HashMap};
-use std::sync::Arc;
+
+use crate::{utils, OptimizerConfig, OptimizerRule};
 
 /// A map from expression's identifier to tuple including
 /// - the expression itself (cloned)
@@ -60,7 +63,7 @@ impl CommonSubexprEliminate {
         arrays_list: &[&[Vec<(usize, String)>]],
         input: &LogicalPlan,
         expr_set: &mut ExprSet,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
         let mut affected_id = BTreeSet::<Identifier>::new();
 
@@ -80,7 +83,7 @@ impl CommonSubexprEliminate {
             .collect::<Result<Vec<_>>>()?;
 
         let mut new_input = self
-            .try_optimize(input, optimizer_config)?
+            .try_optimize(input, config)?
             .unwrap_or_else(|| input.clone());
         if !affected_id.is_empty() {
             new_input = build_project_plan(new_input, affected_id, expr_set)?;
@@ -94,7 +97,7 @@ impl OptimizerRule for CommonSubexprEliminate {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         let mut expr_set = ExprSet::new();
 
@@ -107,13 +110,8 @@ impl OptimizerRule for CommonSubexprEliminate {
                 let input_schema = Arc::clone(input.schema());
                 let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
 
-                let (mut new_expr, new_input) = self.rewrite_expr(
-                    &[expr],
-                    &[&arrays],
-                    input,
-                    &mut expr_set,
-                    optimizer_config,
-                )?;
+                let (mut new_expr, new_input) =
+                    self.rewrite_expr(&[expr], &[&arrays], input, &mut 
expr_set, config)?;
 
                 Ok(Some(LogicalPlan::Projection(
                     Projection::try_new_with_schema(
@@ -140,7 +138,7 @@ impl OptimizerRule for CommonSubexprEliminate {
                     &[&[id_array]],
                     filter.input(),
                     &mut expr_set,
-                    optimizer_config,
+                    config,
                 )?;
 
                 if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
@@ -167,7 +165,7 @@ impl OptimizerRule for CommonSubexprEliminate {
                     &[&arrays],
                     input,
                     &mut expr_set,
-                    optimizer_config,
+                    config,
                 )?;
 
                 Ok(Some(LogicalPlan::Window(Window {
@@ -192,7 +190,7 @@ impl OptimizerRule for CommonSubexprEliminate {
                     &[&group_arrays, &aggr_arrays],
                     input,
                     &mut expr_set,
-                    optimizer_config,
+                    config,
                 )?;
                 // note the reversed pop order.
                 let new_aggr_expr = pop_expr(&mut new_expr)?;
@@ -211,13 +209,8 @@ impl OptimizerRule for CommonSubexprEliminate {
                 let input_schema = Arc::clone(input.schema());
                 let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
 
-                let (mut new_expr, new_input) = self.rewrite_expr(
-                    &[expr],
-                    &[&arrays],
-                    input,
-                    &mut expr_set,
-                    optimizer_config,
-                )?;
+                let (mut new_expr, new_input) =
+                    self.rewrite_expr(&[expr], &[&arrays], input, &mut 
expr_set, config)?;
 
                 Ok(Some(LogicalPlan::Sort(Sort {
                     expr: pop_expr(&mut new_expr)?,
@@ -249,11 +242,7 @@ impl OptimizerRule for CommonSubexprEliminate {
             | LogicalPlan::Extension(_)
             | LogicalPlan::Prepare(_) => {
                 // apply the optimization to all inputs of the plan
-                Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
+                Ok(Some(utils::optimize_children(self, plan, config)?))
             }
         }
     }
@@ -572,20 +561,25 @@ fn replace_common_expr(
 
 #[cfg(test)]
 mod test {
-    use super::*;
-    use crate::test::*;
+    use std::iter;
+
     use arrow::datatypes::{Field, Schema};
+
     use datafusion_expr::logical_plan::{table_scan, JoinType};
     use datafusion_expr::{
         avg, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, 
sum,
         Operator,
     };
-    use std::iter;
+
+    use crate::optimizer::OptimizerContext;
+    use crate::test::*;
+
+    use super::*;
 
     fn assert_optimized_plan_eq(expected: &str, plan: &LogicalPlan) {
         let optimizer = CommonSubexprEliminate {};
         let optimized_plan = optimizer
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
@@ -831,7 +825,7 @@ mod test {
             .unwrap();
         let rule = CommonSubexprEliminate {};
         let optimized_plan = rule
-            .try_optimize(&plan, &mut OptimizerConfig::new())
+            .try_optimize(&plan, &OptimizerContext::new())
             .unwrap()
             .unwrap();
 
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs 
b/datafusion/optimizer/src/decorrelate_where_exists.rs
index f0fa6bb9d..f1addf651 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -48,7 +48,7 @@ impl DecorrelateWhereExists {
     fn extract_subquery_exprs(
         &self,
         predicate: &Expr,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
         let filters = split_conjunction(predicate);
 
@@ -58,7 +58,7 @@ impl DecorrelateWhereExists {
             match it {
                 Expr::Exists { subquery, negated } => {
                     let subquery = self
-                        .try_optimize(&subquery.subquery, optimizer_config)?
+                        .try_optimize(&subquery.subquery, config)?
                         .map(Arc::new)
                         .unwrap_or_else(|| subquery.subquery.clone());
                     let subquery = Subquery { subquery };
@@ -77,7 +77,7 @@ impl OptimizerRule for DecorrelateWhereExists {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
@@ -86,11 +86,11 @@ impl OptimizerRule for DecorrelateWhereExists {
 
                 // Apply optimizer rule to current input
                 let optimized_input = self
-                    .try_optimize(filter_input, optimizer_config)?
+                    .try_optimize(filter_input, config)?
                     .unwrap_or_else(|| filter_input.clone());
 
                 let (subqueries, other_exprs) =
-                    self.extract_subquery_exprs(predicate, optimizer_config)?;
+                    self.extract_subquery_exprs(predicate, config)?;
                 let optimized_plan = LogicalPlan::Filter(Filter::try_new(
                     predicate.clone(),
                     Arc::new(optimized_input),
@@ -114,11 +114,7 @@ impl OptimizerRule for DecorrelateWhereExists {
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
+                Ok(Some(utils::optimize_children(self, plan, config)?))
             }
         }
     }
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs 
b/datafusion/optimizer/src/decorrelate_where_in.rs
index de1725dc5..d2555ea5c 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -20,7 +20,7 @@ use crate::utils::{
     only_or_err, split_conjunction, swap_table, verify_not_disjunction,
 };
 use crate::{utils, OptimizerConfig, OptimizerRule};
-use datafusion_common::context;
+use datafusion_common::{context, Result};
 use datafusion_expr::logical_plan::{Filter, JoinType, Projection, Subquery};
 use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
 use log::debug;
@@ -46,7 +46,7 @@ impl DecorrelateWhereIn {
     fn extract_subquery_exprs(
         &self,
         predicate: &Expr,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
         let filters = split_conjunction(predicate); // TODO: disjunctions
 
@@ -60,7 +60,7 @@ impl DecorrelateWhereIn {
                     negated,
                 } => {
                     let subquery = self
-                        .try_optimize(&subquery.subquery, optimizer_config)?
+                        .try_optimize(&subquery.subquery, config)?
                         .map(Arc::new)
                         .unwrap_or_else(|| subquery.subquery.clone());
                     let subquery = Subquery { subquery };
@@ -81,8 +81,8 @@ impl OptimizerRule for DecorrelateWhereIn {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
-    ) -> datafusion_common::Result<Option<LogicalPlan>> {
+        config: &dyn OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 let predicate = filter.predicate();
@@ -90,11 +90,11 @@ impl OptimizerRule for DecorrelateWhereIn {
 
                 // Apply optimizer rule to current input
                 let optimized_input = self
-                    .try_optimize(filter_input, optimizer_config)?
+                    .try_optimize(filter_input, config)?
                     .unwrap_or_else(|| filter_input.clone());
 
                 let (subqueries, other_exprs) =
-                    self.extract_subquery_exprs(predicate, optimizer_config)?;
+                    self.extract_subquery_exprs(predicate, config)?;
                 let optimized_plan = LogicalPlan::Filter(Filter::try_new(
                     predicate.clone(),
                     Arc::new(optimized_input),
@@ -107,22 +107,14 @@ impl OptimizerRule for DecorrelateWhereIn {
                 // iterate through all exists clauses in predicate, turning 
each into a join
                 let mut cur_input = filter_input.clone();
                 for subquery in subqueries {
-                    cur_input = optimize_where_in(
-                        &subquery,
-                        &cur_input,
-                        &other_exprs,
-                        optimizer_config,
-                    )?;
+                    cur_input =
+                        optimize_where_in(&subquery, &cur_input, &other_exprs, 
config)?;
                 }
                 Ok(Some(cur_input))
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
+                Ok(Some(utils::optimize_children(self, plan, config)?))
             }
         }
     }
@@ -136,7 +128,7 @@ fn optimize_where_in(
     query_info: &SubqueryInfo,
     outer_input: &LogicalPlan,
     outer_other_exprs: &[Expr],
-    optimizer_config: &mut OptimizerConfig,
+    config: &dyn OptimizerConfig,
 ) -> datafusion_common::Result<LogicalPlan> {
     let proj = Projection::try_from_plan(&query_info.query.subquery)
         .map_err(|e| context!("a projection is required", e))?;
@@ -179,7 +171,7 @@ fn optimize_where_in(
         merge_cols((&[subquery_col], &subqry_cols), (&[outer_col], 
&outer_cols));
 
     // build subquery side of join - the thing the subquery was querying
-    let subqry_alias = format!("__sq_{}", optimizer_config.next_id());
+    let subqry_alias = format!("__sq_{}", config.next_id());
     let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
     if let Some(expr) = conjunction(other_subqry_exprs) {
         // if the subquery had additional expressions, restore them
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs 
b/datafusion/optimizer/src/eliminate_cross_join.rs
index 3e1227d02..338e58f68 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -16,7 +16,9 @@
 // under the License.
 
 //! Optimizer rule to eliminate cross join to inner join if join predicates 
are available in filters.
-use crate::{utils, OptimizerConfig, OptimizerRule};
+use std::collections::HashSet;
+use std::sync::Arc;
+
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::expr::{BinaryExpr, Expr};
 use datafusion_expr::logical_plan::{
@@ -27,8 +29,8 @@ use datafusion_expr::{
     and, build_join_schema, or, wrap_projection_for_join_if_necessary, 
ExprSchemable,
     Operator,
 };
-use std::collections::HashSet;
-use std::sync::Arc;
+
+use crate::{utils, OptimizerConfig, OptimizerRule};
 
 #[derive(Default)]
 pub struct EliminateCrossJoin;
@@ -54,7 +56,7 @@ impl OptimizerRule for EliminateCrossJoin {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
@@ -78,11 +80,7 @@ impl OptimizerRule for EliminateCrossJoin {
                         )?;
                     }
                     _ => {
-                        return Ok(Some(utils::optimize_children(
-                            self,
-                            plan,
-                            optimizer_config,
-                        )?));
+                        return Ok(Some(utils::optimize_children(self, plan, 
config)?));
                     }
                 }
 
@@ -102,7 +100,7 @@ impl OptimizerRule for EliminateCrossJoin {
                     )?;
                 }
 
-                left = utils::optimize_children(self, &left, 
optimizer_config)?;
+                left = utils::optimize_children(self, &left, config)?;
 
                 if plan.schema() != left.schema() {
                     left = LogicalPlan::Projection(Projection::new_from_schema(
@@ -128,11 +126,7 @@ impl OptimizerRule for EliminateCrossJoin {
                 }
             }
 
-            _ => Ok(Some(utils::optimize_children(
-                self,
-                plan,
-                optimizer_config,
-            )?)),
+            _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
         }
     }
 
@@ -378,18 +372,21 @@ fn remove_join_expressions(
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::test::*;
     use datafusion_expr::{
         binary_expr, col, lit,
         logical_plan::builder::LogicalPlanBuilder,
         Operator::{And, Or},
     };
 
+    use crate::optimizer::OptimizerContext;
+    use crate::test::*;
+
+    use super::*;
+
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) {
         let rule = EliminateCrossJoin::new();
         let optimized_plan = rule
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted = optimized_plan.display_indent_schema().to_string();
diff --git a/datafusion/optimizer/src/eliminate_filter.rs 
b/datafusion/optimizer/src/eliminate_filter.rs
index 046d7b11f..7636a6a9f 100644
--- a/datafusion/optimizer/src/eliminate_filter.rs
+++ b/datafusion/optimizer/src/eliminate_filter.rs
@@ -18,13 +18,14 @@
 //! Optimizer rule to replace `where false` on a plan with an empty relation.
 //! This saves time in planning and executing the query.
 //! Note that this rule should be applied after simplify expressions optimizer 
rule.
-use crate::{utils, OptimizerConfig, OptimizerRule};
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::{
     logical_plan::{EmptyRelation, LogicalPlan},
     Expr,
 };
 
+use crate::{utils, OptimizerConfig, OptimizerRule};
+
 /// Optimization rule that elimanate the scalar value (true/false) filter with 
an [LogicalPlan::EmptyRelation]
 #[derive(Default)]
 pub struct EliminateFilter;
@@ -40,7 +41,7 @@ impl OptimizerRule for EliminateFilter {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         let predicate_and_input = match plan {
             LogicalPlan::Filter(filter) => match filter.predicate() {
@@ -53,18 +54,14 @@ impl OptimizerRule for EliminateFilter {
         };
 
         match predicate_and_input {
-            Some((true, input)) => self.try_optimize(input, optimizer_config),
+            Some((true, input)) => self.try_optimize(input, config),
             Some((false, input)) => 
Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
                 produce_one_row: false,
                 schema: input.schema().clone(),
             }))),
             None => {
                 // Apply the optimization to all inputs of the plan
-                Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
+                Ok(Some(utils::optimize_children(self, plan, config)?))
             }
         }
     }
@@ -76,14 +73,17 @@ impl OptimizerRule for EliminateFilter {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::test::*;
     use datafusion_expr::{col, lit, logical_plan::builder::LogicalPlanBuilder, 
sum};
 
+    use crate::optimizer::OptimizerContext;
+    use crate::test::*;
+
+    use super::*;
+
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = EliminateFilter::new();
         let optimized_plan = rule
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/datafusion/optimizer/src/eliminate_limit.rs 
b/datafusion/optimizer/src/eliminate_limit.rs
index 91771a940..840346d85 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -20,10 +20,11 @@
 //! on a plan with an empty relation.
 //! This rule also removes OFFSET 0 from the [LogicalPlan]
 //! This saves time in planning and executing the query.
-use crate::{utils, OptimizerConfig, OptimizerRule};
 use datafusion_common::Result;
 use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
 
+use crate::{utils, OptimizerConfig, OptimizerRule};
+
 /// Optimization rule that eliminate LIMIT 0 or useless LIMIT(skip:0, 
fetch:None).
 /// It can cooperate with `propagate_empty_relation` and `limit_push_down`.
 #[derive(Default)]
@@ -40,7 +41,7 @@ impl OptimizerRule for EliminateLimit {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         if let LogicalPlan::Limit(limit) = plan {
             match limit.fetch {
@@ -55,20 +56,12 @@ impl OptimizerRule for EliminateLimit {
                 None => {
                     if limit.skip == 0 {
                         let input = &*limit.input;
-                        return Ok(Some(utils::optimize_children(
-                            self,
-                            input,
-                            optimizer_config,
-                        )?));
+                        return Ok(Some(utils::optimize_children(self, input, 
config)?));
                     }
                 }
             }
         }
-        Ok(Some(utils::optimize_children(
-            self,
-            plan,
-            optimizer_config,
-        )?))
+        Ok(Some(utils::optimize_children(self, plan, config)?))
     }
 
     fn name(&self) -> &str {
@@ -78,9 +71,6 @@ impl OptimizerRule for EliminateLimit {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::push_down_limit::PushDownLimit;
-    use crate::test::*;
     use datafusion_common::Column;
     use datafusion_expr::{
         col,
@@ -88,9 +78,15 @@ mod tests {
         sum,
     };
 
+    use crate::optimizer::OptimizerContext;
+    use crate::push_down_limit::PushDownLimit;
+    use crate::test::*;
+
+    use super::*;
+
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
         let optimized_plan = EliminateLimit::new()
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
@@ -104,11 +100,11 @@ mod tests {
         expected: &str,
     ) -> Result<()> {
         let optimized_plan = PushDownLimit::new()
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let optimized_plan = EliminateLimit::new()
-            .try_optimize(&optimized_plan, &mut OptimizerConfig::new())
+            .try_optimize(&optimized_plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs 
b/datafusion/optimizer/src/eliminate_outer_join.rs
index c615d1fb0..cc535117d 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -64,7 +64,7 @@ impl OptimizerRule for EliminateOuterJoin {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => match filter.input().as_ref() {
@@ -109,23 +109,11 @@ impl OptimizerRule for EliminateOuterJoin {
                         null_equals_null: join.null_equals_null,
                     });
                     let new_plan = from_plan(plan, &plan.expressions(), 
&[new_join])?;
-                    Ok(Some(utils::optimize_children(
-                        self,
-                        &new_plan,
-                        optimizer_config,
-                    )?))
+                    Ok(Some(utils::optimize_children(self, &new_plan, 
config)?))
                 }
-                _ => Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?)),
+                _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
             },
-            _ => Ok(Some(utils::optimize_children(
-                self,
-                plan,
-                optimizer_config,
-            )?)),
+            _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
         }
     }
 
@@ -307,6 +295,7 @@ fn extract_non_nullable_columns(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::optimizer::OptimizerContext;
     use crate::test::*;
     use arrow::datatypes::DataType;
     use datafusion_expr::{
@@ -319,7 +308,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
         let rule = EliminateOuterJoin::new();
         let optimized_plan = rule
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs 
b/datafusion/optimizer/src/filter_null_join_keys.rs
index ee6c2cc5d..4bee88359 100644
--- a/datafusion/optimizer/src/filter_null_join_keys.rs
+++ b/datafusion/optimizer/src/filter_null_join_keys.rs
@@ -20,12 +20,14 @@
 //! and then insert an `IsNotNull` filter on the nullable side since null 
values
 //! can never match.
 
-use crate::{utils, OptimizerConfig, OptimizerRule};
-use datafusion_common::{Column, DFField, DFSchemaRef};
+use std::sync::Arc;
+
+use datafusion_common::{Column, DFField, DFSchemaRef, Result};
 use datafusion_expr::{
     and, logical_plan::Filter, logical_plan::JoinType, Expr, LogicalPlan,
 };
-use std::sync::Arc;
+
+use crate::{utils, OptimizerConfig, OptimizerRule};
 
 /// The FilterNullJoinKeys rule will identify inner joins with equi-join 
conditions
 /// where the join key is nullable on one side and non-nullable on the other 
side
@@ -34,22 +36,26 @@ use std::sync::Arc;
 #[derive(Default)]
 pub struct FilterNullJoinKeys {}
 
+impl FilterNullJoinKeys {
+    pub const NAME: &'static str = "filter_null_join_keys";
+}
+
 impl OptimizerRule for FilterNullJoinKeys {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
-    ) -> datafusion_common::Result<Option<LogicalPlan>> {
+        config: &dyn OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
                 // recurse down first and optimize inputs
                 let mut join = join.clone();
                 join.left = Arc::new(
-                    self.try_optimize(&join.left, optimizer_config)?
+                    self.try_optimize(&join.left, config)?
                         .unwrap_or_else(|| join.left.as_ref().clone()),
                 );
                 join.right = Arc::new(
-                    self.try_optimize(&join.right, optimizer_config)?
+                    self.try_optimize(&join.right, config)?
                         .unwrap_or_else(|| join.right.as_ref().clone()),
                 );
 
@@ -90,17 +96,13 @@ impl OptimizerRule for FilterNullJoinKeys {
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
+                Ok(Some(utils::optimize_children(self, plan, config)?))
             }
         }
     }
 
     fn name(&self) -> &str {
-        "filter_null_join_keys"
+        Self::NAME
     }
 }
 
@@ -147,15 +149,19 @@ fn resolve_fields(
 
 #[cfg(test)]
 mod tests {
-    use super::*;
     use arrow::datatypes::{DataType, Field, Schema};
+
     use datafusion_common::{Column, Result};
     use datafusion_expr::logical_plan::table_scan;
     use datafusion_expr::{logical_plan::JoinType, LogicalPlanBuilder};
 
+    use crate::optimizer::OptimizerContext;
+
+    use super::*;
+
     fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
         let rule = FilterNullJoinKeys::default();
-        rule.try_optimize(plan, &mut OptimizerConfig::new())
+        rule.try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan")
     }
diff --git a/datafusion/optimizer/src/inline_table_scan.rs 
b/datafusion/optimizer/src/inline_table_scan.rs
index d8283bf71..fe24e675d 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -38,7 +38,7 @@ impl OptimizerRule for InlineTableScan {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             // Match only on scans without filter / projection / fetch
@@ -52,8 +52,7 @@ impl OptimizerRule for InlineTableScan {
             }) if filters.is_empty() => {
                 if let Some(sub_plan) = source.get_logical_plan() {
                     // Recursively apply optimization
-                    let plan =
-                        utils::optimize_children(self, sub_plan, 
optimizer_config)?;
+                    let plan = utils::optimize_children(self, sub_plan, 
config)?;
                     let plan = LogicalPlanBuilder::from(plan)
                         .project(vec![Expr::Wildcard])?
                         .alias(table_name)?;
@@ -67,11 +66,7 @@ impl OptimizerRule for InlineTableScan {
             // Rest: Recurse
             _ => {
                 // apply the optimization to all inputs of the plan
-                Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
+                Ok(Some(utils::optimize_children(self, plan, config)?))
             }
         }
     }
@@ -88,7 +83,8 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, 
TableSource};
 
-    use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, 
OptimizerRule};
+    use crate::optimizer::OptimizerContext;
+    use crate::{inline_table_scan::InlineTableScan, OptimizerRule};
 
     pub struct RawTableSource {}
 
@@ -158,7 +154,7 @@ mod tests {
         let plan = 
scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap();
 
         let optimized_plan = rule
-            .try_optimize(&plan, &mut OptimizerConfig::new())
+            .try_optimize(&plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 1bf7ad58b..a4804ca5b 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -41,5 +41,5 @@ pub mod rewrite_disjunctive_predicate;
 pub mod test;
 pub mod unwrap_cast_in_comparison;
 
-pub use optimizer::{OptimizerConfig, OptimizerRule};
+pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
 pub use utils::optimize_children;
diff --git a/datafusion/optimizer/src/optimizer.rs 
b/datafusion/optimizer/src/optimizer.rs
index 4e9eadc47..0dc651da2 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -41,6 +41,7 @@ use chrono::{DateTime, Utc};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use log::{debug, trace, warn};
+use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -54,7 +55,7 @@ pub trait OptimizerRule {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>>;
 
     /// A human readable name for this optimizer rule
@@ -62,15 +63,35 @@ pub trait OptimizerRule {
 }
 
 /// Options to control the DataFusion Optimizer.
+pub trait OptimizerConfig {
+    /// Return the time at which the query execution started. This
+    /// time is used as the value for now()
+    fn query_execution_start_time(&self) -> DateTime<Utc>;
+
+    /// Returns false if the given rule should be skipped
+    fn rule_enabled(&self, name: &str) -> bool;
+
+    /// The optimizer will skip failing rules if this returns true
+    fn skip_failing_rules(&self) -> bool;
+
+    /// How many times to attempt to optimize the plan
+    fn max_passes(&self) -> u8;
+
+    /// Return a unique ID
+    ///
+    /// This is useful for assigning unique names to aliases
+    fn next_id(&self) -> usize;
+}
+
+/// A standalone [`OptimizerConfig`] that can be used independently
+/// of DataFusion's config management
 #[derive(Debug)]
-pub struct OptimizerConfig {
+pub struct OptimizerContext {
     /// Query execution start time that can be used to rewrite
     /// expressions such as `now()` to use a literal value instead
     query_execution_start_time: DateTime<Utc>,
     /// id generator for optimizer passes
-    // TODO this should not be on the config,
-    // it should be its own 'OptimizerState' or something)
-    next_id: usize,
+    next_id: AtomicUsize,
     /// Option to skip rules that produce errors
     skip_failing_rules: bool,
     /// Specify whether to enable the filter_null_keys rule
@@ -79,12 +100,12 @@ pub struct OptimizerConfig {
     max_passes: u8,
 }
 
-impl OptimizerConfig {
+impl OptimizerContext {
     /// Create optimizer config
     pub fn new() -> Self {
         Self {
             query_execution_start_time: Utc::now(),
-            next_id: 0, // useful for generating things like unique subquery 
aliases
+            next_id: AtomicUsize::new(1),
             skip_failing_rules: true,
             filter_null_keys: true,
             max_passes: 3,
@@ -119,24 +140,36 @@ impl OptimizerConfig {
         self.max_passes = v;
         self
     }
+}
 
-    /// Generate the next ID needed
-    pub fn next_id(&mut self) -> usize {
-        self.next_id += 1;
-        self.next_id
+impl Default for OptimizerContext {
+    /// Create optimizer config
+    fn default() -> Self {
+        Self::new()
     }
+}
 
-    /// Return the time at which the query execution started. This
-    /// time is used as the value for now()
-    pub fn query_execution_start_time(&self) -> DateTime<Utc> {
+impl OptimizerConfig for OptimizerContext {
+    fn query_execution_start_time(&self) -> DateTime<Utc> {
         self.query_execution_start_time
     }
-}
 
-impl Default for OptimizerConfig {
-    /// Create optimizer config
-    fn default() -> Self {
-        Self::new()
+    fn rule_enabled(&self, name: &str) -> bool {
+        self.filter_null_keys || name != FilterNullJoinKeys::NAME
+    }
+
+    fn skip_failing_rules(&self) -> bool {
+        self.skip_failing_rules
+    }
+
+    fn max_passes(&self) -> u8 {
+        self.max_passes
+    }
+
+    fn next_id(&self) -> usize {
+        use std::sync::atomic::Ordering;
+        // Can use relaxed ordering as not used for synchronisation
+        self.next_id.fetch_add(1, Ordering::Relaxed)
     }
 }
 
@@ -147,10 +180,16 @@ pub struct Optimizer {
     pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
 }
 
+impl Default for Optimizer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl Optimizer {
     /// Create a new optimizer using the recommended list of rules
-    pub fn new(config: &OptimizerConfig) -> Self {
-        let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
+    pub fn new() -> Self {
+        let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
             Arc::new(InlineTableScan::new()),
             Arc::new(TypeCoercion::new()),
             Arc::new(SimplifyExpressions::new()),
@@ -169,22 +208,19 @@ impl Optimizer {
             Arc::new(EliminateLimit::new()),
             Arc::new(PropagateEmptyRelation::new()),
             Arc::new(RewriteDisjunctivePredicate::new()),
+            Arc::new(FilterNullJoinKeys::default()),
+            Arc::new(EliminateOuterJoin::new()),
+            // Filters can't be pushed down past Limits, we should do 
PushDownFilter after LimitPushDown
+            Arc::new(PushDownLimit::new()),
+            Arc::new(PushDownFilter::new()),
+            Arc::new(SingleDistinctToGroupBy::new()),
+            // The previous optimizations added expressions and projections,
+            // that might benefit from the following rules
+            Arc::new(SimplifyExpressions::new()),
+            Arc::new(UnwrapCastInComparison::new()),
+            Arc::new(CommonSubexprEliminate::new()),
+            Arc::new(PushDownProjection::new()),
         ];
-        if config.filter_null_keys {
-            rules.push(Arc::new(FilterNullJoinKeys::default()));
-        }
-        rules.push(Arc::new(EliminateOuterJoin::new()));
-        // Filters can't be pushed down past Limits, we should do 
PushDownFilter after LimitPushDown
-        rules.push(Arc::new(PushDownLimit::new()));
-        rules.push(Arc::new(PushDownFilter::new()));
-        rules.push(Arc::new(SingleDistinctToGroupBy::new()));
-
-        // The previous optimizations added expressions and projections,
-        // that might benefit from the following rules
-        rules.push(Arc::new(SimplifyExpressions::new()));
-        rules.push(Arc::new(UnwrapCastInComparison::new()));
-        rules.push(Arc::new(CommonSubexprEliminate::new()));
-        rules.push(Arc::new(PushDownProjection::new()));
 
         Self::with_rules(rules)
     }
@@ -199,7 +235,7 @@ impl Optimizer {
     pub fn optimize<F>(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
         mut observer: F,
     ) -> Result<LogicalPlan>
     where
@@ -209,11 +245,16 @@ impl Optimizer {
         let mut plan_str = format!("{}", plan.display_indent());
         let mut new_plan = plan.clone();
         let mut i = 0;
-        while i < optimizer_config.max_passes {
+        while i < config.max_passes() {
             log_plan(&format!("Optimizer input (pass {})", i), &new_plan);
 
             for rule in &self.rules {
-                let result = rule.try_optimize(&new_plan, optimizer_config);
+                if !config.rule_enabled(rule.name()) {
+                    debug!("Skipping rule {} due to optimizer config", 
rule.name());
+                    continue;
+                }
+
+                let result = rule.try_optimize(&new_plan, config);
                 match result {
                     Ok(Some(plan)) => {
                         if 
!plan.schema().equivalent_names_and_types(new_plan.schema()) {
@@ -237,7 +278,7 @@ impl Optimizer {
                         );
                     }
                     Err(ref e) => {
-                        if optimizer_config.skip_failing_rules {
+                        if config.skip_failing_rules() {
                             // Note to future readers: if you see this warning 
it signals a
                             // bug in the DataFusion optimizer. Please 
consider filing a ticket
                             // https://github.com/apache/arrow-datafusion
@@ -286,51 +327,49 @@ fn log_plan(description: &str, plan: &LogicalPlan) {
 mod tests {
     use crate::optimizer::Optimizer;
     use crate::test::test_table_scan;
-    use crate::{OptimizerConfig, OptimizerRule};
-    use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError};
+    use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
+    use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError, 
Result};
     use datafusion_expr::logical_plan::EmptyRelation;
     use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection};
     use std::sync::Arc;
 
     #[test]
-    fn skip_failing_rule() -> Result<(), DataFusionError> {
+    fn skip_failing_rule() {
         let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
-        let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
+        let config = OptimizerContext::new().with_skip_failing_rules(true);
         let plan = LogicalPlan::EmptyRelation(EmptyRelation {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        opt.optimize(&plan, &mut config, &observe)?;
-        Ok(())
+        opt.optimize(&plan, &config, &observe).unwrap();
     }
 
     #[test]
-    fn no_skip_failing_rule() -> Result<(), DataFusionError> {
+    fn no_skip_failing_rule() {
         let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
-        let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+        let config = OptimizerContext::new().with_skip_failing_rules(false);
         let plan = LogicalPlan::EmptyRelation(EmptyRelation {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        let result = opt.optimize(&plan, &mut config, &observe);
+        let err = opt.optimize(&plan, &config, &observe).unwrap_err();
         assert_eq!(
             "Internal error: Optimizer rule 'bad rule' failed due to 
unexpected error: \
             Error during planning: rule failed. This was likely caused by a 
bug in \
             DataFusion's code and we would welcome that you file an bug report 
in our issue tracker",
-            format!("{}", result.err().unwrap())
+            err.to_string()
         );
-        Ok(())
     }
 
     #[test]
-    fn generate_different_schema() -> Result<(), DataFusionError> {
+    fn generate_different_schema() {
         let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
-        let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+        let config = OptimizerContext::new().with_skip_failing_rules(false);
         let plan = LogicalPlan::EmptyRelation(EmptyRelation {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        let result = opt.optimize(&plan, &mut config, &observe);
+        let err = opt.optimize(&plan, &config, &observe).unwrap_err();
         assert_eq!(
             "Internal error: Optimizer rule 'get table_scan rule' failed, due 
to generate a different schema, \
              original schema: DFSchema { fields: [], metadata: {} }, \
@@ -341,9 +380,8 @@ mod tests {
              metadata: {} }. \
              This was likely caused by a bug in DataFusion's code \
              and we would welcome that you file an bug report in our issue 
tracker",
-            format!("{}", result.err().unwrap())
+            err.to_string()
         );
-        Ok(())
     }
 
     #[test]
@@ -351,7 +389,7 @@ mod tests {
         // if the plan creates more metadata than previously (because
         // some wrapping functions are removed, etc) do not error
         let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
-        let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+        let config = OptimizerContext::new().with_skip_failing_rules(false);
 
         let input = Arc::new(test_table_scan().unwrap());
         let input_schema = input.schema().clone();
@@ -364,7 +402,7 @@ mod tests {
 
         // optimizing should be ok, but the schema will have changed  (no 
metadata)
         assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
-        let optimized_plan = opt.optimize(&plan, &mut config, 
&observe).unwrap();
+        let optimized_plan = opt.optimize(&plan, &config, &observe).unwrap();
         // metadata was removed
         assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
     }
@@ -399,9 +437,9 @@ mod tests {
     impl OptimizerRule for BadRule {
         fn try_optimize(
             &self,
-            _plan: &LogicalPlan,
-            _optimizer_config: &mut OptimizerConfig,
-        ) -> datafusion_common::Result<Option<LogicalPlan>> {
+            _: &LogicalPlan,
+            _: &dyn OptimizerConfig,
+        ) -> Result<Option<LogicalPlan>> {
             Err(DataFusionError::Plan("rule failed".to_string()))
         }
 
@@ -416,9 +454,9 @@ mod tests {
     impl OptimizerRule for GetTableScanRule {
         fn try_optimize(
             &self,
-            _plan: &LogicalPlan,
-            _optimizer_config: &mut OptimizerConfig,
-        ) -> datafusion_common::Result<Option<LogicalPlan>> {
+            _: &LogicalPlan,
+            _: &dyn OptimizerConfig,
+        ) -> Result<Option<LogicalPlan>> {
             let table_scan = test_table_scan()?;
             Ok(Some(LogicalPlanBuilder::from(table_scan).build()?))
         }
diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs 
b/datafusion/optimizer/src/propagate_empty_relation.rs
index b44a918fe..7ef769e21 100644
--- a/datafusion/optimizer/src/propagate_empty_relation.rs
+++ b/datafusion/optimizer/src/propagate_empty_relation.rs
@@ -37,11 +37,10 @@ impl OptimizerRule for PropagateEmptyRelation {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         // optimize child plans first
-        let optimized_children_plan =
-            utils::optimize_children(self, plan, optimizer_config)?;
+        let optimized_children_plan = utils::optimize_children(self, plan, 
config)?;
         match &optimized_children_plan {
             LogicalPlan::EmptyRelation(_) => Ok(Some(optimized_children_plan)),
             LogicalPlan::Projection(_)
@@ -204,6 +203,7 @@ fn empty_child(plan: &LogicalPlan) -> 
Result<Option<LogicalPlan>> {
 mod tests {
     use crate::eliminate_filter::EliminateFilter;
     use crate::test::{test_table_scan, test_table_scan_with_name};
+    use crate::OptimizerContext;
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::{Column, ScalarValue};
     use datafusion_expr::logical_plan::table_scan;
@@ -217,7 +217,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = PropagateEmptyRelation::new();
         let optimized_plan = rule
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
@@ -227,11 +227,11 @@ mod tests {
 
     fn assert_together_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let optimize_one = EliminateFilter::new()
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let optimize_two = PropagateEmptyRelation::new()
-            .try_optimize(&optimize_one, &mut OptimizerConfig::new())
+            .try_optimize(&optimize_one, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimize_two);
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index 7e90a4a79..0cf6c635b 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -506,7 +506,7 @@ impl OptimizerRule for PushDownFilter {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         let filter = match plan {
             LogicalPlan::Filter(filter) => filter,
@@ -517,22 +517,12 @@ impl OptimizerRule for PushDownFilter {
                     Some(optimized_plan) => Ok(Some(utils::optimize_children(
                         self,
                         &optimized_plan,
-                        optimizer_config,
-                    )?)),
-                    None => Ok(Some(utils::optimize_children(
-                        self,
-                        plan,
-                        optimizer_config,
+                        config,
                     )?)),
+                    None => Ok(Some(utils::optimize_children(self, plan, 
config)?)),
                 };
             }
-            _ => {
-                return Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
-            }
+            _ => return Ok(Some(utils::optimize_children(self, plan, 
config)?)),
         };
 
         let child_plan = &**filter.input();
@@ -544,7 +534,7 @@ impl OptimizerRule for PushDownFilter {
                     new_predicate,
                     child_filter.input().clone(),
                 )?);
-                return self.try_optimize(&new_plan, optimizer_config);
+                return self.try_optimize(&new_plan, config);
             }
             LogicalPlan::Repartition(_)
             | LogicalPlan::Distinct(_)
@@ -745,11 +735,7 @@ impl OptimizerRule for PushDownFilter {
             _ => plan.clone(),
         };
 
-        Ok(Some(utils::optimize_children(
-            self,
-            &new_plan,
-            optimizer_config,
-        )?))
+        Ok(Some(utils::optimize_children(self, &new_plan, config)?))
     }
 }
 
@@ -786,6 +772,7 @@ fn replace_cols_by_name(e: Expr, replace_map: 
&HashMap<String, Expr>) -> Result<
 mod tests {
     use super::*;
     use crate::test::*;
+    use crate::OptimizerContext;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use async_trait::async_trait;
     use datafusion_common::DFSchema;
@@ -798,7 +785,7 @@ mod tests {
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
         let optimized_plan = PushDownFilter::new()
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
@@ -1947,7 +1934,7 @@ mod tests {
             
table_scan_with_pushdown_provider(TableProviderFilterPushDown::Inexact)?;
 
         let optimised_plan = PushDownFilter::new()
-            .try_optimize(&plan, &mut OptimizerConfig::new())
+            .try_optimize(&plan, &OptimizerContext::new())
             .expect("failed to optimize plan")
             .unwrap();
 
@@ -2299,7 +2286,7 @@ mod tests {
         // Originally global state which can help to avoid duplicate Filters 
been generated and pushed down.
         // Now the global state is removed. Need to double confirm that avoid 
duplicate Filters.
         let optimized_plan = PushDownFilter::new()
-            .try_optimize(&plan, &mut OptimizerConfig::new())
+            .try_optimize(&plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         assert_optimized_plan_eq(&optimized_plan, expected)
diff --git a/datafusion/optimizer/src/push_down_limit.rs 
b/datafusion/optimizer/src/push_down_limit.rs
index 3da7e07db..ad5ceaea0 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -78,17 +78,11 @@ impl OptimizerRule for PushDownLimit {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         let limit = match plan {
             LogicalPlan::Limit(limit) => limit,
-            _ => {
-                return Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
-            }
+            _ => return Ok(Some(utils::optimize_children(self, plan, 
config)?)),
         };
 
         if let LogicalPlan::Limit(child_limit) = &*limit.input {
@@ -118,18 +112,12 @@ impl OptimizerRule for PushDownLimit {
                 fetch: new_fetch,
                 input: Arc::new((*child_limit.input).clone()),
             });
-            return self.try_optimize(&plan, optimizer_config);
+            return self.try_optimize(&plan, config);
         }
 
         let fetch = match limit.fetch {
             Some(fetch) => fetch,
-            None => {
-                return Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
-            }
+            None => return Ok(Some(utils::optimize_children(self, plan, 
config)?)),
         };
         let skip = limit.skip;
 
@@ -237,11 +225,7 @@ impl OptimizerRule for PushDownLimit {
             _ => plan.clone(),
         };
 
-        Ok(Some(utils::optimize_children(
-            self,
-            &plan,
-            optimizer_config,
-        )?))
+        Ok(Some(utils::optimize_children(self, &plan, config)?))
     }
 
     fn name(&self) -> &str {
@@ -263,6 +247,7 @@ mod test {
 
     use super::*;
     use crate::test::*;
+    use crate::OptimizerContext;
     use datafusion_expr::{
         col, exists,
         logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan},
@@ -271,7 +256,7 @@ mod test {
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
         let optimized_plan = PushDownLimit::new()
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
 
diff --git a/datafusion/optimizer/src/push_down_projection.rs 
b/datafusion/optimizer/src/push_down_projection.rs
index 67a6c0d5c..01b089e42 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -49,7 +49,7 @@ impl OptimizerRule for PushDownProjection {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         // set of all columns referred by the plan (and thus considered 
required by the root)
         let required_columns = plan
@@ -63,7 +63,7 @@ impl OptimizerRule for PushDownProjection {
             plan,
             &required_columns,
             false,
-            optimizer_config,
+            config,
         )?))
     }
 
@@ -85,7 +85,7 @@ fn optimize_plan(
     plan: &LogicalPlan,
     required_columns: &HashSet<Column>, // set of columns required up to this 
step
     has_projection: bool,
-    _optimizer_config: &OptimizerConfig,
+    _config: &dyn OptimizerConfig,
 ) -> Result<LogicalPlan> {
     let mut new_required_columns = required_columns.clone();
     let new_plan = match plan {
@@ -117,13 +117,8 @@ fn optimize_plan(
                 expr_to_columns(e, &mut new_required_columns)?
             }
 
-            let new_input = optimize_plan(
-                _optimizer,
-                input,
-                &new_required_columns,
-                true,
-                _optimizer_config,
-            )?;
+            let new_input =
+                optimize_plan(_optimizer, input, &new_required_columns, true, 
_config)?;
 
             let new_required_columns_optimized = new_input
                 .schema()
@@ -174,7 +169,7 @@ fn optimize_plan(
                 left,
                 &new_required_columns,
                 true,
-                _optimizer_config,
+                _config,
             )?);
 
             let optimized_right = Arc::new(optimize_plan(
@@ -182,7 +177,7 @@ fn optimize_plan(
                 right,
                 &new_required_columns,
                 true,
-                _optimizer_config,
+                _config,
             )?);
 
             let schema = build_join_schema(
@@ -229,7 +224,7 @@ fn optimize_plan(
                     input,
                     required_columns,
                     true,
-                    _optimizer_config,
+                    _config,
                 )?)
                 .build();
             };
@@ -245,7 +240,7 @@ fn optimize_plan(
                 input,
                 &new_required_columns,
                 true,
-                _optimizer_config,
+                _config,
             )?)
             .window(new_window_expr)?
             .build()
@@ -286,7 +281,7 @@ fn optimize_plan(
                     input,
                     &new_required_columns,
                     true,
-                    _optimizer_config,
+                    _config,
                 )?),
                 group_expr.clone(),
                 new_aggr_expr,
@@ -316,7 +311,7 @@ fn optimize_plan(
                     &a.input,
                     &required_columns,
                     false,
-                    _optimizer_config,
+                    _config,
                 )?),
                 verbose: a.verbose,
                 schema: a.schema.clone(),
@@ -348,7 +343,7 @@ fn optimize_plan(
                         input_plan,
                         &new_required_columns,
                         has_projection,
-                        _optimizer_config,
+                        _config,
                     )
                 })
                 .collect::<Result<Vec<_>>>()?;
@@ -374,7 +369,7 @@ fn optimize_plan(
                 input,
                 &new_required_columns,
                 has_projection,
-                _optimizer_config,
+                _config,
             )?;
             from_plan(plan, &plan.expressions(), &[child])
         }
@@ -413,7 +408,7 @@ fn optimize_plan(
                         input_plan,
                         &new_required_columns,
                         has_projection,
-                        _optimizer_config,
+                        _config,
                     )
                 })
                 .collect::<Result<Vec<_>>>()?;
@@ -527,6 +522,7 @@ fn push_down_scan(
 mod tests {
     use super::*;
     use crate::test::*;
+    use crate::OptimizerContext;
     use arrow::datatypes::{DataType, Schema};
     use datafusion_expr::expr::Cast;
     use datafusion_expr::{
@@ -1019,8 +1015,6 @@ mod tests {
 
     fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
         let rule = PushDownProjection::new();
-        Ok(rule
-            .try_optimize(plan, &mut OptimizerConfig::new())?
-            .unwrap())
+        Ok(rule.try_optimize(plan, &OptimizerContext::new())?.unwrap())
     }
 }
diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs 
b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
index 079046273..0f9ba3d37 100644
--- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
+++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
@@ -127,7 +127,7 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
@@ -136,16 +136,12 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
                 let rewritten_expr = normalize_predicate(rewritten_predicate);
                 Ok(Some(LogicalPlan::Filter(Filter::try_new(
                     rewritten_expr,
-                    self.try_optimize(filter.input(), optimizer_config)?
+                    self.try_optimize(filter.input(), config)?
                         .map(Arc::new)
                         .unwrap_or_else(|| filter.input().clone()),
                 )?)))
             }
-            _ => Ok(Some(utils::optimize_children(
-                self,
-                plan,
-                optimizer_config,
-            )?)),
+            _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
         }
     }
 
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs 
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 12092621b..0a6110541 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -47,7 +47,7 @@ impl ScalarSubqueryToJoin {
     fn extract_subquery_exprs(
         &self,
         predicate: &Expr,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
         let filters = split_conjunction(predicate); // TODO: disjunctions
 
@@ -69,7 +69,7 @@ impl ScalarSubqueryToJoin {
                                 _ => return Ok(()),
                             };
                             let subquery = self
-                                .try_optimize(&subquery.subquery, 
optimizer_config)?
+                                .try_optimize(&subquery.subquery, config)?
                                 .map(Arc::new)
                                 .unwrap_or_else(|| subquery.subquery.clone());
                             let subquery = Subquery { subquery };
@@ -93,17 +93,17 @@ impl OptimizerRule for ScalarSubqueryToJoin {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 // Apply optimizer rule to current input
                 let optimized_input = self
-                    .try_optimize(filter.input(), optimizer_config)?
+                    .try_optimize(filter.input(), config)?
                     .unwrap_or_else(|| filter.input().as_ref().clone());
 
                 let (subqueries, other_exprs) =
-                    self.extract_subquery_exprs(filter.predicate(), 
optimizer_config)?;
+                    self.extract_subquery_exprs(filter.predicate(), config)?;
 
                 if subqueries.is_empty() {
                     // regular filter, no subquery exists clause here
@@ -116,12 +116,9 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                 // iterate through all subqueries in predicate, turning each 
into a join
                 let mut cur_input = filter.input().as_ref().clone();
                 for subquery in subqueries {
-                    if let Some(optimized_subquery) = optimize_scalar(
-                        &subquery,
-                        &cur_input,
-                        &other_exprs,
-                        optimizer_config,
-                    )? {
+                    if let Some(optimized_subquery) =
+                        optimize_scalar(&subquery, &cur_input, &other_exprs, 
config)?
+                    {
                         cur_input = optimized_subquery;
                     } else {
                         // if we can't handle all of the subqueries then bail 
for now
@@ -135,11 +132,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
+                Ok(Some(utils::optimize_children(self, plan, config)?))
             }
         }
     }
@@ -189,7 +182,7 @@ fn optimize_scalar(
     query_info: &SubqueryInfo,
     filter_input: &LogicalPlan,
     outer_others: &[Expr],
-    optimizer_config: &mut OptimizerConfig,
+    config: &dyn OptimizerConfig,
 ) -> Result<Option<LogicalPlan>> {
     let subquery = query_info.query.subquery.as_ref();
     debug!(
@@ -258,7 +251,7 @@ fn optimize_scalar(
     }
 
     // Only operate if one column is present and the other closed upon from 
outside scope
-    let subqry_alias = format!("__sq_{}", optimizer_config.next_id());
+    let subqry_alias = format!("__sq_{}", config.next_id());
     let group_by: Vec<_> = subqry_cols
         .iter()
         .map(|it| Expr::Column(it.clone()))
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs 
b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 7b7ecd0d0..6645284e3 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -46,11 +46,10 @@ impl OptimizerRule for SimplifyExpressions {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         let mut execution_props = ExecutionProps::new();
-        execution_props.query_execution_start_time =
-            optimizer_config.query_execution_start_time();
+        execution_props.query_execution_start_time = 
config.query_execution_start_time();
         Ok(Some(Self::optimize_internal(plan, &execution_props)?))
     }
 }
@@ -128,6 +127,7 @@ mod tests {
     use datafusion_common::ScalarValue;
     use datafusion_expr::{or, Between, BinaryExpr, Cast, Operator};
 
+    use crate::OptimizerContext;
     use datafusion_expr::logical_plan::table_scan;
     use datafusion_expr::{
         and, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, 
Expr,
@@ -172,7 +172,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
         let rule = SimplifyExpressions::new();
         let optimized_plan = rule
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
@@ -380,12 +380,11 @@ mod tests {
 
     // expect optimizing will result in an error, returning the error string
     fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime<Utc>) 
-> String {
-        let mut config =
-            OptimizerConfig::new().with_query_execution_start_time(*date_time);
+        let config = 
OptimizerContext::new().with_query_execution_start_time(*date_time);
         let rule = SimplifyExpressions::new();
 
         let err = rule
-            .try_optimize(plan, &mut config)
+            .try_optimize(plan, &config)
             .expect_err("expected optimization to fail");
 
         err.to_string()
@@ -395,12 +394,11 @@ mod tests {
         plan: &LogicalPlan,
         date_time: &DateTime<Utc>,
     ) -> String {
-        let mut config =
-            OptimizerConfig::new().with_query_execution_start_time(*date_time);
+        let config = 
OptimizerContext::new().with_query_execution_start_time(*date_time);
         let rule = SimplifyExpressions::new();
 
         let optimized_plan = rule
-            .try_optimize(plan, &mut config)
+            .try_optimize(plan, &config)
             .unwrap()
             .expect("failed to optimize plan");
         format!("{:?}", optimized_plan)
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs 
b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 2230033f0..4ba2e1599 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -86,7 +86,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Aggregate(Aggregate {
@@ -156,7 +156,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
                         Vec::new(),
                     )?);
                     let inner_agg =
-                        utils::optimize_children(self, &grouped_aggr, 
optimizer_config)?;
+                        utils::optimize_children(self, &grouped_aggr, config)?;
 
                     let outer_aggr_schema = 
Arc::new(DFSchema::new_with_metadata(
                         outer_group_exprs
@@ -200,18 +200,10 @@ impl OptimizerRule for SingleDistinctToGroupBy {
                         )?,
                     )))
                 } else {
-                    Ok(Some(utils::optimize_children(
-                        self,
-                        plan,
-                        optimizer_config,
-                    )?))
+                    Ok(Some(utils::optimize_children(self, plan, config)?))
                 }
             }
-            _ => Ok(Some(utils::optimize_children(
-                self,
-                plan,
-                optimizer_config,
-            )?)),
+            _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
         }
     }
     fn name(&self) -> &str {
@@ -223,6 +215,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
 mod tests {
     use super::*;
     use crate::test::*;
+    use crate::OptimizerContext;
     use datafusion_expr::expr::GroupingSet;
     use datafusion_expr::{
         col, count, count_distinct, lit, 
logical_plan::builder::LogicalPlanBuilder, max,
@@ -232,7 +225,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = SingleDistinctToGroupBy::new();
         let optimized_plan = rule
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
 
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs 
b/datafusion/optimizer/src/subquery_filter_to_join.rs
index 36c18ea74..436d478b9 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -52,13 +52,13 @@ impl OptimizerRule for SubqueryFilterToJoin {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => {
                 // Apply optimizer rule to current input
                 let optimized_input = self
-                    .try_optimize(filter.input(), optimizer_config)?
+                    .try_optimize(filter.input(), config)?
                     .unwrap_or_else(|| filter.input().as_ref().clone());
 
                 // Splitting filter expression into components by AND
@@ -98,7 +98,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
                         } => {
                             let right_input = self.try_optimize(
                                 &subquery.subquery,
-                                optimizer_config
+                                config
                             
)?.unwrap_or_else(||subquery.subquery.as_ref().clone());
                             let right_schema = right_input.schema();
                             if right_schema.fields().len() != 1 {
@@ -168,11 +168,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
             }
             _ => {
                 // Apply the optimization to all inputs of the plan
-                Ok(Some(utils::optimize_children(
-                    self,
-                    plan,
-                    optimizer_config,
-                )?))
+                Ok(Some(utils::optimize_children(self, plan, config)?))
             }
         }
     }
@@ -204,6 +200,7 @@ fn extract_subquery_filters(expression: &Expr, extracted: 
&mut Vec<Expr>) -> Res
 mod tests {
     use super::*;
     use crate::test::*;
+    use crate::OptimizerContext;
     use datafusion_expr::{
         and, binary_expr, col, in_subquery, lit, 
logical_plan::LogicalPlanBuilder,
         not_in_subquery, or, Operator,
@@ -212,7 +209,7 @@ mod tests {
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = SubqueryFilterToJoin::new();
         let optimized_plan = rule
-            .try_optimize(plan, &mut OptimizerConfig::new())
+            .try_optimize(plan, &OptimizerContext::new())
             .unwrap()
             .expect("failed to optimize plan");
         let formatted_plan = format!("{}", 
optimized_plan.display_indent_schema());
diff --git a/datafusion/optimizer/src/test/mod.rs 
b/datafusion/optimizer/src/test/mod.rs
index 1f51b38f6..462b94dd0 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{OptimizerConfig, OptimizerRule};
+use crate::{OptimizerContext, OptimizerRule};
 use arrow::datatypes::{DataType, Field, Schema};
 use datafusion_common::Result;
 use datafusion_expr::{col, logical_plan::table_scan, LogicalPlan, 
LogicalPlanBuilder};
@@ -107,7 +107,7 @@ pub fn assert_optimized_plan_eq(
     expected: &str,
 ) {
     let optimized_plan = rule
-        .try_optimize(plan, &mut OptimizerConfig::new())
+        .try_optimize(plan, &OptimizerContext::new())
         .unwrap()
         .expect("failed to optimize plan");
     let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
@@ -119,7 +119,7 @@ pub fn assert_optimizer_err(
     plan: &LogicalPlan,
     expected: &str,
 ) {
-    let res = rule.try_optimize(plan, &mut OptimizerConfig::new());
+    let res = rule.try_optimize(plan, &OptimizerContext::new());
     match res {
         Ok(plan) => assert_eq!(format!("{}", plan.unwrap().display_indent()), 
"An error"),
         Err(ref e) => {
@@ -133,7 +133,7 @@ pub fn assert_optimizer_err(
 
 pub fn assert_optimization_skipped(rule: &dyn OptimizerRule, plan: 
&LogicalPlan) {
     let new_plan = rule
-        .try_optimize(plan, &mut OptimizerConfig::new())
+        .try_optimize(plan, &OptimizerContext::new())
         .unwrap()
         .unwrap();
     assert_eq!(
diff --git a/datafusion/optimizer/src/type_coercion.rs 
b/datafusion/optimizer/src/type_coercion.rs
index 8436f9b57..d1655fe35 100644
--- a/datafusion/optimizer/src/type_coercion.rs
+++ b/datafusion/optimizer/src/type_coercion.rs
@@ -17,9 +17,10 @@
 
 //! Optimizer rule for type validation and coercion
 
-use crate::utils::rewrite_preserving_name;
-use crate::{OptimizerConfig, OptimizerRule};
+use std::sync::Arc;
+
 use arrow::datatypes::{DataType, IntervalUnit};
+
 use datafusion_common::{
     parse_interval, DFSchema, DFSchemaRef, DataFusionError, Result, 
ScalarValue,
 };
@@ -39,7 +40,9 @@ use datafusion_expr::{
     WindowFrame, WindowFrameBound, WindowFrameUnits,
 };
 use datafusion_expr::{ExprSchemable, Signature};
-use std::sync::Arc;
+
+use crate::utils::rewrite_preserving_name;
+use crate::{OptimizerConfig, OptimizerRule};
 
 #[derive(Default)]
 pub struct TypeCoercion {}
@@ -58,7 +61,7 @@ impl OptimizerRule for TypeCoercion {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        _optimizer_config: &mut OptimizerConfig,
+        _: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         Ok(Some(optimize_internal(&DFSchema::empty(), plan)?))
     }
@@ -584,9 +587,10 @@ fn coerce_agg_exprs_for_signature(
 
 #[cfg(test)]
 mod test {
-    use crate::type_coercion::{TypeCoercion, TypeCoercionRewriter};
-    use crate::{OptimizerConfig, OptimizerRule};
+    use std::sync::Arc;
+
     use arrow::datatypes::DataType;
+
     use datafusion_common::{DFField, DFSchema, Result, ScalarValue};
     use datafusion_expr::expr::Like;
     use datafusion_expr::expr_rewriter::ExprRewritable;
@@ -602,12 +606,14 @@ mod test {
         Signature, Volatility,
     };
     use datafusion_physical_expr::expressions::AvgAccumulator;
-    use std::sync::Arc;
+
+    use crate::type_coercion::{TypeCoercion, TypeCoercionRewriter};
+    use crate::{OptimizerContext, OptimizerRule};
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
         let rule = TypeCoercion::new();
-        let mut config = OptimizerConfig::default();
-        let plan = rule.try_optimize(plan, &mut config)?.unwrap();
+        let config = OptimizerContext::default();
+        let plan = rule.try_optimize(plan, &config)?.unwrap();
         assert_eq!(expected, &format!("{:?}", plan));
         Ok(())
     }
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs 
b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 0d6a1265c..9f6d1a5ac 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -82,13 +82,13 @@ impl OptimizerRule for UnwrapCastInComparison {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
-        _optimizer_config: &mut OptimizerConfig,
+        _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
         let new_inputs = plan
             .inputs()
             .into_iter()
             .map(|input| {
-                self.try_optimize(input, _optimizer_config)
+                self.try_optimize(input, _config)
                     .map(|o| o.unwrap_or_else(|| input.clone()))
             })
             .collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/optimizer/src/utils.rs 
b/datafusion/optimizer/src/utils.rs
index e2d326c16..324f49e44 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -40,12 +40,12 @@ use std::sync::Arc;
 pub fn optimize_children(
     optimizer: &impl OptimizerRule,
     plan: &LogicalPlan,
-    optimizer_config: &mut OptimizerConfig,
+    config: &dyn OptimizerConfig,
 ) -> Result<LogicalPlan> {
     let new_exprs = plan.expressions();
     let mut new_inputs = Vec::with_capacity(plan.inputs().len());
     for input in plan.inputs() {
-        let new_input = optimizer.try_optimize(input, optimizer_config)?;
+        let new_input = optimizer.try_optimize(input, config)?;
         new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
     }
     from_plan(plan, &new_exprs, &new_inputs)
diff --git a/datafusion/optimizer/tests/integration-test.rs 
b/datafusion/optimizer/tests/integration-test.rs
index 701d1a84c..62a8f1ef2 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -20,7 +20,7 @@ use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
 use datafusion_optimizer::optimizer::Optimizer;
-use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
+use datafusion_optimizer::{OptimizerContext, OptimizerRule};
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use datafusion_sql::sqlparser::ast::Statement;
 use datafusion_sql::sqlparser::dialect::GenericDialect;
@@ -330,12 +330,12 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
     // hard code the return value of now()
     let ts = NaiveDateTime::from_timestamp_opt(1666615693, 0).unwrap();
     let now_time = DateTime::<Utc>::from_utc(ts, Utc);
-    let mut config = OptimizerConfig::new()
+    let config = OptimizerContext::new()
         .with_skip_failing_rules(false)
         .with_query_execution_start_time(now_time);
-    let optimizer = Optimizer::new(&config);
+    let optimizer = Optimizer::new();
     // optimize the logical plan
-    optimizer.optimize(&plan, &mut config, &observe)
+    optimizer.optimize(&plan, &config, &observe)
 }
 
 struct MySchemaProvider {}


Reply via email to