This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b990987bdc Add PhysicalOptimizerRule::optimize_plan to allow passing
more context into optimizer rules (#18739)
b990987bdc is described below
commit b990987bdcdf1a9f1f4252810272076af261b15a
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Nov 27 14:01:04 2025 +0530
Add PhysicalOptimizerRule::optimize_plan to allow passing more context into
optimizer rules (#18739)
Co-authored-by: Gabriel Musat Mestre <[email protected]>
---
datafusion/core/benches/push_down_filter.rs | 8 ++-
datafusion/core/src/physical_planner.rs | 7 +-
datafusion/core/tests/execution/coop.rs | 6 +-
datafusion/core/tests/parquet/file_statistics.rs | 6 +-
.../physical_optimizer/aggregate_statistics.rs | 21 ++++--
.../combine_partial_final_agg.rs | 9 +--
.../physical_optimizer/enforce_distribution.rs | 26 ++++++--
.../tests/physical_optimizer/enforce_sorting.rs | 13 ++--
.../physical_optimizer/filter_pushdown/mod.rs | 36 +++++++---
.../physical_optimizer/filter_pushdown/util.rs | 7 +-
.../tests/physical_optimizer/join_selection.rs | 45 +++++++++----
.../tests/physical_optimizer/limit_pushdown.rs | 47 +++++++++----
.../physical_optimizer/projection_pushdown.rs | 75 +++++++++++++++------
.../tests/physical_optimizer/sanity_checker.rs | 11 ++--
.../core/tests/physical_optimizer/test_utils.rs | 11 ++--
datafusion/execution/src/config.rs | 73 ++++++++++++++++----
.../physical-optimizer/src/aggregate_statistics.rs | 15 +++--
.../physical-optimizer/src/coalesce_batches.rs | 11 ++--
.../src/combine_partial_final_agg.rs | 7 +-
.../physical-optimizer/src/enforce_distribution.rs | 6 +-
datafusion/physical-optimizer/src/ensure_coop.rs | 14 ++--
.../physical-optimizer/src/filter_pushdown.rs | 16 ++++-
.../physical-optimizer/src/join_selection.rs | 7 +-
datafusion/physical-optimizer/src/lib.rs | 2 +-
.../physical-optimizer/src/limit_pushdown.rs | 7 +-
.../src/limit_pushdown_past_window.rs | 8 +--
.../src/limited_distinct_aggregation.rs | 8 +--
datafusion/physical-optimizer/src/optimizer.rs | 77 ++++++++++++++++++++--
.../physical-optimizer/src/output_requirements.rs | 7 +-
.../physical-optimizer/src/projection_pushdown.rs | 14 ++--
.../physical-optimizer/src/sanity_checker.rs | 9 +--
.../physical-optimizer/src/topk_aggregation.rs | 8 +--
.../physical-optimizer/src/update_aggr_exprs.rs | 9 ++-
33 files changed, 446 insertions(+), 180 deletions(-)
diff --git a/datafusion/core/benches/push_down_filter.rs
b/datafusion/core/benches/push_down_filter.rs
index 139fb12c30..bab103c171 100644
--- a/datafusion/core/benches/push_down_filter.rs
+++ b/datafusion/core/benches/push_down_filter.rs
@@ -20,10 +20,10 @@ use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::config::ConfigOptions;
-use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::ExecutionPlan;
use object_store::memory::InMemory;
use object_store::path::Path;
@@ -106,11 +106,13 @@ fn bench_push_down_filter(c: &mut Criterion) {
config.execution.parquet.pushdown_filters = true;
let plan = BenchmarkPlan { plan, config };
let optimizer = FilterPushdown::new();
+ let session_config = SessionConfig::from(plan.config.clone());
+ let optimizer_context = OptimizerContext::new(session_config);
c.bench_function("push_down_filter", |b| {
b.iter(|| {
optimizer
- .optimize(Arc::clone(&plan.plan), &plan.config)
+ .optimize_plan(Arc::clone(&plan.plan), &optimizer_context)
.unwrap();
});
});
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 2ae5aed30d..6f0b201185 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -92,7 +92,7 @@ use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::{
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
};
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
@@ -2271,11 +2271,14 @@ impl DefaultPhysicalPlanner {
// to verify that the plan fulfills the base requirements.
InvariantChecker(InvariantLevel::Always).check(&plan)?;
+ // Create optimizer context from session state
+ let optimizer_context =
OptimizerContext::new(session_state.config().clone());
+
let mut new_plan = Arc::clone(&plan);
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer
- .optimize(new_plan, session_state.config_options())
+ .optimize_plan(new_plan, &optimizer_context)
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(),
Box::new(e))
})?;
diff --git a/datafusion/core/tests/execution/coop.rs
b/datafusion/core/tests/execution/coop.rs
index b6f406e967..c85b1dcdb5 100644
--- a/datafusion/core/tests/execution/coop.rs
+++ b/datafusion/core/tests/execution/coop.rs
@@ -40,7 +40,7 @@ use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coop::make_cooperative;
use datafusion_physical_plan::filter::FilterExec;
@@ -810,8 +810,8 @@ async fn query_yields(
task_ctx: Arc<TaskContext>,
) -> Result<(), Box<dyn Error>> {
// Run plan through EnsureCooperative
- let optimized =
- EnsureCooperative::new().optimize(plan,
task_ctx.session_config().options())?;
+ let optimizer_context =
OptimizerContext::new(task_ctx.session_config().clone());
+ let optimized = EnsureCooperative::new().optimize_plan(plan,
&optimizer_context)?;
// Get the stream
let stream = physical_plan::execute_stream(optimized, task_ctx)?;
diff --git a/datafusion/core/tests/parquet/file_statistics.rs
b/datafusion/core/tests/parquet/file_statistics.rs
index 64ee92eda2..db2715956d 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -40,7 +40,7 @@ use datafusion_expr::{col, lit, Expr};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::ExecutionPlan;
use tempfile::tempdir;
@@ -84,8 +84,10 @@ async fn check_stats_precision_with_filter_pushdown() {
Arc::new(FilterExec::try_new(physical_filter,
exec_with_filter).unwrap())
as Arc<dyn ExecutionPlan>;
+ let session_config = SessionConfig::from(options.clone());
+ let optimizer_context = OptimizerContext::new(session_config);
let optimized_exec = FilterPushdown::new()
- .optimize(filtered_exec, &options)
+ .optimize_plan(filtered_exec, &optimizer_context)
.unwrap();
assert!(
diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
index a79d743cb2..07c4a14283 100644
--- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs
@@ -25,12 +25,13 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion_common::cast::as_int64_array;
-use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
+use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{self, cast};
use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
+use datafusion_physical_optimizer::OptimizerContext;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::aggregates::AggregateMode;
@@ -67,8 +68,10 @@ async fn assert_count_optim_success(
let task_ctx = Arc::new(TaskContext::default());
let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);
- let config = ConfigOptions::new();
- let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan),
&config)?;
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let optimized = AggregateStatistics::new()
+ .optimize_plan(Arc::clone(&plan), &optimizer_context)?;
// A ProjectionExec is a sign that the count optimization was applied
assert!(optimized.as_any().is::<ProjectionExec>());
@@ -264,8 +267,10 @@ async fn test_count_inexact_stat() -> Result<()> {
Arc::clone(&schema),
)?;
- let conf = ConfigOptions::new();
- let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg),
&conf)?;
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let optimized = AggregateStatistics::new()
+ .optimize_plan(Arc::new(final_agg), &optimizer_context)?;
// check that the original ExecutionPlan was not replaced
assert!(optimized.as_any().is::<AggregateExec>());
@@ -308,8 +313,10 @@ async fn test_count_with_nulls_inexact_stat() ->
Result<()> {
Arc::clone(&schema),
)?;
- let conf = ConfigOptions::new();
- let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg),
&conf)?;
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let optimized = AggregateStatistics::new()
+ .optimize_plan(Arc::new(final_agg), &optimizer_context)?;
// check that the original ExecutionPlan was not replaced
assert!(optimized.as_any().is::<AggregateExec>());
diff --git
a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
index 9c76f6ab6f..b4b55a93a6 100644
--- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
use crate::physical_optimizer::test_utils::parquet_exec;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_common::config::ConfigOptions;
+use datafusion_execution::config::SessionConfig;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder,
AggregateFunctionExpr};
@@ -34,7 +34,7 @@ use datafusion_physical_expr::expressions::{col, lit};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use
datafusion_physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -47,8 +47,9 @@ macro_rules! assert_optimized {
($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => {
// run optimizer
let optimizer = CombinePartialFinalAggregate {};
- let config = ConfigOptions::new();
- let optimized = optimizer.optimize($PLAN, &config)?;
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let optimized = optimizer.optimize_plan($PLAN, &optimizer_context)?;
// Now format correctly
let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = plan.trim();
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index a3d9a1e407..b1a0417703 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -52,7 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{
use datafusion_physical_optimizer::enforce_distribution::*;
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -489,7 +489,9 @@ impl TestConfig {
) -> Result<Arc<dyn ExecutionPlan>> {
// Add the ancillary output requirements operator at the start:
let optimizer = OutputRequirements::new_add_mode();
- let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
+ let session_config = SessionConfig::from(self.config.clone());
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let mut optimized = optimizer.optimize_plan(plan.clone(),
&optimizer_context)?;
// This file has 2 rules that use tree node, apply these rules to
original plan consecutively
// After these operations tree nodes should be in a consistent state.
@@ -525,21 +527,25 @@ impl TestConfig {
}
for run in optimizers_to_run {
+ let session_config = SessionConfig::from(self.config.clone());
+ let optimizer_context =
OptimizerContext::new(session_config.clone());
optimized = match run {
Run::Distribution => {
let optimizer = EnforceDistribution::new();
- optimizer.optimize(optimized, &self.config)?
+ optimizer.optimize_plan(optimized, &optimizer_context)?
}
Run::Sorting => {
let optimizer = EnforceSorting::new();
- optimizer.optimize(optimized, &self.config)?
+ optimizer.optimize_plan(optimized, &optimizer_context)?
}
};
}
// Remove the ancillary output requirements operator when done:
let optimizer = OutputRequirements::new_remove_mode();
- let optimized = optimizer.optimize(optimized, &self.config)?;
+ let session_config = SessionConfig::from(self.config.clone());
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let optimized = optimizer.optimize_plan(optimized,
&optimizer_context)?;
Ok(optimized)
}
@@ -3372,7 +3378,10 @@ SortRequiredExec: [a@0 ASC]
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
- let dist_plan = EnforceDistribution::new().optimize(physical_plan,
&config)?;
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let dist_plan =
+ EnforceDistribution::new().optimize_plan(physical_plan,
&optimizer_context)?;
// Since at the start of the rule ordering requirement is not satisfied
// EnforceDistribution rule doesn't satisfy this requirement either.
assert_plan!(dist_plan, @r"
@@ -3408,7 +3417,10 @@ SortRequiredExec: [a@0 ASC]
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
- let dist_plan = EnforceDistribution::new().optimize(physical_plan,
&config)?;
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let dist_plan =
+ EnforceDistribution::new().optimize_plan(physical_plan,
&optimizer_context)?;
// Since at the start of the rule ordering requirement is satisfied
// EnforceDistribution rule satisfy this requirement also.
assert_plan!(dist_plan, @r"
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 3f7fa62d68..881ba35ef6 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -56,7 +56,7 @@ use
datafusion_physical_optimizer::enforce_sorting::replace_with_order_preservin
use
datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown,
assign_initial_requirements, pushdown_sorts};
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion::prelude::*;
use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{Field};
@@ -175,8 +175,10 @@ impl EnforceSortingTest {
let input_plan_string =
displayable(self.plan.as_ref()).indent(true).to_string();
// Run the actual optimizer
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_physical_plan = EnforceSorting::new()
- .optimize(Arc::clone(&self.plan), &config)
+ .optimize_plan(Arc::clone(&self.plan), &optimizer_context)
.expect("enforce_sorting failed");
// Get string representation of the plan
@@ -2363,15 +2365,18 @@ async fn test_commutativity() -> Result<()> {
"#);
let config = ConfigOptions::new();
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let rules = vec![
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
];
let mut first_plan = orig_plan.clone();
for rule in rules {
- first_plan = rule.optimize(first_plan, &config)?;
+ first_plan = rule.optimize_plan(first_plan, &optimizer_context)?;
}
+ let optimizer_context2 = OptimizerContext::new(session_config.clone());
let rules = vec![
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
@@ -2379,7 +2384,7 @@ async fn test_commutativity() -> Result<()> {
];
let mut second_plan = orig_plan.clone();
for rule in rules {
- second_plan = rule.optimize(second_plan, &config)?;
+ second_plan = rule.optimize_plan(second_plan, &optimizer_context2)?;
}
assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan));
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 0903194b15..7b5557e617 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -47,7 +47,7 @@ use datafusion_physical_expr::{
};
use datafusion_physical_expr::{expressions::col, LexOrdering,
PhysicalSortExpr};
use datafusion_physical_optimizer::{
- filter_pushdown::FilterPushdown, PhysicalOptimizerRule,
+ filter_pushdown::FilterPushdown, OptimizerContext, PhysicalOptimizerRule,
};
use datafusion_physical_plan::{
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -244,8 +244,10 @@ async fn
test_dynamic_filter_pushdown_through_hash_join_with_topk() {
config.execution.parquet.pushdown_filters = true;
// Apply the FilterPushdown optimizer rule
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let plan = FilterPushdown::new_post_optimization()
- .optimize(Arc::clone(&plan), &config)
+ .optimize_plan(Arc::clone(&plan), &optimizer_context)
.unwrap();
// Test that filters are pushed down correctly to each side of the join
@@ -721,8 +723,10 @@ async fn test_topk_dynamic_filter_pushdown() {
// Actually apply the optimization to the plan and put some data through
it to check that the filter is updated to reflect the TopK state
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let plan = FilterPushdown::new_post_optimization()
- .optimize(plan, &config)
+ .optimize_plan(plan, &optimizer_context)
.unwrap();
let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::new_with_config(config);
@@ -804,8 +808,10 @@ async fn
test_topk_dynamic_filter_pushdown_multi_column_sort() {
// Actually apply the optimization to the plan and put some data through
it to check that the filter is updated to reflect the TopK state
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let plan = FilterPushdown::new_post_optimization()
- .optimize(plan, &config)
+ .optimize_plan(plan, &optimizer_context)
.unwrap();
let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::new_with_config(config);
@@ -1048,8 +1054,10 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let plan = FilterPushdown::new_post_optimization()
- .optimize(plan, &config)
+ .optimize_plan(plan, &optimizer_context)
.unwrap();
// Test for https://github.com/apache/datafusion/pull/17371: dynamic
filter linking survives `with_new_children`
@@ -1276,8 +1284,10 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let plan = FilterPushdown::new_post_optimization()
- .optimize(plan, &config)
+ .optimize_plan(plan, &optimizer_context)
.unwrap();
let config = SessionConfig::new().with_batch_size(10);
let session_ctx = SessionContext::new_with_config(config);
@@ -1473,8 +1483,10 @@ async fn
test_hashjoin_dynamic_filter_pushdown_collect_left() {
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let plan = FilterPushdown::new_post_optimization()
- .optimize(plan, &config)
+ .optimize_plan(plan, &optimizer_context)
.unwrap();
let config = SessionConfig::new().with_batch_size(10);
let session_ctx = SessionContext::new_with_config(config);
@@ -1645,8 +1657,10 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
+ let session_config = SessionConfig::from(config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let plan = FilterPushdown::new_post_optimization()
- .optimize(outer_join, &config)
+ .optimize_plan(outer_join, &optimizer_context)
.unwrap();
let config = SessionConfig::new().with_batch_size(10);
let session_ctx = SessionContext::new_with_config(config);
@@ -2426,7 +2440,8 @@ async fn
test_hashjoin_dynamic_filter_all_partitions_empty() {
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.pushdown_filters = true;
let optimizer = FilterPushdown::new_post_optimization();
- let plan = optimizer.optimize(plan, config.options()).unwrap();
+ let ctx = OptimizerContext::new(config.clone());
+ let plan = optimizer.optimize_plan(plan, &ctx).unwrap();
insta::assert_snapshot!(
format_plan_for_test(&plan),
@@ -2563,7 +2578,8 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
let mut config = SessionConfig::new();
config.options_mut().execution.parquet.pushdown_filters = true;
let optimizer = FilterPushdown::new_post_optimization();
- let plan = optimizer.optimize(plan, config.options()).unwrap();
+ let ctx = OptimizerContext::new(config.clone());
+ let plan = optimizer.optimize_plan(plan, &ctx).unwrap();
insta::assert_snapshot!(
format_plan_for_test(&plan),
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index c32f7b2d0b..fc2be94b01 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -25,8 +25,9 @@ use datafusion_datasource::{
file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
schema_adapter::SchemaAdapterFactory, source::DataSourceExec,
PartitionedFile,
};
+use datafusion_execution::config::SessionConfig;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::filter::batch_filter;
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase,
PushedDown};
use datafusion_physical_plan::{
@@ -374,7 +375,9 @@ impl OptimizationTest {
let input = format_execution_plan(&input_plan);
let input_schema = input_plan.schema();
- let output_result = opt.optimize(input_plan, &parquet_pushdown_config);
+ let session_config = SessionConfig::from(parquet_pushdown_config);
+ let optimizer_context = OptimizerContext::new(session_config.clone());
+ let output_result = opt.optimize_plan(input_plan, &optimizer_context);
let output = output_result
.and_then(|plan| {
if opt.schema_check() && (plan.schema() != input_schema) {
diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs
b/datafusion/core/tests/physical_optimizer/join_selection.rs
index f9d3a04546..26b5e44853 100644
--- a/datafusion/core/tests/physical_optimizer/join_selection.rs
+++ b/datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -29,6 +29,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::{stats::Precision, ColumnStatistics, JoinType,
ScalarValue};
use datafusion_common::{JoinSide, NullEquality};
use datafusion_common::{Result, Statistics};
+use datafusion_execution::config::SessionConfig;
use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream,
TaskContext};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::col;
@@ -37,6 +38,7 @@ use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning,
PhysicalExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
+use datafusion_physical_optimizer::OptimizerContext;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::joins::utils::ColumnIndex;
@@ -226,8 +228,10 @@ async fn test_join_with_swap() {
.unwrap(),
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join = JoinSelection::new()
- .optimize(join, &ConfigOptions::new())
+ .optimize_plan(join, &optimizer_context)
.unwrap();
let swapping_projection = optimized_join
@@ -288,8 +292,10 @@ async fn test_left_join_no_swap() {
.unwrap(),
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join = JoinSelection::new()
- .optimize(join, &ConfigOptions::new())
+ .optimize_plan(join, &optimizer_context)
.unwrap();
let swapped_join = optimized_join
@@ -338,8 +344,10 @@ async fn test_join_with_swap_semi() {
let original_schema = join.schema();
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join = JoinSelection::new()
- .optimize(Arc::new(join), &ConfigOptions::new())
+ .optimize_plan(Arc::new(join), &optimizer_context)
.unwrap();
let swapped_join = optimized_join
@@ -393,8 +401,10 @@ async fn test_join_with_swap_mark() {
let original_schema = join.schema();
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join = JoinSelection::new()
- .optimize(Arc::new(join), &ConfigOptions::new())
+ .optimize_plan(Arc::new(join), &optimizer_context)
.unwrap();
let swapped_join = optimized_join
@@ -430,8 +440,10 @@ macro_rules! assert_optimized {
($PLAN: expr, @$EXPECTED_LINES: literal $(,)?) => {
let plan = Arc::new($PLAN);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized = JoinSelection::new()
- .optimize(plan.clone(), &ConfigOptions::new())
+ .optimize_plan(plan.clone(), &optimizer_context)
.unwrap();
let plan_string =
displayable(optimized.as_ref()).indent(true).to_string();
@@ -522,8 +534,10 @@ async fn test_join_no_swap() {
.unwrap(),
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join = JoinSelection::new()
- .optimize(join, &ConfigOptions::new())
+ .optimize_plan(join, &optimizer_context)
.unwrap();
let swapped_join = optimized_join
@@ -571,8 +585,10 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
.unwrap(),
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join = JoinSelection::new()
- .optimize(join, &ConfigOptions::new())
+ .optimize_plan(join, &optimizer_context)
.unwrap();
let swapping_projection = optimized_join
@@ -649,11 +665,10 @@ async fn test_nl_join_with_swap_no_proj(join_type:
JoinType) {
.unwrap(),
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join = JoinSelection::new()
- .optimize(
- Arc::<NestedLoopJoinExec>::clone(&join),
- &ConfigOptions::new(),
- )
+ .optimize_plan(Arc::<NestedLoopJoinExec>::clone(&join),
&optimizer_context)
.unwrap();
let swapped_join = optimized_join
@@ -910,8 +925,10 @@ fn check_join_partition_mode(
.unwrap(),
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join = JoinSelection::new()
- .optimize(join, &ConfigOptions::new())
+ .optimize_plan(join, &optimizer_context)
.unwrap();
if !is_swapped {
@@ -1555,8 +1572,10 @@ async fn test_join_with_maybe_swap_unbounded_case(t:
TestCase) -> Result<()> {
NullEquality::NullEqualsNothing,
)?) as _;
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let optimized_join_plan =
- JoinSelection::new().optimize(Arc::clone(&join),
&ConfigOptions::new())?;
+ JoinSelection::new().optimize_plan(Arc::clone(&join),
&optimizer_context)?;
// If swap did happen
let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>();
diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 9d172db246..ef333c3067 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -24,13 +24,14 @@ use crate::physical_optimizer::test_utils::{
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
+use datafusion_execution::config::SessionConfig;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
+use datafusion_physical_optimizer::OptimizerContext;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::filter::FilterExec;
@@ -101,8 +102,10 @@ fn
transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero() ->
];
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
let expected = [
"StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3],
infinite_source=true, fetch=5"
@@ -126,8 +129,10 @@ fn
transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li
];
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
let expected = [
"GlobalLimitExec: skip=2, fetch=5",
@@ -162,8 +167,10 @@ fn
transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
];
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
let expected = [
"CoalescePartitionsExec: fetch=5",
@@ -194,8 +201,10 @@ fn pushes_global_limit_exec_through_projection_exec() ->
Result<()> {
];
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
let expected = [
"ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
@@ -226,8 +235,10 @@ fn
pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batc
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
let expected = [
"ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
@@ -268,8 +279,10 @@ fn pushes_global_limit_into_multiple_fetch_plans() ->
Result<()> {
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
let expected = [
"SortPreservingMergeExec: [c1@0 ASC], fetch=5",
@@ -304,8 +317,10 @@ fn
keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R
];
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
let expected = [
"CoalescePartitionsExec: fetch=5",
@@ -334,8 +349,10 @@ fn merges_local_limit_with_local_limit() -> Result<()> {
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(parent_local_limit,
&ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(parent_local_limit,
&optimizer_context)?;
let expected = ["GlobalLimitExec: skip=0, fetch=10", " EmptyExec"];
assert_eq!(get_plan_string(&after_optimize), expected);
@@ -359,8 +376,10 @@ fn merges_global_limit_with_global_limit() -> Result<()> {
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(parent_global_limit,
&ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(parent_global_limit,
&optimizer_context)?;
let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"];
assert_eq!(get_plan_string(&after_optimize), expected);
@@ -384,8 +403,10 @@ fn merges_global_limit_with_local_limit() -> Result<()> {
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(global_limit, &optimizer_context)?;
let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"];
assert_eq!(get_plan_string(&after_optimize), expected);
@@ -409,8 +430,10 @@ fn merges_local_limit_with_global_limit() -> Result<()> {
assert_eq!(initial, expected_initial);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- LimitPushdown::new().optimize(local_limit, &ConfigOptions::new())?;
+ LimitPushdown::new().optimize_plan(local_limit, &optimizer_context)?;
let expected = ["GlobalLimitExec: skip=20, fetch=20", " EmptyExec"];
assert_eq!(get_plan_string(&after_optimize), expected);
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 80f4fbc305..1068e8ec03 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -28,6 +28,7 @@ use datafusion_common::config::{ConfigOptions, CsvOptions};
use datafusion_common::{JoinSide, JoinType, NullEquality, Result, ScalarValue};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::TableSchema;
+use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{
@@ -44,7 +45,7 @@ use datafusion_physical_expr_common::sort_expr::{
};
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
@@ -461,8 +462,10 @@ fn test_csv_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -499,8 +502,10 @@ fn test_memory_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -595,8 +600,10 @@ fn test_streaming_table_after_projection() -> Result<()> {
Arc::new(streaming_table) as _,
)?) as _;
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let result = after_optimize
.as_any()
@@ -695,8 +702,10 @@ fn test_projection_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(top_projection,
&ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(top_projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -760,8 +769,10 @@ fn test_output_req_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -850,8 +861,10 @@ fn test_coalesce_partitions_after_projection() ->
Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -907,8 +920,10 @@ fn test_filter_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -1009,8 +1024,10 @@ fn test_join_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -1137,8 +1154,10 @@ fn test_join_after_required_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -1215,8 +1234,10 @@ fn test_nested_loop_join_after_projection() ->
Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize_string =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize_string.as_ref())
.indent(true)
.to_string();
@@ -1312,8 +1333,10 @@ fn test_hash_join_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
.to_string();
@@ -1340,8 +1363,10 @@ fn test_hash_join_after_projection() -> Result<()> {
join.clone(),
)?);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
.to_string();
@@ -1393,8 +1418,10 @@ fn test_repartition_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -1463,8 +1490,10 @@ fn test_sort_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -1516,8 +1545,10 @@ fn test_sort_preserving_after_projection() -> Result<()>
{
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -1560,8 +1591,10 @@ fn test_union_after_projection() -> Result<()> {
"
);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -1633,8 +1666,10 @@ fn test_partition_col_projection_pushdown() ->
Result<()> {
source,
)?);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
@@ -1676,8 +1711,10 @@ fn test_partition_col_projection_pushdown_expr() ->
Result<()> {
source,
)?);
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
let after_optimize =
- ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ ProjectionPushdown::new().optimize_plan(projection,
&optimizer_context)?;
let after_optimize_string = displayable(after_optimize.as_ref())
.indent(true)
diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs
b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
index f46147de1b..ccacf35ec8 100644
--- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
@@ -28,13 +28,13 @@ use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig,
StreamTable};
use datafusion::prelude::{CsvReadOptions, SessionContext};
-use datafusion_common::config::ConfigOptions;
use datafusion_common::{JoinType, Result, ScalarValue};
+use datafusion_execution::config::SessionConfig;
use datafusion_physical_expr::expressions::{col, Literal};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::{displayable, ExecutionPlan};
@@ -393,9 +393,12 @@ fn create_test_schema2() -> SchemaRef {
/// Check if sanity checker should accept or reject plans.
fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
let sanity_checker = SanityCheckPlan::new();
- let opts = ConfigOptions::default();
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
assert_eq!(
- sanity_checker.optimize(plan.clone(), &opts).is_ok(),
+ sanity_checker
+ .optimize_plan(plan.clone(), &optimizer_context)
+ .is_ok(),
is_sane
);
}
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs
b/datafusion/core/tests/physical_optimizer/test_utils.rs
index e410c495c8..e164e7617c 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -29,12 +29,12 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::source::DataSourceExec;
-use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
use datafusion_common::{ColumnStatistics, JoinType, NullEquality, Result,
Statistics};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
@@ -46,7 +46,7 @@ use datafusion_physical_expr_common::sort_expr::{
LexOrdering, OrderingRequirements, PhysicalSortExpr,
};
use
datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
@@ -635,10 +635,11 @@ pub fn build_group_by(input_schema: &SchemaRef, columns:
Vec<String>) -> Physica
}
pub fn get_optimized_plan(plan: &Arc<dyn ExecutionPlan>) -> Result<String> {
- let config = ConfigOptions::new();
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config.clone());
- let optimized =
- LimitedDistinctAggregation::new().optimize(Arc::clone(plan), &config)?;
+ let optimized = LimitedDistinctAggregation::new()
+ .optimize_plan(Arc::clone(plan), &optimizer_context)?;
let optimized_result =
displayable(optimized.as_ref()).indent(true).to_string();
diff --git a/datafusion/execution/src/config.rs
b/datafusion/execution/src/config.rs
index a0b180bf40..443229a3cb 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -100,7 +100,7 @@ pub struct SessionConfig {
/// references to the same options.
options: Arc<ConfigOptions>,
/// Opaque extensions.
- extensions: AnyMap,
+ extensions: Arc<Extensions>,
}
impl Default for SessionConfig {
@@ -108,14 +108,53 @@ impl Default for SessionConfig {
Self {
options: Arc::new(ConfigOptions::new()),
// Assume no extensions by default.
- extensions: HashMap::with_capacity_and_hasher(
- 0,
- BuildHasherDefault::default(),
- ),
+ extensions: Arc::new(Extensions::default()),
}
}
}
+/// A type map for storing extensions.
+///
+/// Extensions are indexed by their type `T`. If multiple values of the same
type are provided, only the last one
+/// will be kept.
+///
+/// Extensions are opaque objects that are unknown to DataFusion itself but
can be downcast by optimizer rules,
+/// execution plans, or other components that have access to the session
config.
+/// They provide a flexible way to attach extra data or behavior to the
session config.
+#[derive(Clone, Debug)]
+pub struct Extensions {
+ inner: AnyMap,
+}
+
+impl Default for Extensions {
+ fn default() -> Self {
+ Self {
+ inner: HashMap::with_capacity_and_hasher(0,
BuildHasherDefault::default()),
+ }
+ }
+}
+
+impl Extensions {
+ pub fn insert<T>(&mut self, value: Arc<T>)
+ where
+ T: Send + Sync + 'static,
+ {
+ let id = TypeId::of::<T>();
+ self.inner.insert(id, value);
+ }
+
+ pub fn get<T>(&self) -> Option<Arc<T>>
+ where
+ T: Send + Sync + 'static,
+ {
+ let id = TypeId::of::<T>();
+ self.inner
+ .get(&id)
+ .cloned()
+ .map(|arc_any| Arc::downcast(arc_any).expect("TypeId unique"))
+ }
+}
+
impl SessionConfig {
/// Create an execution config with default setting
pub fn new() -> Self {
@@ -164,6 +203,20 @@ impl SessionConfig {
Arc::make_mut(&mut self.options)
}
+ /// Return a handle to the extensions.
+ ///
+ /// Can be used to read the current extensions.
+ pub fn extensions(&self) -> &Arc<Extensions> {
+ &self.extensions
+ }
+
+ /// Return a mutable handle to the extensions.
+ ///
+ /// Can be used to set extensions.
+ pub fn extensions_mut(&mut self) -> &mut Extensions {
+ Arc::make_mut(&mut self.extensions)
+ }
+
/// Set a configuration option
pub fn set(self, key: &str, value: &ScalarValue) -> Self {
self.set_str(key, &value.to_string())
@@ -580,9 +633,7 @@ impl SessionConfig {
where
T: Send + Sync + 'static,
{
- let ext = ext as Arc<dyn Any + Send + Sync + 'static>;
- let id = TypeId::of::<T>();
- self.extensions.insert(id, ext);
+ self.extensions_mut().insert::<T>(ext);
}
/// Get extension, if any for the specified type `T` exists.
@@ -592,11 +643,7 @@ impl SessionConfig {
where
T: Send + Sync + 'static,
{
- let id = TypeId::of::<T>();
- self.extensions
- .get(&id)
- .cloned()
- .map(|ext| Arc::downcast(ext).expect("TypeId unique"))
+ self.extensions.get::<T>()
}
}
diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs
b/datafusion/physical-optimizer/src/aggregate_statistics.rs
index 672317060d..eee333926b 100644
--- a/datafusion/physical-optimizer/src/aggregate_statistics.rs
+++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs
@@ -16,7 +16,6 @@
// under the License.
//! Utilizing exact statistics from sources to avoid scanning data
-use datafusion_common::config::ConfigOptions;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
@@ -27,7 +26,7 @@ use datafusion_physical_plan::udaf::{AggregateFunctionExpr,
StatisticsArgs};
use datafusion_physical_plan::{expressions, ExecutionPlan};
use std::sync::Arc;
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
/// Optimizer that uses available statistics for aggregate functions
#[derive(Default, Debug)]
@@ -43,10 +42,10 @@ impl AggregateStatistics {
impl PhysicalOptimizerRule for AggregateStatistics {
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
#[allow(clippy::only_used_in_recursion)] // See
https://github.com/rust-lang/rust-clippy/issues/14566
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
+ context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(partial_agg_exec) = take_optimizable(&*plan) {
let partial_agg_exec = partial_agg_exec
@@ -86,13 +85,15 @@ impl PhysicalOptimizerRule for AggregateStatistics {
)?))
} else {
plan.map_children(|child| {
- self.optimize(child, config).map(Transformed::yes)
+ self.optimize_plan(child, context).map(Transformed::yes)
})
.data()
}
} else {
- plan.map_children(|child| self.optimize(child,
config).map(Transformed::yes))
- .data()
+ plan.map_children(|child| {
+ self.optimize_plan(child, context).map(Transformed::yes)
+ })
+ .data()
}
}
diff --git a/datafusion/physical-optimizer/src/coalesce_batches.rs
b/datafusion/physical-optimizer/src/coalesce_batches.rs
index 61e4c0e7f1..956a8994ba 100644
--- a/datafusion/physical-optimizer/src/coalesce_batches.rs
+++ b/datafusion/physical-optimizer/src/coalesce_batches.rs
@@ -18,14 +18,12 @@
//! CoalesceBatches optimizer that groups batches together rows
//! in bigger batches to avoid overhead with small batches
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use std::sync::Arc;
use datafusion_common::error::Result;
-use datafusion_common::{
- assert_eq_or_internal_err, config::ConfigOptions, DataFusionError,
-};
+use datafusion_common::{assert_eq_or_internal_err, DataFusionError};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::{
async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
@@ -46,11 +44,12 @@ impl CoalesceBatches {
}
}
impl PhysicalOptimizerRule for CoalesceBatches {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
+ context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let config = context.session_config().options();
if !config.execution.coalesce_batches {
return Ok(plan);
}
diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
index bffb2c9df9..f189a7f3f0 100644
--- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
+++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
@@ -26,8 +26,7 @@ use datafusion_physical_plan::aggregates::{
};
use datafusion_physical_plan::ExecutionPlan;
-use crate::PhysicalOptimizerRule;
-use datafusion_common::config::ConfigOptions;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr};
@@ -47,10 +46,10 @@ impl CombinePartialFinalAggregate {
}
impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
+ _context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(|plan| {
// Check if the plan is AggregateExec
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs
b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 4464c12ca7..78e5c4edb2 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -30,6 +30,7 @@ use crate::utils::{
add_sort_above_with_check, is_coalesce_partitions, is_repartition,
is_sort_preserving_merge,
};
+use crate::OptimizerContext;
use arrow::compute::SortOptions;
use datafusion_common::config::ConfigOptions;
@@ -190,11 +191,12 @@ impl EnforceDistribution {
}
impl PhysicalOptimizerRule for EnforceDistribution {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
+ context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let config = context.session_config().options();
let top_down_join_key_reordering =
config.optimizer.top_down_join_key_reordering;
let adjusted = if top_down_join_key_reordering {
diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs
b/datafusion/physical-optimizer/src/ensure_coop.rs
index 0c0b63c0b3..38bf1877a4 100644
--- a/datafusion/physical-optimizer/src/ensure_coop.rs
+++ b/datafusion/physical-optimizer/src/ensure_coop.rs
@@ -23,9 +23,8 @@
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
-use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::Result;
use datafusion_physical_plan::coop::CooperativeExec;
@@ -62,10 +61,10 @@ impl PhysicalOptimizerRule for EnsureCooperative {
"EnsureCooperative"
}
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
+ _context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|plan| {
let is_leaf = plan.children().is_empty();
@@ -96,16 +95,17 @@ impl PhysicalOptimizerRule for EnsureCooperative {
#[cfg(test)]
mod tests {
use super::*;
- use datafusion_common::config::ConfigOptions;
+ use datafusion_execution::config::SessionConfig;
use datafusion_physical_plan::{displayable, test::scan_partitioned};
use insta::assert_snapshot;
#[tokio::test]
async fn test_cooperative_exec_for_custom_exec() {
let test_custom_exec = scan_partitioned(1);
- let config = ConfigOptions::new();
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config);
let optimized = EnsureCooperative::new()
- .optimize(test_custom_exec, &config)
+ .optimize_plan(test_custom_exec, &optimizer_context)
.unwrap();
let display = displayable(optimized.as_ref()).indent(true).to_string();
diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs
b/datafusion/physical-optimizer/src/filter_pushdown.rs
index 8bed6c3aeb..8206d43208 100644
--- a/datafusion/physical-optimizer/src/filter_pushdown.rs
+++ b/datafusion/physical-optimizer/src/filter_pushdown.rs
@@ -33,7 +33,7 @@
use std::sync::Arc;
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{
@@ -418,6 +418,20 @@ impl Default for FilterPushdown {
}
impl PhysicalOptimizerRule for FilterPushdown {
+ fn optimize_plan(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ context: &OptimizerContext,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let config = context.session_config().options();
+ Ok(
+ push_down_filters(&Arc::clone(&plan), vec![], config, self.phase)?
+ .updated_node
+ .unwrap_or(plan),
+ )
+ }
+
+ #[allow(deprecated)]
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/physical-optimizer/src/join_selection.rs
b/datafusion/physical-optimizer/src/join_selection.rs
index b55c01f62e..b6ff8344a3 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -23,7 +23,7 @@
//! pipeline-friendly ones. To achieve the second goal, it selects the proper
//! `PartitionMode` and the build side using the available statistics for hash
joins.
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -103,11 +103,12 @@ fn supports_collect_by_thresholds(
}
impl PhysicalOptimizerRule for JoinSelection {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
+ context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let config = context.session_config().options();
// First, we make pipeline-fixing modifications to joins so as to
accommodate
// unbounded inputs. Each pipeline-fixing subrule, which is a function
// of type `PipelineFixerSubrule`, takes a single
[`PipelineStatePropagator`]
diff --git a/datafusion/physical-optimizer/src/lib.rs
b/datafusion/physical-optimizer/src/lib.rs
index f4b82eed3c..afa8252528 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -47,4 +47,4 @@ pub mod topk_aggregation;
pub mod update_aggr_exprs;
pub mod utils;
-pub use optimizer::PhysicalOptimizerRule;
+pub use optimizer::{OptimizerContext, PhysicalOptimizerRule};
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs
b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 7469c3af93..d70e8da9dd 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -21,9 +21,8 @@
use std::fmt::Debug;
use std::sync::Arc;
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
-use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion_common::utils::combine_limit;
@@ -60,10 +59,10 @@ impl LimitPushdown {
}
impl PhysicalOptimizerRule for LimitPushdown {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
+ _context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let global_state = GlobalRequirements {
fetch: None,
diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
index 1c671cd074..ee20f4d43b 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::PhysicalOptimizerRule;
-use datafusion_common::config::ConfigOptions;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::{LimitEffect, WindowFrameBound, WindowFrameUnits};
@@ -71,11 +70,12 @@ impl TraverseState {
}
impl PhysicalOptimizerRule for LimitPushPastWindows {
- fn optimize(
+ fn optimize_plan(
&self,
original: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
+ context: &OptimizerContext,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+ let config = context.session_config().options();
if !config.optimizer.enable_window_limits {
return Ok(original);
}
diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs
b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs
index 3666ff3798..89828a87cb 100644
--- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs
+++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs
@@ -24,11 +24,10 @@ use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
-use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use itertools::Itertools;
/// An optimizer rule that passes a `limit` hint into grouped aggregations
which don't require all
@@ -158,11 +157,12 @@ impl Default for LimitedDistinctAggregation {
}
impl PhysicalOptimizerRule for LimitedDistinctAggregation {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
+ context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let config = context.session_config().options();
if config.optimizer.enable_distinct_aggregation_soft_limit {
plan.transform_down(|plan| {
Ok(
diff --git a/datafusion/physical-optimizer/src/optimizer.rs
b/datafusion/physical-optimizer/src/optimizer.rs
index 03c83bb5a0..0f3467bfbb 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -38,10 +38,37 @@ use crate::update_aggr_exprs::OptimizeAggregateOrder;
use crate::limit_pushdown_past_window::LimitPushPastWindows;
use datafusion_common::config::ConfigOptions;
-use datafusion_common::Result;
+use datafusion_common::{internal_err, Result};
+use datafusion_execution::config::SessionConfig;
use datafusion_physical_plan::ExecutionPlan;
-/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
+/// Context for optimizing physical plans.
+///
+/// This context provides access to session configuration and optimizer
extensions.
+///
+/// Similar to [`TaskContext`] which provides context during execution,
+/// [`OptimizerContext`] provides context during optimization.
+///
+/// [`TaskContext`]:
https://docs.rs/datafusion/latest/datafusion/execution/struct.TaskContext.html
+#[derive(Debug, Clone)]
+pub struct OptimizerContext {
+ /// Session configuration
+ session_config: SessionConfig,
+}
+
+impl OptimizerContext {
+ /// Create a new OptimizerContext
+ pub fn new(session_config: SessionConfig) -> Self {
+ Self { session_config }
+ }
+
+ /// Return a reference to the session configuration
+ pub fn session_config(&self) -> &SessionConfig {
+ &self.session_config
+ }
+}
+
+/// `PhysicalOptimizerRule` transforms one [`ExecutionPlan`] into another which
/// computes the same results, but in a potentially more efficient way.
///
/// Use [`SessionState::add_physical_optimizer_rule`] to register additional
@@ -49,12 +76,52 @@ use datafusion_physical_plan::ExecutionPlan;
///
/// [`SessionState::add_physical_optimizer_rule`]:
https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
pub trait PhysicalOptimizerRule: Debug {
+ /// Rewrite `plan` to an optimized form with additional context
+ ///
+ /// This is the preferred method for implementing optimization rules as it
+ /// provides access to the full optimizer context including session
configuration.
+ ///
+ /// The default implementation delegates to
[`PhysicalOptimizerRule::optimize`] for
+ /// backwards compatibility with existing implementations.
+ ///
+ /// New implementations should override this method instead of
`optimize()`.
+ ///
+ /// Once [`PhysicalOptimizerRule::optimize`] is removed, this default
+ /// implementation will be removed and this method will become required.
+ /// This change is scheduled for DataFusion 54.0.0.
+ fn optimize_plan(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ context: &OptimizerContext,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Default implementation: delegate to the old method for backwards
compatibility
+ #[allow(deprecated)]
+ self.optimize(plan, context.session_config().options())
+ }
+
/// Rewrite `plan` to an optimized form
+ ///
+ /// This method is kept for backwards compatibility. New implementations
+ /// should implement [`optimize_plan`](Self::optimize_plan) instead, which
+ /// provides access to additional context.
+ ///
+ /// The default implementation returns an error indicating that neither
+ /// `optimize` nor `optimize_plan` was properly implemented. At least one
+ /// of these methods must be overridden.
+ #[deprecated(
+ since = "52.0.0",
+ note = "use `PhysicalOptimizerRule::optimize_plan` instead"
+ )]
fn optimize(
&self,
- plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
- ) -> Result<Arc<dyn ExecutionPlan>>;
+ _plan: Arc<dyn ExecutionPlan>,
+ _config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ internal_err!(
+ "PhysicalOptimizerRule '{}' must implement either optimize() or
optimize_plan()",
+ self.name()
+ )
+ }
/// A human readable name for this optimizer rule
fn name(&self) -> &str;
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs
b/datafusion/physical-optimizer/src/output_requirements.rs
index 9e5e980219..c11b1d362a 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -24,9 +24,8 @@
use std::sync::Arc;
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
-use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, Statistics};
use datafusion_execution::TaskContext;
@@ -303,10 +302,10 @@ impl ExecutionPlan for OutputRequirementExec {
}
impl PhysicalOptimizerRule for OutputRequirements {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
+ _context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
match self.mode {
RuleMode::Add => require_top_ordering(plan),
diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs
b/datafusion/physical-optimizer/src/projection_pushdown.rs
index b5e002b51f..fa562825c6 100644
--- a/datafusion/physical-optimizer/src/projection_pushdown.rs
+++ b/datafusion/physical-optimizer/src/projection_pushdown.rs
@@ -20,13 +20,12 @@
//! projections one by one if the operator below is amenable to this. If a
//! projection reaches a source, it can even disappear from the plan entirely.
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use arrow::datatypes::{Fields, Schema, SchemaRef};
use datafusion_common::alias::AliasGenerator;
use std::collections::HashSet;
use std::sync::Arc;
-use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
@@ -57,10 +56,10 @@ impl ProjectionPushdown {
}
impl PhysicalOptimizerRule for ProjectionPushdown {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
+ _context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let alias_generator = AliasGenerator::new();
let plan = plan
@@ -447,6 +446,8 @@ fn is_volatile_expression_tree(expr: &dyn PhysicalExpr) ->
bool {
mod test {
use super::*;
use arrow::datatypes::{DataType, Field, FieldRef, Schema};
+ use datafusion_common::config::ConfigOptions;
+ use datafusion_execution::config::SessionConfig;
use datafusion_expr_common::operator::Operator;
use datafusion_functions::math::random;
use datafusion_physical_expr::expressions::{binary, lit};
@@ -672,7 +673,10 @@ mod test {
)?;
let optimizer = ProjectionPushdown::new();
- let optimized_plan = optimizer.optimize(Arc::new(join),
&Default::default())?;
+ let session_config = SessionConfig::new();
+ let optimizer_context = OptimizerContext::new(session_config);
+ let optimized_plan =
+ optimizer.optimize_plan(Arc::new(join), &optimizer_context)?;
let displayable_plan =
displayable(optimized_plan.as_ref()).indent(false);
Ok(displayable_plan.to_string())
diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs
b/datafusion/physical-optimizer/src/sanity_checker.rs
index acc70d39f0..fc9187ba54 100644
--- a/datafusion/physical-optimizer/src/sanity_checker.rs
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;
-use datafusion_common::config::{ConfigOptions, OptimizerOptions};
+use datafusion_common::config::OptimizerOptions;
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::intervals::utils::{check_support,
is_datatype_supported};
@@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness,
EmissionType};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use
datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
use itertools::izip;
@@ -54,11 +54,12 @@ impl SanityCheckPlan {
}
impl PhysicalOptimizerRule for SanityCheckPlan {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
+ context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let config = context.session_config().options();
plan.transform_up(|p| check_plan_sanity(p, &config.optimizer))
.data()
}
diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs
b/datafusion/physical-optimizer/src/topk_aggregation.rs
index b7505f0df4..a55b90e839 100644
--- a/datafusion/physical-optimizer/src/topk_aggregation.rs
+++ b/datafusion/physical-optimizer/src/topk_aggregation.rs
@@ -19,9 +19,8 @@
use std::sync::Arc;
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
use arrow::datatypes::DataType;
-use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
@@ -145,11 +144,12 @@ impl Default for TopKAggregation {
}
impl PhysicalOptimizerRule for TopKAggregation {
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- config: &ConfigOptions,
+ context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let config = context.session_config().options();
if config.optimizer.enable_topk_aggregation {
plan.transform_down(|plan| {
Ok(if let Some(plan) = TopKAggregation::transform_sort(&plan) {
diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs
b/datafusion/physical-optimizer/src/update_aggr_exprs.rs
index 61bc715592..39852a2293 100644
--- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs
+++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs
@@ -20,7 +20,6 @@
use std::sync::Arc;
-use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_datafusion_err, Result};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
@@ -29,7 +28,7 @@ use datafusion_physical_plan::aggregates::{concat_slices,
AggregateExec};
use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
-use crate::PhysicalOptimizerRule;
+use crate::{OptimizerContext, PhysicalOptimizerRule};
/// This optimizer rule checks ordering requirements of aggregate expressions.
///
@@ -65,15 +64,15 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
/// # Arguments
///
/// * `plan` - The root of the execution plan to optimize.
- /// * `_config` - Configuration options (currently unused).
+ /// * `_context` - Optimizer context (currently unused).
///
/// # Returns
///
/// A `Result` containing the potentially optimized execution plan or an
error.
- fn optimize(
+ fn optimize_plan(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
+ _context: &OptimizerContext,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|plan| {
if let Some(aggr_exec) =
plan.as_any().downcast_ref::<AggregateExec>() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]