This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b990987bdc Add PhysicalOptimizerRule::optimize_plan to allow passing 
more context into optimizer rules (#18739)
b990987bdc is described below

commit b990987bdcdf1a9f1f4252810272076af261b15a
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Nov 27 14:01:04 2025 +0530

    Add PhysicalOptimizerRule::optimize_plan to allow passing more context into 
optimizer rules (#18739)
    
    Co-authored-by: Gabriel Musat Mestre <[email protected]>
---
 datafusion/core/benches/push_down_filter.rs        |  8 ++-
 datafusion/core/src/physical_planner.rs            |  7 +-
 datafusion/core/tests/execution/coop.rs            |  6 +-
 datafusion/core/tests/parquet/file_statistics.rs   |  6 +-
 .../physical_optimizer/aggregate_statistics.rs     | 21 ++++--
 .../combine_partial_final_agg.rs                   |  9 +--
 .../physical_optimizer/enforce_distribution.rs     | 26 ++++++--
 .../tests/physical_optimizer/enforce_sorting.rs    | 13 ++--
 .../physical_optimizer/filter_pushdown/mod.rs      | 36 +++++++---
 .../physical_optimizer/filter_pushdown/util.rs     |  7 +-
 .../tests/physical_optimizer/join_selection.rs     | 45 +++++++++----
 .../tests/physical_optimizer/limit_pushdown.rs     | 47 +++++++++----
 .../physical_optimizer/projection_pushdown.rs      | 75 +++++++++++++++------
 .../tests/physical_optimizer/sanity_checker.rs     | 11 ++--
 .../core/tests/physical_optimizer/test_utils.rs    | 11 ++--
 datafusion/execution/src/config.rs                 | 73 ++++++++++++++++----
 .../physical-optimizer/src/aggregate_statistics.rs | 15 +++--
 .../physical-optimizer/src/coalesce_batches.rs     | 11 ++--
 .../src/combine_partial_final_agg.rs               |  7 +-
 .../physical-optimizer/src/enforce_distribution.rs |  6 +-
 datafusion/physical-optimizer/src/ensure_coop.rs   | 14 ++--
 .../physical-optimizer/src/filter_pushdown.rs      | 16 ++++-
 .../physical-optimizer/src/join_selection.rs       |  7 +-
 datafusion/physical-optimizer/src/lib.rs           |  2 +-
 .../physical-optimizer/src/limit_pushdown.rs       |  7 +-
 .../src/limit_pushdown_past_window.rs              |  8 +--
 .../src/limited_distinct_aggregation.rs            |  8 +--
 datafusion/physical-optimizer/src/optimizer.rs     | 77 ++++++++++++++++++++--
 .../physical-optimizer/src/output_requirements.rs  |  7 +-
 .../physical-optimizer/src/projection_pushdown.rs  | 14 ++--
 .../physical-optimizer/src/sanity_checker.rs       |  9 +--
 .../physical-optimizer/src/topk_aggregation.rs     |  8 +--
 .../physical-optimizer/src/update_aggr_exprs.rs    |  9 ++-
 33 files changed, 446 insertions(+), 180 deletions(-)

diff --git a/datafusion/core/benches/push_down_filter.rs 
b/datafusion/core/benches/push_down_filter.rs
index 139fb12c30..bab103c171 100644
--- a/datafusion/core/benches/push_down_filter.rs
+++ b/datafusion/core/benches/push_down_filter.rs
@@ -20,10 +20,10 @@ use arrow::datatypes::{DataType, Field, Schema};
 use bytes::{BufMut, BytesMut};
 use criterion::{criterion_group, criterion_main, Criterion};
 use datafusion::config::ConfigOptions;
-use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::ExecutionPlan;
 use object_store::memory::InMemory;
 use object_store::path::Path;
@@ -106,11 +106,13 @@ fn bench_push_down_filter(c: &mut Criterion) {
     config.execution.parquet.pushdown_filters = true;
     let plan = BenchmarkPlan { plan, config };
     let optimizer = FilterPushdown::new();
+    let session_config = SessionConfig::from(plan.config.clone());
+    let optimizer_context = OptimizerContext::new(session_config);
 
     c.bench_function("push_down_filter", |b| {
         b.iter(|| {
             optimizer
-                .optimize(Arc::clone(&plan.plan), &plan.config)
+                .optimize_plan(Arc::clone(&plan.plan), &optimizer_context)
                 .unwrap();
         });
     });
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 2ae5aed30d..6f0b201185 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -92,7 +92,7 @@ use datafusion_physical_expr::expressions::Literal;
 use datafusion_physical_expr::{
     create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
 };
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::execution_plan::InvariantLevel;
 use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
@@ -2271,11 +2271,14 @@ impl DefaultPhysicalPlanner {
         // to verify that the plan fulfills the base requirements.
         InvariantChecker(InvariantLevel::Always).check(&plan)?;
 
+        // Create optimizer context from session state
+        let optimizer_context = 
OptimizerContext::new(session_state.config().clone());
+
         let mut new_plan = Arc::clone(&plan);
         for optimizer in optimizers {
             let before_schema = new_plan.schema();
             new_plan = optimizer
-                .optimize(new_plan, session_state.config_options())
+                .optimize_plan(new_plan, &optimizer_context)
                 .map_err(|e| {
                     DataFusionError::Context(optimizer.name().to_string(), 
Box::new(e))
                 })?;
diff --git a/datafusion/core/tests/execution/coop.rs 
b/datafusion/core/tests/execution/coop.rs
index b6f406e967..c85b1dcdb5 100644
--- a/datafusion/core/tests/execution/coop.rs
+++ b/datafusion/core/tests/execution/coop.rs
@@ -40,7 +40,7 @@ use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
 use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion_physical_plan::coop::make_cooperative;
 use datafusion_physical_plan::filter::FilterExec;
@@ -810,8 +810,8 @@ async fn query_yields(
     task_ctx: Arc<TaskContext>,
 ) -> Result<(), Box<dyn Error>> {
     // Run plan through EnsureCooperative
-    let optimized =
-        EnsureCooperative::new().optimize(plan, 
task_ctx.session_config().options())?;
+    let optimizer_context = 
OptimizerContext::new(task_ctx.session_config().clone());
+    let optimized = EnsureCooperative::new().optimize_plan(plan, 
&optimizer_context)?;
 
     // Get the stream
     let stream = physical_plan::execute_stream(optimized, task_ctx)?;
diff --git a/datafusion/core/tests/parquet/file_statistics.rs 
b/datafusion/core/tests/parquet/file_statistics.rs
index 64ee92eda2..db2715956d 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -40,7 +40,7 @@ use datafusion_expr::{col, lit, Expr};
 use datafusion::datasource::physical_plan::FileScanConfig;
 use datafusion_common::config::ConfigOptions;
 use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::ExecutionPlan;
 use tempfile::tempdir;
@@ -84,8 +84,10 @@ async fn check_stats_precision_with_filter_pushdown() {
         Arc::new(FilterExec::try_new(physical_filter, 
exec_with_filter).unwrap())
             as Arc<dyn ExecutionPlan>;
 
+    let session_config = SessionConfig::from(options.clone());
+    let optimizer_context = OptimizerContext::new(session_config);
     let optimized_exec = FilterPushdown::new()
-        .optimize(filtered_exec, &options)
+        .optimize_plan(filtered_exec, &optimizer_context)
         .unwrap();
 
     assert!(
diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs 
b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
index a79d743cb2..07c4a14283 100644
--- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
@@ -25,12 +25,13 @@ use arrow::record_batch::RecordBatch;
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion_common::cast::as_int64_array;
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::Result;
+use datafusion_execution::config::SessionConfig;
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::expressions::{self, cast};
 use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
+use datafusion_physical_optimizer::OptimizerContext;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::aggregates::AggregateExec;
 use datafusion_physical_plan::aggregates::AggregateMode;
@@ -67,8 +68,10 @@ async fn assert_count_optim_success(
     let task_ctx = Arc::new(TaskContext::default());
     let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);
 
-    let config = ConfigOptions::new();
-    let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), 
&config)?;
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
+    let optimized = AggregateStatistics::new()
+        .optimize_plan(Arc::clone(&plan), &optimizer_context)?;
 
     // A ProjectionExec is a sign that the count optimization was applied
     assert!(optimized.as_any().is::<ProjectionExec>());
@@ -264,8 +267,10 @@ async fn test_count_inexact_stat() -> Result<()> {
         Arc::clone(&schema),
     )?;
 
-    let conf = ConfigOptions::new();
-    let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), 
&conf)?;
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
+    let optimized = AggregateStatistics::new()
+        .optimize_plan(Arc::new(final_agg), &optimizer_context)?;
 
     // check that the original ExecutionPlan was not replaced
     assert!(optimized.as_any().is::<AggregateExec>());
@@ -308,8 +313,10 @@ async fn test_count_with_nulls_inexact_stat() -> 
Result<()> {
         Arc::clone(&schema),
     )?;
 
-    let conf = ConfigOptions::new();
-    let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), 
&conf)?;
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
+    let optimized = AggregateStatistics::new()
+        .optimize_plan(Arc::new(final_agg), &optimizer_context)?;
 
     // check that the original ExecutionPlan was not replaced
     assert!(optimized.as_any().is::<AggregateExec>());
diff --git 
a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs 
b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
index 9c76f6ab6f..b4b55a93a6 100644
--- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 use crate::physical_optimizer::test_utils::parquet_exec;
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_common::config::ConfigOptions;
+use datafusion_execution::config::SessionConfig;
 use datafusion_functions_aggregate::count::count_udaf;
 use datafusion_functions_aggregate::sum::sum_udaf;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, 
AggregateFunctionExpr};
@@ -34,7 +34,7 @@ use datafusion_physical_expr::expressions::{col, lit};
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use 
datafusion_physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
 };
@@ -47,8 +47,9 @@ macro_rules! assert_optimized {
     ($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => {
         // run optimizer
         let optimizer = CombinePartialFinalAggregate {};
-        let config = ConfigOptions::new();
-        let optimized = optimizer.optimize($PLAN, &config)?;
+        let session_config = SessionConfig::new();
+        let optimizer_context = OptimizerContext::new(session_config.clone());
+        let optimized = optimizer.optimize_plan($PLAN, &optimizer_context)?;
         // Now format correctly
         let plan = displayable(optimized.as_ref()).indent(true).to_string();
         let actual_lines = plan.trim();
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index a3d9a1e407..b1a0417703 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -52,7 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{
 use datafusion_physical_optimizer::enforce_distribution::*;
 use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
 use datafusion_physical_optimizer::output_requirements::OutputRequirements;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
 };
@@ -489,7 +489,9 @@ impl TestConfig {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Add the ancillary output requirements operator at the start:
         let optimizer = OutputRequirements::new_add_mode();
-        let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
+        let session_config = SessionConfig::from(self.config.clone());
+        let optimizer_context = OptimizerContext::new(session_config.clone());
+        let mut optimized = optimizer.optimize_plan(plan.clone(), 
&optimizer_context)?;
 
         // This file has 2 rules that use tree node, apply these rules to 
original plan consecutively
         // After these operations tree nodes should be in a consistent state.
@@ -525,21 +527,25 @@ impl TestConfig {
         }
 
         for run in optimizers_to_run {
+            let session_config = SessionConfig::from(self.config.clone());
+            let optimizer_context = 
OptimizerContext::new(session_config.clone());
             optimized = match run {
                 Run::Distribution => {
                     let optimizer = EnforceDistribution::new();
-                    optimizer.optimize(optimized, &self.config)?
+                    optimizer.optimize_plan(optimized, &optimizer_context)?
                 }
                 Run::Sorting => {
                     let optimizer = EnforceSorting::new();
-                    optimizer.optimize(optimized, &self.config)?
+                    optimizer.optimize_plan(optimized, &optimizer_context)?
                 }
             };
         }
 
         // Remove the ancillary output requirements operator when done:
         let optimizer = OutputRequirements::new_remove_mode();
-        let optimized = optimizer.optimize(optimized, &self.config)?;
+        let session_config = SessionConfig::from(self.config.clone());
+        let optimizer_context = OptimizerContext::new(session_config.clone());
+        let optimized = optimizer.optimize_plan(optimized, 
&optimizer_context)?;
 
         Ok(optimized)
     }
@@ -3372,7 +3378,10 @@ SortRequiredExec: [a@0 ASC]
     config.execution.target_partitions = 10;
     config.optimizer.enable_round_robin_repartition = true;
     config.optimizer.prefer_existing_sort = false;
-    let dist_plan = EnforceDistribution::new().optimize(physical_plan, 
&config)?;
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
+    let dist_plan =
+        EnforceDistribution::new().optimize_plan(physical_plan, 
&optimizer_context)?;
     // Since at the start of the rule ordering requirement is not satisfied
     // EnforceDistribution rule doesn't satisfy this requirement either.
     assert_plan!(dist_plan, @r"
@@ -3408,7 +3417,10 @@ SortRequiredExec: [a@0 ASC]
     config.execution.target_partitions = 10;
     config.optimizer.enable_round_robin_repartition = true;
     config.optimizer.prefer_existing_sort = false;
-    let dist_plan = EnforceDistribution::new().optimize(physical_plan, 
&config)?;
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
+    let dist_plan =
+        EnforceDistribution::new().optimize_plan(physical_plan, 
&optimizer_context)?;
     // Since at the start of the rule ordering requirement is satisfied
     // EnforceDistribution rule satisfy this requirement also.
     assert_plan!(dist_plan, @r"
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 3f7fa62d68..881ba35ef6 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -56,7 +56,7 @@ use 
datafusion_physical_optimizer::enforce_sorting::replace_with_order_preservin
 use 
datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, 
assign_initial_requirements, pushdown_sorts};
 use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
 use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion::prelude::*;
 use arrow::array::{Int32Array, RecordBatch};
 use arrow::datatypes::{Field};
@@ -175,8 +175,10 @@ impl EnforceSortingTest {
         let input_plan_string = 
displayable(self.plan.as_ref()).indent(true).to_string();
 
         // Run the actual optimizer
+        let session_config = SessionConfig::from(config);
+        let optimizer_context = OptimizerContext::new(session_config.clone());
         let optimized_physical_plan = EnforceSorting::new()
-            .optimize(Arc::clone(&self.plan), &config)
+            .optimize_plan(Arc::clone(&self.plan), &optimizer_context)
             .expect("enforce_sorting failed");
 
         // Get string representation of the plan
@@ -2363,15 +2365,18 @@ async fn test_commutativity() -> Result<()> {
     "#);
 
     let config = ConfigOptions::new();
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let rules = vec![
         Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
         Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
     ];
     let mut first_plan = orig_plan.clone();
     for rule in rules {
-        first_plan = rule.optimize(first_plan, &config)?;
+        first_plan = rule.optimize_plan(first_plan, &optimizer_context)?;
     }
 
+    let optimizer_context2 = OptimizerContext::new(session_config.clone());
     let rules = vec![
         Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
         Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
@@ -2379,7 +2384,7 @@ async fn test_commutativity() -> Result<()> {
     ];
     let mut second_plan = orig_plan.clone();
     for rule in rules {
-        second_plan = rule.optimize(second_plan, &config)?;
+        second_plan = rule.optimize_plan(second_plan, &optimizer_context2)?;
     }
 
     assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan));
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 0903194b15..7b5557e617 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -47,7 +47,7 @@ use datafusion_physical_expr::{
 };
 use datafusion_physical_expr::{expressions::col, LexOrdering, 
PhysicalSortExpr};
 use datafusion_physical_optimizer::{
-    filter_pushdown::FilterPushdown, PhysicalOptimizerRule,
+    filter_pushdown::FilterPushdown, OptimizerContext, PhysicalOptimizerRule,
 };
 use datafusion_physical_plan::{
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -244,8 +244,10 @@ async fn 
test_dynamic_filter_pushdown_through_hash_join_with_topk() {
     config.execution.parquet.pushdown_filters = true;
 
     // Apply the FilterPushdown optimizer rule
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let plan = FilterPushdown::new_post_optimization()
-        .optimize(Arc::clone(&plan), &config)
+        .optimize_plan(Arc::clone(&plan), &optimizer_context)
         .unwrap();
 
     // Test that filters are pushed down correctly to each side of the join
@@ -721,8 +723,10 @@ async fn test_topk_dynamic_filter_pushdown() {
     // Actually apply the optimization to the plan and put some data through 
it to check that the filter is updated to reflect the TopK state
     let mut config = ConfigOptions::default();
     config.execution.parquet.pushdown_filters = true;
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let plan = FilterPushdown::new_post_optimization()
-        .optimize(plan, &config)
+        .optimize_plan(plan, &optimizer_context)
         .unwrap();
     let config = SessionConfig::new().with_batch_size(2);
     let session_ctx = SessionContext::new_with_config(config);
@@ -804,8 +808,10 @@ async fn 
test_topk_dynamic_filter_pushdown_multi_column_sort() {
     // Actually apply the optimization to the plan and put some data through 
it to check that the filter is updated to reflect the TopK state
     let mut config = ConfigOptions::default();
     config.execution.parquet.pushdown_filters = true;
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let plan = FilterPushdown::new_post_optimization()
-        .optimize(plan, &config)
+        .optimize_plan(plan, &optimizer_context)
         .unwrap();
     let config = SessionConfig::new().with_batch_size(2);
     let session_ctx = SessionContext::new_with_config(config);
@@ -1048,8 +1054,10 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
     let mut config = ConfigOptions::default();
     config.execution.parquet.pushdown_filters = true;
     config.optimizer.enable_dynamic_filter_pushdown = true;
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let plan = FilterPushdown::new_post_optimization()
-        .optimize(plan, &config)
+        .optimize_plan(plan, &optimizer_context)
         .unwrap();
 
     // Test for https://github.com/apache/datafusion/pull/17371: dynamic 
filter linking survives `with_new_children`
@@ -1276,8 +1284,10 @@ async fn 
test_hashjoin_dynamic_filter_pushdown_partitioned() {
     let mut config = ConfigOptions::default();
     config.execution.parquet.pushdown_filters = true;
     config.optimizer.enable_dynamic_filter_pushdown = true;
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let plan = FilterPushdown::new_post_optimization()
-        .optimize(plan, &config)
+        .optimize_plan(plan, &optimizer_context)
         .unwrap();
     let config = SessionConfig::new().with_batch_size(10);
     let session_ctx = SessionContext::new_with_config(config);
@@ -1473,8 +1483,10 @@ async fn 
test_hashjoin_dynamic_filter_pushdown_collect_left() {
     let mut config = ConfigOptions::default();
     config.execution.parquet.pushdown_filters = true;
     config.optimizer.enable_dynamic_filter_pushdown = true;
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let plan = FilterPushdown::new_post_optimization()
-        .optimize(plan, &config)
+        .optimize_plan(plan, &optimizer_context)
         .unwrap();
     let config = SessionConfig::new().with_batch_size(10);
     let session_ctx = SessionContext::new_with_config(config);
@@ -1645,8 +1657,10 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     let mut config = ConfigOptions::default();
     config.execution.parquet.pushdown_filters = true;
     config.optimizer.enable_dynamic_filter_pushdown = true;
+    let session_config = SessionConfig::from(config);
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let plan = FilterPushdown::new_post_optimization()
-        .optimize(outer_join, &config)
+        .optimize_plan(outer_join, &optimizer_context)
         .unwrap();
     let config = SessionConfig::new().with_batch_size(10);
     let session_ctx = SessionContext::new_with_config(config);
@@ -2426,7 +2440,8 @@ async fn 
test_hashjoin_dynamic_filter_all_partitions_empty() {
     let mut config = SessionConfig::new();
     config.options_mut().execution.parquet.pushdown_filters = true;
     let optimizer = FilterPushdown::new_post_optimization();
-    let plan = optimizer.optimize(plan, config.options()).unwrap();
+    let ctx = OptimizerContext::new(config.clone());
+    let plan = optimizer.optimize_plan(plan, &ctx).unwrap();
 
     insta::assert_snapshot!(
         format_plan_for_test(&plan),
@@ -2563,7 +2578,8 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
     let mut config = SessionConfig::new();
     config.options_mut().execution.parquet.pushdown_filters = true;
     let optimizer = FilterPushdown::new_post_optimization();
-    let plan = optimizer.optimize(plan, config.options()).unwrap();
+    let ctx = OptimizerContext::new(config.clone());
+    let plan = optimizer.optimize_plan(plan, &ctx).unwrap();
 
     insta::assert_snapshot!(
         format_plan_for_test(&plan),
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index c32f7b2d0b..fc2be94b01 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -25,8 +25,9 @@ use datafusion_datasource::{
     file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
     schema_adapter::SchemaAdapterFactory, source::DataSourceExec, 
PartitionedFile,
 };
+use datafusion_execution::config::SessionConfig;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::filter::batch_filter;
 use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, 
PushedDown};
 use datafusion_physical_plan::{
@@ -374,7 +375,9 @@ impl OptimizationTest {
         let input = format_execution_plan(&input_plan);
         let input_schema = input_plan.schema();
 
-        let output_result = opt.optimize(input_plan, &parquet_pushdown_config);
+        let session_config = SessionConfig::from(parquet_pushdown_config);
+        let optimizer_context = OptimizerContext::new(session_config.clone());
+        let output_result = opt.optimize_plan(input_plan, &optimizer_context);
         let output = output_result
             .and_then(|plan| {
                 if opt.schema_check() && (plan.schema() != input_schema) {
diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs 
b/datafusion/core/tests/physical_optimizer/join_selection.rs
index f9d3a04546..26b5e44853 100644
--- a/datafusion/core/tests/physical_optimizer/join_selection.rs
+++ b/datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -29,6 +29,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::{stats::Precision, ColumnStatistics, JoinType, 
ScalarValue};
 use datafusion_common::{JoinSide, NullEquality};
 use datafusion_common::{Result, Statistics};
+use datafusion_execution::config::SessionConfig;
 use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, 
TaskContext};
 use datafusion_expr::Operator;
 use datafusion_physical_expr::expressions::col;
@@ -37,6 +38,7 @@ use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::PhysicalExprRef;
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, 
PhysicalExpr};
 use datafusion_physical_optimizer::join_selection::JoinSelection;
+use datafusion_physical_optimizer::OptimizerContext;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::displayable;
 use datafusion_physical_plan::joins::utils::ColumnIndex;
@@ -226,8 +228,10 @@ async fn test_join_with_swap() {
         .unwrap(),
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let optimized_join = JoinSelection::new()
-        .optimize(join, &ConfigOptions::new())
+        .optimize_plan(join, &optimizer_context)
         .unwrap();
 
     let swapping_projection = optimized_join
@@ -288,8 +292,10 @@ async fn test_left_join_no_swap() {
         .unwrap(),
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let optimized_join = JoinSelection::new()
-        .optimize(join, &ConfigOptions::new())
+        .optimize_plan(join, &optimizer_context)
         .unwrap();
 
     let swapped_join = optimized_join
@@ -338,8 +344,10 @@ async fn test_join_with_swap_semi() {
 
         let original_schema = join.schema();
 
+        let session_config = SessionConfig::new();
+        let optimizer_context = OptimizerContext::new(session_config.clone());
         let optimized_join = JoinSelection::new()
-            .optimize(Arc::new(join), &ConfigOptions::new())
+            .optimize_plan(Arc::new(join), &optimizer_context)
             .unwrap();
 
         let swapped_join = optimized_join
@@ -393,8 +401,10 @@ async fn test_join_with_swap_mark() {
 
         let original_schema = join.schema();
 
+        let session_config = SessionConfig::new();
+        let optimizer_context = OptimizerContext::new(session_config.clone());
         let optimized_join = JoinSelection::new()
-            .optimize(Arc::new(join), &ConfigOptions::new())
+            .optimize_plan(Arc::new(join), &optimizer_context)
             .unwrap();
 
         let swapped_join = optimized_join
@@ -430,8 +440,10 @@ macro_rules! assert_optimized {
     ($PLAN: expr, @$EXPECTED_LINES: literal $(,)?) => {
 
         let plan = Arc::new($PLAN);
+        let session_config = SessionConfig::new();
+        let optimizer_context = OptimizerContext::new(session_config.clone());
         let optimized = JoinSelection::new()
-            .optimize(plan.clone(), &ConfigOptions::new())
+            .optimize_plan(plan.clone(), &optimizer_context)
             .unwrap();
 
         let plan_string = 
displayable(optimized.as_ref()).indent(true).to_string();
@@ -522,8 +534,10 @@ async fn test_join_no_swap() {
         .unwrap(),
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let optimized_join = JoinSelection::new()
-        .optimize(join, &ConfigOptions::new())
+        .optimize_plan(join, &optimizer_context)
         .unwrap();
 
     let swapped_join = optimized_join
@@ -571,8 +585,10 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
         .unwrap(),
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let optimized_join = JoinSelection::new()
-        .optimize(join, &ConfigOptions::new())
+        .optimize_plan(join, &optimizer_context)
         .unwrap();
 
     let swapping_projection = optimized_join
@@ -649,11 +665,10 @@ async fn test_nl_join_with_swap_no_proj(join_type: 
JoinType) {
         .unwrap(),
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let optimized_join = JoinSelection::new()
-        .optimize(
-            Arc::<NestedLoopJoinExec>::clone(&join),
-            &ConfigOptions::new(),
-        )
+        .optimize_plan(Arc::<NestedLoopJoinExec>::clone(&join), 
&optimizer_context)
         .unwrap();
 
     let swapped_join = optimized_join
@@ -910,8 +925,10 @@ fn check_join_partition_mode(
         .unwrap(),
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let optimized_join = JoinSelection::new()
-        .optimize(join, &ConfigOptions::new())
+        .optimize_plan(join, &optimizer_context)
         .unwrap();
 
     if !is_swapped {
@@ -1555,8 +1572,10 @@ async fn test_join_with_maybe_swap_unbounded_case(t: 
TestCase) -> Result<()> {
         NullEquality::NullEqualsNothing,
     )?) as _;
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let optimized_join_plan =
-        JoinSelection::new().optimize(Arc::clone(&join), 
&ConfigOptions::new())?;
+        JoinSelection::new().optimize_plan(Arc::clone(&join), 
&optimizer_context)?;
 
     // If swap did happen
     let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>();
diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 9d172db246..ef333c3067 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -24,13 +24,14 @@ use crate::physical_optimizer::test_utils::{
 
 use arrow::compute::SortOptions;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
+use datafusion_execution::config::SessionConfig;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
 use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
+use datafusion_physical_optimizer::OptimizerContext;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::filter::FilterExec;
@@ -101,8 +102,10 @@ fn 
transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero() ->
         ];
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
 
     let expected = [
             "StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true, fetch=5"
@@ -126,8 +129,10 @@ fn 
transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li
         ];
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
 
     let expected = [
             "GlobalLimitExec: skip=2, fetch=5",
@@ -162,8 +167,10 @@ fn 
transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
         ];
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
 
     let expected = [
         "CoalescePartitionsExec: fetch=5",
@@ -194,8 +201,10 @@ fn pushes_global_limit_exec_through_projection_exec() -> 
Result<()> {
         ];
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
 
     let expected = [
             "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
@@ -226,8 +235,10 @@ fn 
pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batc
 
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
 
     let expected = [
             "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
@@ -268,8 +279,10 @@ fn pushes_global_limit_into_multiple_fetch_plans() -> 
Result<()> {
 
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
 
     let expected = [
             "SortPreservingMergeExec: [c1@0 ASC], fetch=5",
@@ -304,8 +317,10 @@ fn 
keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R
         ];
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
 
     let expected = [
             "CoalescePartitionsExec: fetch=5",
@@ -334,8 +349,10 @@ fn merges_local_limit_with_local_limit() -> Result<()> {
 
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(parent_local_limit, 
&ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(parent_local_limit, 
&optimizer_context)?;
 
     let expected = ["GlobalLimitExec: skip=0, fetch=10", "  EmptyExec"];
     assert_eq!(get_plan_string(&after_optimize), expected);
@@ -359,8 +376,10 @@ fn merges_global_limit_with_global_limit() -> Result<()> {
 
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(parent_global_limit, 
&ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(parent_global_limit, 
&optimizer_context)?;
 
     let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
     assert_eq!(get_plan_string(&after_optimize), expected);
@@ -384,8 +403,10 @@ fn merges_global_limit_with_local_limit() -> Result<()> {
 
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
 
     let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
     assert_eq!(get_plan_string(&after_optimize), expected);
@@ -409,8 +430,10 @@ fn merges_local_limit_with_global_limit() -> Result<()> {
 
     assert_eq!(initial, expected_initial);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        LimitPushdown::new().optimize(local_limit, &ConfigOptions::new())?;
+        LimitPushdown::new().optimize_plan(local_limit, &optimizer_context)?;
 
     let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
     assert_eq!(get_plan_string(&after_optimize), expected);
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 80f4fbc305..1068e8ec03 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -28,6 +28,7 @@ use datafusion_common::config::{ConfigOptions, CsvOptions};
 use datafusion_common::{JoinSide, JoinType, NullEquality, Result, ScalarValue};
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::TableSchema;
+use datafusion_execution::config::SessionConfig;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::{
@@ -44,7 +45,7 @@ use datafusion_physical_expr_common::sort_expr::{
 };
 use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
@@ -461,8 +462,10 @@ fn test_csv_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -499,8 +502,10 @@ fn test_memory_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -595,8 +600,10 @@ fn test_streaming_table_after_projection() -> Result<()> {
         Arc::new(streaming_table) as _,
     )?) as _;
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let result = after_optimize
         .as_any()
@@ -695,8 +702,10 @@ fn test_projection_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(top_projection, 
&ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(top_projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -760,8 +769,10 @@ fn test_output_req_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -850,8 +861,10 @@ fn test_coalesce_partitions_after_projection() -> 
Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -907,8 +920,10 @@ fn test_filter_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -1009,8 +1024,10 @@ fn test_join_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -1137,8 +1154,10 @@ fn test_join_after_required_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -1215,8 +1234,10 @@ fn test_nested_loop_join_after_projection() -> 
Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize_string =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
     let after_optimize_string = displayable(after_optimize_string.as_ref())
         .indent(true)
         .to_string();
@@ -1312,8 +1333,10 @@ fn test_hash_join_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
         .to_string();
@@ -1340,8 +1363,10 @@ fn test_hash_join_after_projection() -> Result<()> {
         join.clone(),
     )?);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
         .to_string();
@@ -1393,8 +1418,10 @@ fn test_repartition_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -1463,8 +1490,10 @@ fn test_sort_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -1516,8 +1545,10 @@ fn test_sort_preserving_after_projection() -> Result<()> 
{
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -1560,8 +1591,10 @@ fn test_union_after_projection() -> Result<()> {
     "
     );
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -1633,8 +1666,10 @@ fn test_partition_col_projection_pushdown() -> 
Result<()> {
         source,
     )?);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
@@ -1676,8 +1711,10 @@ fn test_partition_col_projection_pushdown_expr() -> 
Result<()> {
         source,
     )?);
 
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     let after_optimize =
-        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+        ProjectionPushdown::new().optimize_plan(projection, 
&optimizer_context)?;
 
     let after_optimize_string = displayable(after_optimize.as_ref())
         .indent(true)
diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs 
b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
index f46147de1b..ccacf35ec8 100644
--- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
@@ -28,13 +28,13 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, 
StreamTable};
 use datafusion::prelude::{CsvReadOptions, SessionContext};
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::{JoinType, Result, ScalarValue};
+use datafusion_execution::config::SessionConfig;
 use datafusion_physical_expr::expressions::{col, Literal};
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::{displayable, ExecutionPlan};
 
@@ -393,9 +393,12 @@ fn create_test_schema2() -> SchemaRef {
 /// Check if sanity checker should accept or reject plans.
 fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
     let sanity_checker = SanityCheckPlan::new();
-    let opts = ConfigOptions::default();
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
     assert_eq!(
-        sanity_checker.optimize(plan.clone(), &opts).is_ok(),
+        sanity_checker
+            .optimize_plan(plan.clone(), &optimizer_context)
+            .is_ok(),
         is_sane
     );
 }
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs 
b/datafusion/core/tests/physical_optimizer/test_utils.rs
index e410c495c8..e164e7617c 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -29,12 +29,12 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::physical_plan::ParquetSource;
 use datafusion::datasource::source::DataSourceExec;
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
 use datafusion_common::{ColumnStatistics, JoinType, NullEquality, Result, 
Statistics};
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_execution::config::SessionConfig;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
@@ -46,7 +46,7 @@ use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, OrderingRequirements, PhysicalSortExpr,
 };
 use 
datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
 };
@@ -635,10 +635,11 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: 
Vec<String>) -> Physica
 }
 
 pub fn get_optimized_plan(plan: &Arc<dyn ExecutionPlan>) -> Result<String> {
-    let config = ConfigOptions::new();
+    let session_config = SessionConfig::new();
+    let optimizer_context = OptimizerContext::new(session_config.clone());
 
-    let optimized =
-        LimitedDistinctAggregation::new().optimize(Arc::clone(plan), &config)?;
+    let optimized = LimitedDistinctAggregation::new()
+        .optimize_plan(Arc::clone(plan), &optimizer_context)?;
 
     let optimized_result = 
displayable(optimized.as_ref()).indent(true).to_string();
 
diff --git a/datafusion/execution/src/config.rs 
b/datafusion/execution/src/config.rs
index a0b180bf40..443229a3cb 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -100,7 +100,7 @@ pub struct SessionConfig {
     /// references to the same options.
     options: Arc<ConfigOptions>,
     /// Opaque extensions.
-    extensions: AnyMap,
+    extensions: Arc<Extensions>,
 }
 
 impl Default for SessionConfig {
@@ -108,14 +108,53 @@ impl Default for SessionConfig {
         Self {
             options: Arc::new(ConfigOptions::new()),
             // Assume no extensions by default.
-            extensions: HashMap::with_capacity_and_hasher(
-                0,
-                BuildHasherDefault::default(),
-            ),
+            extensions: Arc::new(Extensions::default()),
         }
     }
 }
 
+/// A type map for storing extensions.
+///
+/// Extensions are indexed by their type `T`. If multiple values of the same 
type are provided, only the last one
+/// will be kept.
+///
+/// Extensions are opaque objects that are unknown to DataFusion itself but 
can be downcast by optimizer rules,
+/// execution plans, or other components that have access to the session 
config.
+/// They provide a flexible way to attach extra data or behavior to the 
session config.
+#[derive(Clone, Debug)]
+pub struct Extensions {
+    inner: AnyMap,
+}
+
+impl Default for Extensions {
+    fn default() -> Self {
+        Self {
+            inner: HashMap::with_capacity_and_hasher(0, 
BuildHasherDefault::default()),
+        }
+    }
+}
+
+impl Extensions {
+    pub fn insert<T>(&mut self, value: Arc<T>)
+    where
+        T: Send + Sync + 'static,
+    {
+        let id = TypeId::of::<T>();
+        self.inner.insert(id, value);
+    }
+
+    pub fn get<T>(&self) -> Option<Arc<T>>
+    where
+        T: Send + Sync + 'static,
+    {
+        let id = TypeId::of::<T>();
+        self.inner
+            .get(&id)
+            .cloned()
+            .map(|arc_any| Arc::downcast(arc_any).expect("TypeId unique"))
+    }
+}
+
 impl SessionConfig {
     /// Create an execution config with default setting
     pub fn new() -> Self {
@@ -164,6 +203,20 @@ impl SessionConfig {
         Arc::make_mut(&mut self.options)
     }
 
+    /// Return a handle to the extensions.
+    ///
+    /// Can be used to read the current extensions.
+    pub fn extensions(&self) -> &Arc<Extensions> {
+        &self.extensions
+    }
+
+    /// Return a mutable handle to the extensions.
+    ///
+    /// Can be used to set extensions.
+    pub fn extensions_mut(&mut self) -> &mut Extensions {
+        Arc::make_mut(&mut self.extensions)
+    }
+
     /// Set a configuration option
     pub fn set(self, key: &str, value: &ScalarValue) -> Self {
         self.set_str(key, &value.to_string())
@@ -580,9 +633,7 @@ impl SessionConfig {
     where
         T: Send + Sync + 'static,
     {
-        let ext = ext as Arc<dyn Any + Send + Sync + 'static>;
-        let id = TypeId::of::<T>();
-        self.extensions.insert(id, ext);
+        self.extensions_mut().insert::<T>(ext);
     }
 
     /// Get extension, if any for the specified type `T` exists.
@@ -592,11 +643,7 @@ impl SessionConfig {
     where
         T: Send + Sync + 'static,
     {
-        let id = TypeId::of::<T>();
-        self.extensions
-            .get(&id)
-            .cloned()
-            .map(|ext| Arc::downcast(ext).expect("TypeId unique"))
+        self.extensions.get::<T>()
     }
 }
 
diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs 
b/datafusion/physical-optimizer/src/aggregate_statistics.rs
index 672317060d..eee333926b 100644
--- a/datafusion/physical-optimizer/src/aggregate_statistics.rs
+++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 //! Utilizing exact statistics from sources to avoid scanning data
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::Result;
@@ -27,7 +26,7 @@ use datafusion_physical_plan::udaf::{AggregateFunctionExpr, 
StatisticsArgs};
 use datafusion_physical_plan::{expressions, ExecutionPlan};
 use std::sync::Arc;
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 
 /// Optimizer that uses available statistics for aggregate functions
 #[derive(Default, Debug)]
@@ -43,10 +42,10 @@ impl AggregateStatistics {
 impl PhysicalOptimizerRule for AggregateStatistics {
     #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
     #[allow(clippy::only_used_in_recursion)] // See 
https://github.com/rust-lang/rust-clippy/issues/14566
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
+        context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if let Some(partial_agg_exec) = take_optimizable(&*plan) {
             let partial_agg_exec = partial_agg_exec
@@ -86,13 +85,15 @@ impl PhysicalOptimizerRule for AggregateStatistics {
                 )?))
             } else {
                 plan.map_children(|child| {
-                    self.optimize(child, config).map(Transformed::yes)
+                    self.optimize_plan(child, context).map(Transformed::yes)
                 })
                 .data()
             }
         } else {
-            plan.map_children(|child| self.optimize(child, 
config).map(Transformed::yes))
-                .data()
+            plan.map_children(|child| {
+                self.optimize_plan(child, context).map(Transformed::yes)
+            })
+            .data()
         }
     }
 
diff --git a/datafusion/physical-optimizer/src/coalesce_batches.rs 
b/datafusion/physical-optimizer/src/coalesce_batches.rs
index 61e4c0e7f1..956a8994ba 100644
--- a/datafusion/physical-optimizer/src/coalesce_batches.rs
+++ b/datafusion/physical-optimizer/src/coalesce_batches.rs
@@ -18,14 +18,12 @@
 //! CoalesceBatches optimizer that groups batches together rows
 //! in bigger batches to avoid overhead with small batches
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 
 use std::sync::Arc;
 
 use datafusion_common::error::Result;
-use datafusion_common::{
-    assert_eq_or_internal_err, config::ConfigOptions, DataFusionError,
-};
+use datafusion_common::{assert_eq_or_internal_err, DataFusionError};
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_plan::{
     async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
@@ -46,11 +44,12 @@ impl CoalesceBatches {
     }
 }
 impl PhysicalOptimizerRule for CoalesceBatches {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
+        context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = context.session_config().options();
         if !config.execution.coalesce_batches {
             return Ok(plan);
         }
diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs 
b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
index bffb2c9df9..f189a7f3f0 100644
--- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
+++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
@@ -26,8 +26,7 @@ use datafusion_physical_plan::aggregates::{
 };
 use datafusion_physical_plan::ExecutionPlan;
 
-use crate::PhysicalOptimizerRule;
-use datafusion_common::config::ConfigOptions;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
 use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr};
@@ -47,10 +46,10 @@ impl CombinePartialFinalAggregate {
 }
 
 impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        _context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_down(|plan| {
             // Check if the plan is AggregateExec
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs 
b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 4464c12ca7..78e5c4edb2 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -30,6 +30,7 @@ use crate::utils::{
     add_sort_above_with_check, is_coalesce_partitions, is_repartition,
     is_sort_preserving_merge,
 };
+use crate::OptimizerContext;
 
 use arrow::compute::SortOptions;
 use datafusion_common::config::ConfigOptions;
@@ -190,11 +191,12 @@ impl EnforceDistribution {
 }
 
 impl PhysicalOptimizerRule for EnforceDistribution {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
+        context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = context.session_config().options();
         let top_down_join_key_reordering = 
config.optimizer.top_down_join_key_reordering;
 
         let adjusted = if top_down_join_key_reordering {
diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs 
b/datafusion/physical-optimizer/src/ensure_coop.rs
index 0c0b63c0b3..38bf1877a4 100644
--- a/datafusion/physical-optimizer/src/ensure_coop.rs
+++ b/datafusion/physical-optimizer/src/ensure_coop.rs
@@ -23,9 +23,8 @@
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
 use datafusion_common::Result;
 use datafusion_physical_plan::coop::CooperativeExec;
@@ -62,10 +61,10 @@ impl PhysicalOptimizerRule for EnsureCooperative {
         "EnsureCooperative"
     }
 
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        _context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_up(|plan| {
             let is_leaf = plan.children().is_empty();
@@ -96,16 +95,17 @@ impl PhysicalOptimizerRule for EnsureCooperative {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion_common::config::ConfigOptions;
+    use datafusion_execution::config::SessionConfig;
     use datafusion_physical_plan::{displayable, test::scan_partitioned};
     use insta::assert_snapshot;
 
     #[tokio::test]
     async fn test_cooperative_exec_for_custom_exec() {
         let test_custom_exec = scan_partitioned(1);
-        let config = ConfigOptions::new();
+        let session_config = SessionConfig::new();
+        let optimizer_context = OptimizerContext::new(session_config);
         let optimized = EnsureCooperative::new()
-            .optimize(test_custom_exec, &config)
+            .optimize_plan(test_custom_exec, &optimizer_context)
             .unwrap();
 
         let display = displayable(optimized.as_ref()).indent(true).to_string();
diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs 
b/datafusion/physical-optimizer/src/filter_pushdown.rs
index 8bed6c3aeb..8206d43208 100644
--- a/datafusion/physical-optimizer/src/filter_pushdown.rs
+++ b/datafusion/physical-optimizer/src/filter_pushdown.rs
@@ -33,7 +33,7 @@
 
 use std::sync::Arc;
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{
@@ -418,6 +418,20 @@ impl Default for FilterPushdown {
 }
 
 impl PhysicalOptimizerRule for FilterPushdown {
+    fn optimize_plan(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: &OptimizerContext,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = context.session_config().options();
+        Ok(
+            push_down_filters(&Arc::clone(&plan), vec![], config, self.phase)?
+                .updated_node
+                .unwrap_or(plan),
+        )
+    }
+
+    #[allow(deprecated)]
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-optimizer/src/join_selection.rs 
b/datafusion/physical-optimizer/src/join_selection.rs
index b55c01f62e..b6ff8344a3 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -23,7 +23,7 @@
 //! pipeline-friendly ones. To achieve the second goal, it selects the proper
 //! `PartitionMode` and the build side using the available statistics for hash 
joins.
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -103,11 +103,12 @@ fn supports_collect_by_thresholds(
 }
 
 impl PhysicalOptimizerRule for JoinSelection {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
+        context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = context.session_config().options();
         // First, we make pipeline-fixing modifications to joins so as to 
accommodate
         // unbounded inputs. Each pipeline-fixing subrule, which is a function
         // of type `PipelineFixerSubrule`, takes a single 
[`PipelineStatePropagator`]
diff --git a/datafusion/physical-optimizer/src/lib.rs 
b/datafusion/physical-optimizer/src/lib.rs
index f4b82eed3c..afa8252528 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -47,4 +47,4 @@ pub mod topk_aggregation;
 pub mod update_aggr_exprs;
 pub mod utils;
 
-pub use optimizer::PhysicalOptimizerRule;
+pub use optimizer::{OptimizerContext, PhysicalOptimizerRule};
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs 
b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 7469c3af93..d70e8da9dd 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -21,9 +21,8 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::error::Result;
 use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
 use datafusion_common::utils::combine_limit;
@@ -60,10 +59,10 @@ impl LimitPushdown {
 }
 
 impl PhysicalOptimizerRule for LimitPushdown {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        _context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let global_state = GlobalRequirements {
             fetch: None,
diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs 
b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
index 1c671cd074..ee20f4d43b 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::PhysicalOptimizerRule;
-use datafusion_common::config::ConfigOptions;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::ScalarValue;
 use datafusion_expr::{LimitEffect, WindowFrameBound, WindowFrameUnits};
@@ -71,11 +70,12 @@ impl TraverseState {
 }
 
 impl PhysicalOptimizerRule for LimitPushPastWindows {
-    fn optimize(
+    fn optimize_plan(
         &self,
         original: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
+        context: &OptimizerContext,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let config = context.session_config().options();
         if !config.optimizer.enable_window_limits {
             return Ok(original);
         }
diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs 
b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs
index 3666ff3798..89828a87cb 100644
--- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs
+++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs
@@ -24,11 +24,10 @@ use datafusion_physical_plan::aggregates::AggregateExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::Result;
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 use itertools::Itertools;
 
 /// An optimizer rule that passes a `limit` hint into grouped aggregations 
which don't require all
@@ -158,11 +157,12 @@ impl Default for LimitedDistinctAggregation {
 }
 
 impl PhysicalOptimizerRule for LimitedDistinctAggregation {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
+        context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = context.session_config().options();
         if config.optimizer.enable_distinct_aggregation_soft_limit {
             plan.transform_down(|plan| {
                 Ok(
diff --git a/datafusion/physical-optimizer/src/optimizer.rs 
b/datafusion/physical-optimizer/src/optimizer.rs
index 03c83bb5a0..0f3467bfbb 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -38,10 +38,37 @@ use crate::update_aggr_exprs::OptimizeAggregateOrder;
 
 use crate::limit_pushdown_past_window::LimitPushPastWindows;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::Result;
+use datafusion_common::{internal_err, Result};
+use datafusion_execution::config::SessionConfig;
 use datafusion_physical_plan::ExecutionPlan;
 
-/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
+/// Context for optimizing physical plans.
+///
+/// This context provides access to session configuration and optimizer
+/// extensions.
+///
+/// Similar to [`TaskContext`] which provides context during execution,
+/// [`OptimizerContext`] provides context during optimization.
+///
+/// [`TaskContext`]: 
https://docs.rs/datafusion/latest/datafusion/execution/struct.TaskContext.html
+#[derive(Debug, Clone)]
+pub struct OptimizerContext {
+    /// Session configuration
+    session_config: SessionConfig,
+}
+
+impl OptimizerContext {
+    /// Create a new OptimizerContext
+    pub fn new(session_config: SessionConfig) -> Self {
+        Self { session_config }
+    }
+
+    /// Return a reference to the session configuration
+    pub fn session_config(&self) -> &SessionConfig {
+        &self.session_config
+    }
+}
+
+/// `PhysicalOptimizerRule` transforms one [`ExecutionPlan`] into another which
 /// computes the same results, but in a potentially more efficient way.
 ///
 /// Use [`SessionState::add_physical_optimizer_rule`] to register additional
@@ -49,12 +76,52 @@ use datafusion_physical_plan::ExecutionPlan;
 ///
 /// [`SessionState::add_physical_optimizer_rule`]: 
https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
 pub trait PhysicalOptimizerRule: Debug {
+    /// Rewrite `plan` to an optimized form with additional context
+    ///
+    /// This is the preferred method for implementing optimization rules as it
+    /// provides access to the full optimizer context including session configuration.
+    ///
+    /// The default implementation delegates to [`PhysicalOptimizerRule::optimize`] for
+    /// backwards compatibility with existing implementations.
+    ///
+    /// New implementations should override this method instead of `optimize()`.
+    ///
+    /// Once [`PhysicalOptimizerRule::optimize`] is deprecated and removed, this
+    /// default implementation will be removed and this method will become required.
+    /// This change is scheduled for DataFusion 54.0.0.
+    fn optimize_plan(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: &OptimizerContext,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Default implementation: delegate to the old method for backwards 
compatibility
+        #[allow(deprecated)]
+        self.optimize(plan, context.session_config().options())
+    }
+
     /// Rewrite `plan` to an optimized form
+    ///
+    /// This method is kept for backwards compatibility. New implementations
+    /// should implement [`optimize_plan`](Self::optimize_plan) instead, which
+    /// provides access to additional context.
+    ///
+    /// The default implementation returns an error indicating that neither
+    /// `optimize` nor `optimize_plan` was properly implemented. At least one
+    /// of these methods must be overridden.
+    #[deprecated(
+        since = "52.0.0",
+        note = "use `PhysicalOptimizerRule::optimize_plan` instead"
+    )]
     fn optimize(
         &self,
-        plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
-    ) -> Result<Arc<dyn ExecutionPlan>>;
+        _plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        internal_err!(
+            "PhysicalOptimizerRule '{}' must implement either optimize() or optimize_plan()",
+            self.name()
+        )
+    }
 
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs 
b/datafusion/physical-optimizer/src/output_requirements.rs
index 9e5e980219..c11b1d362a 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -24,9 +24,8 @@
 
 use std::sync::Arc;
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Result, Statistics};
 use datafusion_execution::TaskContext;
@@ -303,10 +302,10 @@ impl ExecutionPlan for OutputRequirementExec {
 }
 
 impl PhysicalOptimizerRule for OutputRequirements {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        _context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         match self.mode {
             RuleMode::Add => require_top_ordering(plan),
diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs 
b/datafusion/physical-optimizer/src/projection_pushdown.rs
index b5e002b51f..fa562825c6 100644
--- a/datafusion/physical-optimizer/src/projection_pushdown.rs
+++ b/datafusion/physical-optimizer/src/projection_pushdown.rs
@@ -20,13 +20,12 @@
 //! projections one by one if the operator below is amenable to this. If a
 //! projection reaches a source, it can even disappear from the plan entirely.
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 use arrow::datatypes::{Fields, Schema, SchemaRef};
 use datafusion_common::alias::AliasGenerator;
 use std::collections::HashSet;
 use std::sync::Arc;
 
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
@@ -57,10 +56,10 @@ impl ProjectionPushdown {
 }
 
 impl PhysicalOptimizerRule for ProjectionPushdown {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        _context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let alias_generator = AliasGenerator::new();
         let plan = plan
@@ -447,6 +446,8 @@ fn is_volatile_expression_tree(expr: &dyn PhysicalExpr) -> 
bool {
 mod test {
     use super::*;
     use arrow::datatypes::{DataType, Field, FieldRef, Schema};
+    use datafusion_common::config::ConfigOptions;
+    use datafusion_execution::config::SessionConfig;
     use datafusion_expr_common::operator::Operator;
     use datafusion_functions::math::random;
     use datafusion_physical_expr::expressions::{binary, lit};
@@ -672,7 +673,10 @@ mod test {
         )?;
 
         let optimizer = ProjectionPushdown::new();
-        let optimized_plan = optimizer.optimize(Arc::new(join), 
&Default::default())?;
+        let session_config = SessionConfig::new();
+        let optimizer_context = OptimizerContext::new(session_config);
+        let optimized_plan =
+            optimizer.optimize_plan(Arc::new(join), &optimizer_context)?;
 
         let displayable_plan = 
displayable(optimized_plan.as_ref()).indent(false);
         Ok(displayable_plan.to_string())
diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs 
b/datafusion/physical-optimizer/src/sanity_checker.rs
index acc70d39f0..fc9187ba54 100644
--- a/datafusion/physical-optimizer/src/sanity_checker.rs
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 use datafusion_common::Result;
 use datafusion_physical_plan::ExecutionPlan;
 
-use datafusion_common::config::{ConfigOptions, OptimizerOptions};
+use datafusion_common::config::OptimizerOptions;
 use datafusion_common::plan_err;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::intervals::utils::{check_support, 
is_datatype_supported};
@@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness, 
EmissionType};
 use datafusion_physical_plan::joins::SymmetricHashJoinExec;
 use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 use 
datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
 use itertools::izip;
 
@@ -54,11 +54,12 @@ impl SanityCheckPlan {
 }
 
 impl PhysicalOptimizerRule for SanityCheckPlan {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
+        context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = context.session_config().options();
         plan.transform_up(|p| check_plan_sanity(p, &config.optimizer))
             .data()
     }
diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs 
b/datafusion/physical-optimizer/src/topk_aggregation.rs
index b7505f0df4..a55b90e839 100644
--- a/datafusion/physical-optimizer/src/topk_aggregation.rs
+++ b/datafusion/physical-optimizer/src/topk_aggregation.rs
@@ -19,9 +19,8 @@
 
 use std::sync::Arc;
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 use arrow::datatypes::DataType;
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::Result;
 use datafusion_physical_expr::expressions::Column;
@@ -145,11 +144,12 @@ impl Default for TopKAggregation {
 }
 
 impl PhysicalOptimizerRule for TopKAggregation {
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        config: &ConfigOptions,
+        context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = context.session_config().options();
         if config.optimizer.enable_topk_aggregation {
             plan.transform_down(|plan| {
                 Ok(if let Some(plan) = TopKAggregation::transform_sort(&plan) {
diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs 
b/datafusion/physical-optimizer/src/update_aggr_exprs.rs
index 61bc715592..39852a2293 100644
--- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs
+++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs
@@ -20,7 +20,6 @@
 
 use std::sync::Arc;
 
-use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{plan_datafusion_err, Result};
 use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
@@ -29,7 +28,7 @@ use datafusion_physical_plan::aggregates::{concat_slices, 
AggregateExec};
 use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
 
 /// This optimizer rule checks ordering requirements of aggregate expressions.
 ///
@@ -65,15 +64,15 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
     /// # Arguments
     ///
     /// * `plan` - The root of the execution plan to optimize.
-    /// * `_config` - Configuration options (currently unused).
+    /// * `context` - Optimizer context containing configuration options.
     ///
     /// # Returns
     ///
     /// A `Result` containing the potentially optimized execution plan or an 
error.
-    fn optimize(
+    fn optimize_plan(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        _context: &OptimizerContext,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_up(|plan| {
             if let Some(aggr_exec) = 
plan.as_any().downcast_ref::<AggregateExec>() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to