This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ca8985ef1 Make OptimizerConfig a trait (#4631) (#4638) (#4645)
ca8985ef1 is described below
commit ca8985ef17739c2d342ac987b3de30a29a5fc76d
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 16 18:21:53 2022 +0000
Make OptimizerConfig a trait (#4631) (#4638) (#4645)
* Make OptimizerConfig a trait (#4631) (#4638)
* Format
* Review feedback
---
datafusion-examples/examples/rewrite_expr.rs | 11 +-
datafusion/core/src/execution/context.rs | 58 ++++---
datafusion/core/tests/user_defined_plan.rs | 6 +-
datafusion/optimizer/README.md | 6 +-
.../optimizer/src/common_subexpr_eliminate.rs | 60 ++++----
.../optimizer/src/decorrelate_where_exists.rs | 16 +-
datafusion/optimizer/src/decorrelate_where_in.rs | 32 ++--
datafusion/optimizer/src/eliminate_cross_join.rs | 33 ++--
datafusion/optimizer/src/eliminate_filter.rs | 22 +--
datafusion/optimizer/src/eliminate_limit.rs | 32 ++--
datafusion/optimizer/src/eliminate_outer_join.rs | 23 +--
datafusion/optimizer/src/filter_null_join_keys.rs | 36 +++--
datafusion/optimizer/src/inline_table_scan.rs | 16 +-
datafusion/optimizer/src/lib.rs | 2 +-
datafusion/optimizer/src/optimizer.rs | 166 +++++++++++++--------
.../optimizer/src/propagate_empty_relation.rs | 12 +-
datafusion/optimizer/src/push_down_filter.rs | 33 ++--
datafusion/optimizer/src/push_down_limit.rs | 29 +---
datafusion/optimizer/src/push_down_projection.rs | 38 ++---
.../optimizer/src/rewrite_disjunctive_predicate.rs | 10 +-
.../optimizer/src/scalar_subquery_to_join.rs | 29 ++--
.../src/simplify_expressions/simplify_exprs.rs | 18 +--
.../optimizer/src/single_distinct_to_groupby.rs | 19 +--
.../optimizer/src/subquery_filter_to_join.rs | 15 +-
datafusion/optimizer/src/test/mod.rs | 8 +-
datafusion/optimizer/src/type_coercion.rs | 24 +--
.../optimizer/src/unwrap_cast_in_comparison.rs | 4 +-
datafusion/optimizer/src/utils.rs | 4 +-
datafusion/optimizer/tests/integration-test.rs | 8 +-
29 files changed, 359 insertions(+), 411 deletions(-)
diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs
index 216a6932c..a0be8c196 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -22,7 +22,7 @@ use datafusion_expr::{
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource,
};
use datafusion_optimizer::optimizer::Optimizer;
-use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext,
OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion_sql::sqlparser::parser::Parser;
@@ -47,9 +47,8 @@ pub fn main() -> Result<()> {
// now run the optimizer with our custom rule
let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]);
- let mut optimizer_config =
OptimizerConfig::default().with_skip_failing_rules(false);
- let optimized_plan =
- optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?;
+ let config = OptimizerContext::default().with_skip_failing_rules(false);
+ let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?;
println!(
"Optimized Logical Plan:\n\n{}\n",
optimized_plan.display_indent()
@@ -76,10 +75,10 @@ impl OptimizerRule for MyRule {
fn try_optimize(
&self,
plan: &LogicalPlan,
- _config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// recurse down and optimize children first
- let plan = utils::optimize_children(self, plan, _config)?;
+ let plan = utils::optimize_children(self, plan, config)?;
match plan {
LogicalPlan::Filter(filter) => {
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index f98e21dd1..16f2a29f3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::logical_expr::{
CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
SetVariable, TableSource, TableType, UNNAMED_TABLE,
};
-use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
+use crate::optimizer::{OptimizerContext, OptimizerRule};
use datafusion_sql::{ResolvedTableReference, TableReference};
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
@@ -1557,14 +1557,6 @@ impl SessionState {
.register_catalog(config.default_catalog.clone(),
default_catalog);
}
- let optimizer_config = OptimizerConfig::new().filter_null_keys(
- config
- .config_options
- .read()
- .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
- .unwrap_or_default(),
- );
-
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync
+ Send>> = vec![
Arc::new(AggregateStatistics::new()),
Arc::new(JoinSelection::new()),
@@ -1593,7 +1585,7 @@ impl SessionState {
SessionState {
session_id,
- optimizer: Optimizer::new(&optimizer_config),
+ optimizer: Optimizer::new(),
physical_optimizers,
query_planner: Arc::new(DefaultQueryPlanner {}),
catalog_list,
@@ -1741,24 +1733,29 @@ impl SessionState {
/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
- let mut optimizer_config = OptimizerConfig::new()
- .with_skip_failing_rules(
- self.config
- .config_options
- .read()
- .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES)
- .unwrap_or_default(),
- )
- .with_max_passes(
- self.config
- .config_options
- .read()
- .get_u64(OPT_OPTIMIZER_MAX_PASSES)
- .unwrap_or_default() as u8,
- )
- .with_query_execution_start_time(
- self.execution_props.query_execution_start_time,
- );
+ // TODO: Implement OptimizerContext directly on DataFrame (#4631) (#4626)
+ let config = {
+ let config_options = self.config.config_options.read();
+ OptimizerContext::new()
+ .with_skip_failing_rules(
+ config_options
+ .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES)
+ .unwrap_or_default(),
+ )
+ .with_max_passes(
+ config_options
+ .get_u64(OPT_OPTIMIZER_MAX_PASSES)
+ .unwrap_or_default() as u8,
+ )
+ .with_query_execution_start_time(
+ self.execution_props.query_execution_start_time,
+ )
+ .filter_null_keys(
+ config_options
+ .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
+ .unwrap_or_default(),
+ )
+ };
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();
@@ -1766,7 +1763,7 @@ impl SessionState {
// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
e.plan.as_ref(),
- &mut optimizer_config,
+ &config,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan {
optimizer_name };
@@ -1781,8 +1778,7 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
- self.optimizer
- .optimize(plan, &mut optimizer_config, |_, _| {})
+ self.optimizer.optimize(plan, &config, |_, _| {})
}
}
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 4b5a7e66f..89cfd48e6 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -285,7 +285,7 @@ impl OptimizerRule for TopKOptimizerRule {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
@@ -308,7 +308,7 @@ impl OptimizerRule for TopKOptimizerRule {
node: Arc::new(TopKPlanNode {
k: *fetch,
input: self
- .try_optimize(input.as_ref(),
optimizer_config)?
+ .try_optimize(input.as_ref(), config)?
.unwrap_or_else(|| input.as_ref().clone()),
expr: expr[0].clone(),
}),
@@ -319,7 +319,7 @@ impl OptimizerRule for TopKOptimizerRule {
// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
- Ok(Some(optimize_children(self, plan, optimizer_config)?))
+ Ok(Some(optimize_children(self, plan, config)?))
}
fn name(&self) -> &str {
diff --git a/datafusion/optimizer/README.md b/datafusion/optimizer/README.md
index 51bc8e3ee..01c6f6dd2 100644
--- a/datafusion/optimizer/README.md
+++ b/datafusion/optimizer/README.md
@@ -42,9 +42,9 @@ and applying it to a logical plan to produce an optimized logical plan.
// The `datafusion` crate provides a DataFrame API that can create a LogicalPlan
let logical_plan = ...
-let mut config = OptimizerConfig::default();
+let mut config = OptimizerContext::default();
let optimizer = Optimizer::new(&config);
-let optimized_plan = optimizer.optimize(&logical_plan, &mut config, observe)?;
+let optimized_plan = optimizer.optimize(&logical_plan, &config, observe)?;
fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
println!(
@@ -82,7 +82,7 @@ pub trait OptimizerRule {
fn optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<LogicalPlan>;
/// A human readable name for this optimizer rule
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 8236f20f3..8b1639cfc 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -17,8 +17,11 @@
//! Eliminate common sub-expression.
-use crate::{utils, OptimizerConfig, OptimizerRule};
+use std::collections::{BTreeSet, HashMap};
+use std::sync::Arc;
+
use arrow::datatypes::DataType;
+
use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError,
Result};
use datafusion_expr::{
col,
@@ -27,8 +30,8 @@ use datafusion_expr::{
logical_plan::{Aggregate, Filter, LogicalPlan, Projection, Sort, Window},
Expr, ExprSchemable,
};
-use std::collections::{BTreeSet, HashMap};
-use std::sync::Arc;
+
+use crate::{utils, OptimizerConfig, OptimizerRule};
/// A map from expression's identifier to tuple including
/// - the expression itself (cloned)
@@ -60,7 +63,7 @@ impl CommonSubexprEliminate {
arrays_list: &[&[Vec<(usize, String)>]],
input: &LogicalPlan,
expr_set: &mut ExprSet,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<(Vec<Vec<Expr>>, LogicalPlan)> {
let mut affected_id = BTreeSet::<Identifier>::new();
@@ -80,7 +83,7 @@ impl CommonSubexprEliminate {
.collect::<Result<Vec<_>>>()?;
let mut new_input = self
- .try_optimize(input, optimizer_config)?
+ .try_optimize(input, config)?
.unwrap_or_else(|| input.clone());
if !affected_id.is_empty() {
new_input = build_project_plan(new_input, affected_id, expr_set)?;
@@ -94,7 +97,7 @@ impl OptimizerRule for CommonSubexprEliminate {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let mut expr_set = ExprSet::new();
@@ -107,13 +110,8 @@ impl OptimizerRule for CommonSubexprEliminate {
let input_schema = Arc::clone(input.schema());
let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
- let (mut new_expr, new_input) = self.rewrite_expr(
- &[expr],
- &[&arrays],
- input,
- &mut expr_set,
- optimizer_config,
- )?;
+ let (mut new_expr, new_input) =
+ self.rewrite_expr(&[expr], &[&arrays], input, &mut
expr_set, config)?;
Ok(Some(LogicalPlan::Projection(
Projection::try_new_with_schema(
@@ -140,7 +138,7 @@ impl OptimizerRule for CommonSubexprEliminate {
&[&[id_array]],
filter.input(),
&mut expr_set,
- optimizer_config,
+ config,
)?;
if let Some(predicate) = pop_expr(&mut new_expr)?.pop() {
@@ -167,7 +165,7 @@ impl OptimizerRule for CommonSubexprEliminate {
&[&arrays],
input,
&mut expr_set,
- optimizer_config,
+ config,
)?;
Ok(Some(LogicalPlan::Window(Window {
@@ -192,7 +190,7 @@ impl OptimizerRule for CommonSubexprEliminate {
&[&group_arrays, &aggr_arrays],
input,
&mut expr_set,
- optimizer_config,
+ config,
)?;
// note the reversed pop order.
let new_aggr_expr = pop_expr(&mut new_expr)?;
@@ -211,13 +209,8 @@ impl OptimizerRule for CommonSubexprEliminate {
let input_schema = Arc::clone(input.schema());
let arrays = to_arrays(expr, input_schema, &mut expr_set)?;
- let (mut new_expr, new_input) = self.rewrite_expr(
- &[expr],
- &[&arrays],
- input,
- &mut expr_set,
- optimizer_config,
- )?;
+ let (mut new_expr, new_input) =
+ self.rewrite_expr(&[expr], &[&arrays], input, &mut
expr_set, config)?;
Ok(Some(LogicalPlan::Sort(Sort {
expr: pop_expr(&mut new_expr)?,
@@ -249,11 +242,7 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
@@ -572,20 +561,25 @@ fn replace_common_expr(
#[cfg(test)]
mod test {
- use super::*;
- use crate::test::*;
+ use std::iter;
+
use arrow::datatypes::{Field, Schema};
+
use datafusion_expr::logical_plan::{table_scan, JoinType};
use datafusion_expr::{
avg, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder,
sum,
Operator,
};
- use std::iter;
+
+ use crate::optimizer::OptimizerContext;
+ use crate::test::*;
+
+ use super::*;
fn assert_optimized_plan_eq(expected: &str, plan: &LogicalPlan) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
@@ -831,7 +825,7 @@ mod test {
.unwrap();
let rule = CommonSubexprEliminate {};
let optimized_plan = rule
- .try_optimize(&plan, &mut OptimizerConfig::new())
+ .try_optimize(&plan, &OptimizerContext::new())
.unwrap()
.unwrap();
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs
index f0fa6bb9d..f1addf651 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -48,7 +48,7 @@ impl DecorrelateWhereExists {
fn extract_subquery_exprs(
&self,
predicate: &Expr,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let filters = split_conjunction(predicate);
@@ -58,7 +58,7 @@ impl DecorrelateWhereExists {
match it {
Expr::Exists { subquery, negated } => {
let subquery = self
- .try_optimize(&subquery.subquery, optimizer_config)?
+ .try_optimize(&subquery.subquery, config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
@@ -77,7 +77,7 @@ impl OptimizerRule for DecorrelateWhereExists {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
@@ -86,11 +86,11 @@ impl OptimizerRule for DecorrelateWhereExists {
// Apply optimizer rule to current input
let optimized_input = self
- .try_optimize(filter_input, optimizer_config)?
+ .try_optimize(filter_input, config)?
.unwrap_or_else(|| filter_input.clone());
let (subqueries, other_exprs) =
- self.extract_subquery_exprs(predicate, optimizer_config)?;
+ self.extract_subquery_exprs(predicate, config)?;
let optimized_plan = LogicalPlan::Filter(Filter::try_new(
predicate.clone(),
Arc::new(optimized_input),
@@ -114,11 +114,7 @@ impl OptimizerRule for DecorrelateWhereExists {
}
_ => {
// Apply the optimization to all inputs of the plan
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index de1725dc5..d2555ea5c 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -20,7 +20,7 @@ use crate::utils::{
only_or_err, split_conjunction, swap_table, verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
-use datafusion_common::context;
+use datafusion_common::{context, Result};
use datafusion_expr::logical_plan::{Filter, JoinType, Projection, Subquery};
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use log::debug;
@@ -46,7 +46,7 @@ impl DecorrelateWhereIn {
fn extract_subquery_exprs(
&self,
predicate: &Expr,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let filters = split_conjunction(predicate); // TODO: disjunctions
@@ -60,7 +60,7 @@ impl DecorrelateWhereIn {
negated,
} => {
let subquery = self
- .try_optimize(&subquery.subquery, optimizer_config)?
+ .try_optimize(&subquery.subquery, config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
@@ -81,8 +81,8 @@ impl OptimizerRule for DecorrelateWhereIn {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
- ) -> datafusion_common::Result<Option<LogicalPlan>> {
+ config: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = filter.predicate();
@@ -90,11 +90,11 @@ impl OptimizerRule for DecorrelateWhereIn {
// Apply optimizer rule to current input
let optimized_input = self
- .try_optimize(filter_input, optimizer_config)?
+ .try_optimize(filter_input, config)?
.unwrap_or_else(|| filter_input.clone());
let (subqueries, other_exprs) =
- self.extract_subquery_exprs(predicate, optimizer_config)?;
+ self.extract_subquery_exprs(predicate, config)?;
let optimized_plan = LogicalPlan::Filter(Filter::try_new(
predicate.clone(),
Arc::new(optimized_input),
@@ -107,22 +107,14 @@ impl OptimizerRule for DecorrelateWhereIn {
// iterate through all exists clauses in predicate, turning each into a join
let mut cur_input = filter_input.clone();
for subquery in subqueries {
- cur_input = optimize_where_in(
- &subquery,
- &cur_input,
- &other_exprs,
- optimizer_config,
- )?;
+ cur_input =
+ optimize_where_in(&subquery, &cur_input, &other_exprs,
config)?;
}
Ok(Some(cur_input))
}
_ => {
// Apply the optimization to all inputs of the plan
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
@@ -136,7 +128,7 @@ fn optimize_where_in(
query_info: &SubqueryInfo,
outer_input: &LogicalPlan,
outer_other_exprs: &[Expr],
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
let proj = Projection::try_from_plan(&query_info.query.subquery)
.map_err(|e| context!("a projection is required", e))?;
@@ -179,7 +171,7 @@ fn optimize_where_in(
merge_cols((&[subquery_col], &subqry_cols), (&[outer_col],
&outer_cols));
// build subquery side of join - the thing the subquery was querying
- let subqry_alias = format!("__sq_{}", optimizer_config.next_id());
+ let subqry_alias = format!("__sq_{}", config.next_id());
let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
if let Some(expr) = conjunction(other_subqry_exprs) {
// if the subquery had additional expressions, restore them
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index 3e1227d02..338e58f68 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -16,7 +16,9 @@
// under the License.
//! Optimizer rule to eliminate cross join to inner join if join predicates are available in filters.
-use crate::{utils, OptimizerConfig, OptimizerRule};
+use std::collections::HashSet;
+use std::sync::Arc;
+
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr::{BinaryExpr, Expr};
use datafusion_expr::logical_plan::{
@@ -27,8 +29,8 @@ use datafusion_expr::{
and, build_join_schema, or, wrap_projection_for_join_if_necessary,
ExprSchemable,
Operator,
};
-use std::collections::HashSet;
-use std::sync::Arc;
+
+use crate::{utils, OptimizerConfig, OptimizerRule};
#[derive(Default)]
pub struct EliminateCrossJoin;
@@ -54,7 +56,7 @@ impl OptimizerRule for EliminateCrossJoin {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
@@ -78,11 +80,7 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}
_ => {
- return Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?));
+ return Ok(Some(utils::optimize_children(self, plan,
config)?));
}
}
@@ -102,7 +100,7 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}
- left = utils::optimize_children(self, &left,
optimizer_config)?;
+ left = utils::optimize_children(self, &left, config)?;
if plan.schema() != left.schema() {
left = LogicalPlan::Projection(Projection::new_from_schema(
@@ -128,11 +126,7 @@ impl OptimizerRule for EliminateCrossJoin {
}
}
- _ => Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?)),
+ _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
}
}
@@ -378,18 +372,21 @@ fn remove_join_expressions(
#[cfg(test)]
mod tests {
- use super::*;
- use crate::test::*;
use datafusion_expr::{
binary_expr, col, lit,
logical_plan::builder::LogicalPlanBuilder,
Operator::{And, Or},
};
+ use crate::optimizer::OptimizerContext;
+ use crate::test::*;
+
+ use super::*;
+
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) {
let rule = EliminateCrossJoin::new();
let optimized_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted = optimized_plan.display_indent_schema().to_string();
diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs
index 046d7b11f..7636a6a9f 100644
--- a/datafusion/optimizer/src/eliminate_filter.rs
+++ b/datafusion/optimizer/src/eliminate_filter.rs
@@ -18,13 +18,14 @@
//! Optimizer rule to replace `where false` on a plan with an empty relation.
//! This saves time in planning and executing the query.
//! Note that this rule should be applied after simplify expressions optimizer rule.
-use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
logical_plan::{EmptyRelation, LogicalPlan},
Expr,
};
+use crate::{utils, OptimizerConfig, OptimizerRule};
+
/// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
pub struct EliminateFilter;
@@ -40,7 +41,7 @@ impl OptimizerRule for EliminateFilter {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let predicate_and_input = match plan {
LogicalPlan::Filter(filter) => match filter.predicate() {
@@ -53,18 +54,14 @@ impl OptimizerRule for EliminateFilter {
};
match predicate_and_input {
- Some((true, input)) => self.try_optimize(input, optimizer_config),
+ Some((true, input)) => self.try_optimize(input, config),
Some((false, input)) =>
Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: input.schema().clone(),
}))),
None => {
// Apply the optimization to all inputs of the plan
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
@@ -76,14 +73,17 @@ impl OptimizerRule for EliminateFilter {
#[cfg(test)]
mod tests {
- use super::*;
- use crate::test::*;
use datafusion_expr::{col, lit, logical_plan::builder::LogicalPlanBuilder,
sum};
+ use crate::optimizer::OptimizerContext;
+ use crate::test::*;
+
+ use super::*;
+
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateFilter::new();
let optimized_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs
index 91771a940..840346d85 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -20,10 +20,11 @@
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
-use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
+use crate::{utils, OptimizerConfig, OptimizerRule};
+
/// Optimization rule that eliminate LIMIT 0 or useless LIMIT(skip:0, fetch:None).
/// It can cooperate with `propagate_empty_relation` and `limit_push_down`.
#[derive(Default)]
@@ -40,7 +41,7 @@ impl OptimizerRule for EliminateLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
@@ -55,20 +56,12 @@ impl OptimizerRule for EliminateLimit {
None => {
if limit.skip == 0 {
let input = &*limit.input;
- return Ok(Some(utils::optimize_children(
- self,
- input,
- optimizer_config,
- )?));
+ return Ok(Some(utils::optimize_children(self, input,
config)?));
}
}
}
}
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
fn name(&self) -> &str {
@@ -78,9 +71,6 @@ impl OptimizerRule for EliminateLimit {
#[cfg(test)]
mod tests {
- use super::*;
- use crate::push_down_limit::PushDownLimit;
- use crate::test::*;
use datafusion_common::Column;
use datafusion_expr::{
col,
@@ -88,9 +78,15 @@ mod tests {
sum,
};
+ use crate::optimizer::OptimizerContext;
+ use crate::push_down_limit::PushDownLimit;
+ use crate::test::*;
+
+ use super::*;
+
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
let optimized_plan = EliminateLimit::new()
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
@@ -104,11 +100,11 @@ mod tests {
expected: &str,
) -> Result<()> {
let optimized_plan = PushDownLimit::new()
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let optimized_plan = EliminateLimit::new()
- .try_optimize(&optimized_plan, &mut OptimizerConfig::new())
+ .try_optimize(&optimized_plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs
index c615d1fb0..cc535117d 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -64,7 +64,7 @@ impl OptimizerRule for EliminateOuterJoin {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => match filter.input().as_ref() {
@@ -109,23 +109,11 @@ impl OptimizerRule for EliminateOuterJoin {
null_equals_null: join.null_equals_null,
});
let new_plan = from_plan(plan, &plan.expressions(),
&[new_join])?;
- Ok(Some(utils::optimize_children(
- self,
- &new_plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, &new_plan,
config)?))
}
- _ => Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?)),
+ _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
},
- _ => Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?)),
+ _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
}
}
@@ -307,6 +295,7 @@ fn extract_non_nullable_columns(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::optimizer::OptimizerContext;
use crate::test::*;
use arrow::datatypes::DataType;
use datafusion_expr::{
@@ -319,7 +308,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
let rule = EliminateOuterJoin::new();
let optimized_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs
index ee6c2cc5d..4bee88359 100644
--- a/datafusion/optimizer/src/filter_null_join_keys.rs
+++ b/datafusion/optimizer/src/filter_null_join_keys.rs
@@ -20,12 +20,14 @@
//! and then insert an `IsNotNull` filter on the nullable side since null values
//! can never match.
-use crate::{utils, OptimizerConfig, OptimizerRule};
-use datafusion_common::{Column, DFField, DFSchemaRef};
+use std::sync::Arc;
+
+use datafusion_common::{Column, DFField, DFSchemaRef, Result};
use datafusion_expr::{
and, logical_plan::Filter, logical_plan::JoinType, Expr, LogicalPlan,
};
-use std::sync::Arc;
+
+use crate::{utils, OptimizerConfig, OptimizerRule};
/// The FilterNullJoinKeys rule will identify inner joins with equi-join conditions
/// where the join key is nullable on one side and non-nullable on the other side
@@ -34,22 +36,26 @@ use std::sync::Arc;
#[derive(Default)]
pub struct FilterNullJoinKeys {}
+impl FilterNullJoinKeys {
+ pub const NAME: &'static str = "filter_null_join_keys";
+}
+
impl OptimizerRule for FilterNullJoinKeys {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
- ) -> datafusion_common::Result<Option<LogicalPlan>> {
+ config: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
// recurse down first and optimize inputs
let mut join = join.clone();
join.left = Arc::new(
- self.try_optimize(&join.left, optimizer_config)?
+ self.try_optimize(&join.left, config)?
.unwrap_or_else(|| join.left.as_ref().clone()),
);
join.right = Arc::new(
- self.try_optimize(&join.right, optimizer_config)?
+ self.try_optimize(&join.right, config)?
.unwrap_or_else(|| join.right.as_ref().clone()),
);
@@ -90,17 +96,13 @@ impl OptimizerRule for FilterNullJoinKeys {
}
_ => {
// Apply the optimization to all inputs of the plan
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
fn name(&self) -> &str {
- "filter_null_join_keys"
+ Self::NAME
}
}
@@ -147,15 +149,19 @@ fn resolve_fields(
#[cfg(test)]
mod tests {
- use super::*;
use arrow::datatypes::{DataType, Field, Schema};
+
use datafusion_common::{Column, Result};
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{logical_plan::JoinType, LogicalPlanBuilder};
+ use crate::optimizer::OptimizerContext;
+
+ use super::*;
+
fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterNullJoinKeys::default();
- rule.try_optimize(plan, &mut OptimizerConfig::new())
+ rule.try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan")
}
diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs
index d8283bf71..fe24e675d 100644
--- a/datafusion/optimizer/src/inline_table_scan.rs
+++ b/datafusion/optimizer/src/inline_table_scan.rs
@@ -38,7 +38,7 @@ impl OptimizerRule for InlineTableScan {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
// Match only on scans without filter / projection / fetch
@@ -52,8 +52,7 @@ impl OptimizerRule for InlineTableScan {
}) if filters.is_empty() => {
if let Some(sub_plan) = source.get_logical_plan() {
// Recursively apply optimization
- let plan =
- utils::optimize_children(self, sub_plan,
optimizer_config)?;
+ let plan = utils::optimize_children(self, sub_plan,
config)?;
let plan = LogicalPlanBuilder::from(plan)
.project(vec![Expr::Wildcard])?
.alias(table_name)?;
@@ -67,11 +66,7 @@ impl OptimizerRule for InlineTableScan {
// Rest: Recurse
_ => {
// apply the optimization to all inputs of the plan
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
@@ -88,7 +83,8 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder,
TableSource};
- use crate::{inline_table_scan::InlineTableScan, OptimizerConfig,
OptimizerRule};
+ use crate::optimizer::OptimizerContext;
+ use crate::{inline_table_scan::InlineTableScan, OptimizerRule};
pub struct RawTableSource {}
@@ -158,7 +154,7 @@ mod tests {
let plan =
scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap();
let optimized_plan = rule
- .try_optimize(&plan, &mut OptimizerConfig::new())
+ .try_optimize(&plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 1bf7ad58b..a4804ca5b 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -41,5 +41,5 @@ pub mod rewrite_disjunctive_predicate;
pub mod test;
pub mod unwrap_cast_in_comparison;
-pub use optimizer::{OptimizerConfig, OptimizerRule};
+pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
pub use utils::optimize_children;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 4e9eadc47..0dc651da2 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -41,6 +41,7 @@ use chrono::{DateTime, Utc};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace, warn};
+use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Instant;
@@ -54,7 +55,7 @@ pub trait OptimizerRule {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>>;
/// A human readable name for this optimizer rule
@@ -62,15 +63,35 @@ pub trait OptimizerRule {
}
/// Options to control the DataFusion Optimizer.
+pub trait OptimizerConfig {
+ /// Return the time at which the query execution started. This
+ /// time is used as the value for now()
+ fn query_execution_start_time(&self) -> DateTime<Utc>;
+
+ /// Returns false if the given rule should be skipped
+ fn rule_enabled(&self, name: &str) -> bool;
+
+ /// The optimizer will skip failing rules if this returns true
+ fn skip_failing_rules(&self) -> bool;
+
+ /// How many times to attempt to optimize the plan
+ fn max_passes(&self) -> u8;
+
+ /// Return a unique ID
+ ///
+ /// This is useful for assigning unique names to aliases
+ fn next_id(&self) -> usize;
+}
+
+/// A standalone [`OptimizerConfig`] that can be used independently
+/// of DataFusion's config management
#[derive(Debug)]
-pub struct OptimizerConfig {
+pub struct OptimizerContext {
/// Query execution start time that can be used to rewrite
/// expressions such as `now()` to use a literal value instead
query_execution_start_time: DateTime<Utc>,
/// id generator for optimizer passes
- // TODO this should not be on the config,
- // it should be its own 'OptimizerState' or something)
- next_id: usize,
+ next_id: AtomicUsize,
/// Option to skip rules that produce errors
skip_failing_rules: bool,
/// Specify whether to enable the filter_null_keys rule
@@ -79,12 +100,12 @@ pub struct OptimizerConfig {
max_passes: u8,
}
-impl OptimizerConfig {
+impl OptimizerContext {
/// Create optimizer config
pub fn new() -> Self {
Self {
query_execution_start_time: Utc::now(),
- next_id: 0, // useful for generating things like unique subquery aliases
+ next_id: AtomicUsize::new(1),
skip_failing_rules: true,
filter_null_keys: true,
max_passes: 3,
@@ -119,24 +140,36 @@ impl OptimizerConfig {
self.max_passes = v;
self
}
+}
- /// Generate the next ID needed
- pub fn next_id(&mut self) -> usize {
- self.next_id += 1;
- self.next_id
+impl Default for OptimizerContext {
+ /// Create optimizer config
+ fn default() -> Self {
+ Self::new()
}
+}
- /// Return the time at which the query execution started. This
- /// time is used as the value for now()
- pub fn query_execution_start_time(&self) -> DateTime<Utc> {
+impl OptimizerConfig for OptimizerContext {
+ fn query_execution_start_time(&self) -> DateTime<Utc> {
self.query_execution_start_time
}
-}
-impl Default for OptimizerConfig {
- /// Create optimizer config
- fn default() -> Self {
- Self::new()
+ fn rule_enabled(&self, name: &str) -> bool {
+ self.filter_null_keys || name != FilterNullJoinKeys::NAME
+ }
+
+ fn skip_failing_rules(&self) -> bool {
+ self.skip_failing_rules
+ }
+
+ fn max_passes(&self) -> u8 {
+ self.max_passes
+ }
+
+ fn next_id(&self) -> usize {
+ use std::sync::atomic::Ordering;
+ // Can use relaxed ordering as not used for synchronisation
+ self.next_id.fetch_add(1, Ordering::Relaxed)
}
}
@@ -147,10 +180,16 @@ pub struct Optimizer {
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
+impl Default for Optimizer {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
impl Optimizer {
/// Create a new optimizer using the recommended list of rules
- pub fn new(config: &OptimizerConfig) -> Self {
- let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
+ pub fn new() -> Self {
+ let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Arc::new(InlineTableScan::new()),
Arc::new(TypeCoercion::new()),
Arc::new(SimplifyExpressions::new()),
@@ -169,22 +208,19 @@ impl Optimizer {
Arc::new(EliminateLimit::new()),
Arc::new(PropagateEmptyRelation::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
+ Arc::new(FilterNullJoinKeys::default()),
+ Arc::new(EliminateOuterJoin::new()),
+ // Filters can't be pushed down past Limits, we should do PushDownFilter after LimitPushDown
+ Arc::new(PushDownLimit::new()),
+ Arc::new(PushDownFilter::new()),
+ Arc::new(SingleDistinctToGroupBy::new()),
+ // The previous optimizations added expressions and projections,
+ // that might benefit from the following rules
+ Arc::new(SimplifyExpressions::new()),
+ Arc::new(UnwrapCastInComparison::new()),
+ Arc::new(CommonSubexprEliminate::new()),
+ Arc::new(PushDownProjection::new()),
];
- if config.filter_null_keys {
- rules.push(Arc::new(FilterNullJoinKeys::default()));
- }
- rules.push(Arc::new(EliminateOuterJoin::new()));
- // Filters can't be pushed down past Limits, we should do PushDownFilter after LimitPushDown
- rules.push(Arc::new(PushDownLimit::new()));
- rules.push(Arc::new(PushDownFilter::new()));
- rules.push(Arc::new(SingleDistinctToGroupBy::new()));
-
- // The previous optimizations added expressions and projections,
- // that might benefit from the following rules
- rules.push(Arc::new(SimplifyExpressions::new()));
- rules.push(Arc::new(UnwrapCastInComparison::new()));
- rules.push(Arc::new(CommonSubexprEliminate::new()));
- rules.push(Arc::new(PushDownProjection::new()));
Self::with_rules(rules)
}
@@ -199,7 +235,7 @@ impl Optimizer {
pub fn optimize<F>(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
mut observer: F,
) -> Result<LogicalPlan>
where
@@ -209,11 +245,16 @@ impl Optimizer {
let mut plan_str = format!("{}", plan.display_indent());
let mut new_plan = plan.clone();
let mut i = 0;
- while i < optimizer_config.max_passes {
+ while i < config.max_passes() {
log_plan(&format!("Optimizer input (pass {})", i), &new_plan);
for rule in &self.rules {
- let result = rule.try_optimize(&new_plan, optimizer_config);
+ if !config.rule_enabled(rule.name()) {
+ debug!("Skipping rule {} due to optimizer config", rule.name());
+ continue;
+ }
+
+ let result = rule.try_optimize(&new_plan, config);
match result {
Ok(Some(plan)) => {
if
!plan.schema().equivalent_names_and_types(new_plan.schema()) {
@@ -237,7 +278,7 @@ impl Optimizer {
);
}
Err(ref e) => {
- if optimizer_config.skip_failing_rules {
+ if config.skip_failing_rules() {
// Note to future readers: if you see this warning it signals a
// bug in the DataFusion optimizer. Please consider filing a ticket
// https://github.com/apache/arrow-datafusion
@@ -286,51 +327,49 @@ fn log_plan(description: &str, plan: &LogicalPlan) {
mod tests {
use crate::optimizer::Optimizer;
use crate::test::test_table_scan;
- use crate::{OptimizerConfig, OptimizerRule};
- use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError};
+ use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
+ use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError,
Result};
use datafusion_expr::logical_plan::EmptyRelation;
use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection};
use std::sync::Arc;
#[test]
- fn skip_failing_rule() -> Result<(), DataFusionError> {
+ fn skip_failing_rule() {
let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
- let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
+ let config = OptimizerContext::new().with_skip_failing_rules(true);
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
- opt.optimize(&plan, &mut config, &observe)?;
- Ok(())
+ opt.optimize(&plan, &config, &observe).unwrap();
}
#[test]
- fn no_skip_failing_rule() -> Result<(), DataFusionError> {
+ fn no_skip_failing_rule() {
let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
- let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+ let config = OptimizerContext::new().with_skip_failing_rules(false);
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
- let result = opt.optimize(&plan, &mut config, &observe);
+ let err = opt.optimize(&plan, &config, &observe).unwrap_err();
assert_eq!(
"Internal error: Optimizer rule 'bad rule' failed due to
unexpected error: \
Error during planning: rule failed. This was likely caused by a
bug in \
DataFusion's code and we would welcome that you file an bug report
in our issue tracker",
- format!("{}", result.err().unwrap())
+ err.to_string()
);
- Ok(())
}
#[test]
- fn generate_different_schema() -> Result<(), DataFusionError> {
+ fn generate_different_schema() {
let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
- let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+ let config = OptimizerContext::new().with_skip_failing_rules(false);
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
- let result = opt.optimize(&plan, &mut config, &observe);
+ let err = opt.optimize(&plan, &config, &observe).unwrap_err();
assert_eq!(
"Internal error: Optimizer rule 'get table_scan rule' failed, due
to generate a different schema, \
original schema: DFSchema { fields: [], metadata: {} }, \
@@ -341,9 +380,8 @@ mod tests {
metadata: {} }. \
This was likely caused by a bug in DataFusion's code \
and we would welcome that you file an bug report in our issue
tracker",
- format!("{}", result.err().unwrap())
+ err.to_string()
);
- Ok(())
}
#[test]
@@ -351,7 +389,7 @@ mod tests {
// if the plan creates more metadata than previously (because
// some wrapping functions are removed, etc) do not error
let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
- let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+ let config = OptimizerContext::new().with_skip_failing_rules(false);
let input = Arc::new(test_table_scan().unwrap());
let input_schema = input.schema().clone();
@@ -364,7 +402,7 @@ mod tests {
// optimizing should be ok, but the schema will have changed (no metadata)
assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
- let optimized_plan = opt.optimize(&plan, &mut config,
&observe).unwrap();
+ let optimized_plan = opt.optimize(&plan, &config, &observe).unwrap();
// metadata was removed
assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
}
@@ -399,9 +437,9 @@ mod tests {
impl OptimizerRule for BadRule {
fn try_optimize(
&self,
- _plan: &LogicalPlan,
- _optimizer_config: &mut OptimizerConfig,
- ) -> datafusion_common::Result<Option<LogicalPlan>> {
+ _: &LogicalPlan,
+ _: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
Err(DataFusionError::Plan("rule failed".to_string()))
}
@@ -416,9 +454,9 @@ mod tests {
impl OptimizerRule for GetTableScanRule {
fn try_optimize(
&self,
- _plan: &LogicalPlan,
- _optimizer_config: &mut OptimizerConfig,
- ) -> datafusion_common::Result<Option<LogicalPlan>> {
+ _: &LogicalPlan,
+ _: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
let table_scan = test_table_scan()?;
Ok(Some(LogicalPlanBuilder::from(table_scan).build()?))
}
diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs
index b44a918fe..7ef769e21 100644
--- a/datafusion/optimizer/src/propagate_empty_relation.rs
+++ b/datafusion/optimizer/src/propagate_empty_relation.rs
@@ -37,11 +37,10 @@ impl OptimizerRule for PropagateEmptyRelation {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// optimize child plans first
- let optimized_children_plan =
- utils::optimize_children(self, plan, optimizer_config)?;
+ let optimized_children_plan = utils::optimize_children(self, plan,
config)?;
match &optimized_children_plan {
LogicalPlan::EmptyRelation(_) => Ok(Some(optimized_children_plan)),
LogicalPlan::Projection(_)
@@ -204,6 +203,7 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
mod tests {
use crate::eliminate_filter::EliminateFilter;
use crate::test::{test_table_scan, test_table_scan_with_name};
+ use crate::OptimizerContext;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::logical_plan::table_scan;
@@ -217,7 +217,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = PropagateEmptyRelation::new();
let optimized_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
@@ -227,11 +227,11 @@ mod tests {
fn assert_together_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let optimize_one = EliminateFilter::new()
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let optimize_two = PropagateEmptyRelation::new()
- .try_optimize(&optimize_one, &mut OptimizerConfig::new())
+ .try_optimize(&optimize_one, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimize_two);
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 7e90a4a79..0cf6c635b 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -506,7 +506,7 @@ impl OptimizerRule for PushDownFilter {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let filter = match plan {
LogicalPlan::Filter(filter) => filter,
@@ -517,22 +517,12 @@ impl OptimizerRule for PushDownFilter {
Some(optimized_plan) => Ok(Some(utils::optimize_children(
self,
&optimized_plan,
- optimizer_config,
- )?)),
- None => Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
+ config,
)?)),
+ None => Ok(Some(utils::optimize_children(self, plan,
config)?)),
};
}
- _ => {
- return Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
- }
+ _ => return Ok(Some(utils::optimize_children(self, plan,
config)?)),
};
let child_plan = &**filter.input();
@@ -544,7 +534,7 @@ impl OptimizerRule for PushDownFilter {
new_predicate,
child_filter.input().clone(),
)?);
- return self.try_optimize(&new_plan, optimizer_config);
+ return self.try_optimize(&new_plan, config);
}
LogicalPlan::Repartition(_)
| LogicalPlan::Distinct(_)
@@ -745,11 +735,7 @@ impl OptimizerRule for PushDownFilter {
_ => plan.clone(),
};
- Ok(Some(utils::optimize_children(
- self,
- &new_plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, &new_plan, config)?))
}
}
@@ -786,6 +772,7 @@ fn replace_cols_by_name(e: Expr, replace_map: &HashMap<String, Expr>) -> Result<
mod tests {
use super::*;
use crate::test::*;
+ use crate::OptimizerContext;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::DFSchema;
@@ -798,7 +785,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
let optimized_plan = PushDownFilter::new()
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
@@ -1947,7 +1934,7 @@ mod tests {
table_scan_with_pushdown_provider(TableProviderFilterPushDown::Inexact)?;
let optimised_plan = PushDownFilter::new()
- .try_optimize(&plan, &mut OptimizerConfig::new())
+ .try_optimize(&plan, &OptimizerContext::new())
.expect("failed to optimize plan")
.unwrap();
@@ -2299,7 +2286,7 @@ mod tests {
// Originally global state which can help to avoid duplicate Filters been generated and pushed down.
// Now the global state is removed. Need to double confirm that avoid duplicate Filters.
let optimized_plan = PushDownFilter::new()
- .try_optimize(&plan, &mut OptimizerConfig::new())
+ .try_optimize(&plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
assert_optimized_plan_eq(&optimized_plan, expected)
diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index 3da7e07db..ad5ceaea0 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -78,17 +78,11 @@ impl OptimizerRule for PushDownLimit {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let limit = match plan {
LogicalPlan::Limit(limit) => limit,
- _ => {
- return Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
- }
+ _ => return Ok(Some(utils::optimize_children(self, plan,
config)?)),
};
if let LogicalPlan::Limit(child_limit) = &*limit.input {
@@ -118,18 +112,12 @@ impl OptimizerRule for PushDownLimit {
fetch: new_fetch,
input: Arc::new((*child_limit.input).clone()),
});
- return self.try_optimize(&plan, optimizer_config);
+ return self.try_optimize(&plan, config);
}
let fetch = match limit.fetch {
Some(fetch) => fetch,
- None => {
- return Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
- }
+ None => return Ok(Some(utils::optimize_children(self, plan,
config)?)),
};
let skip = limit.skip;
@@ -237,11 +225,7 @@ impl OptimizerRule for PushDownLimit {
_ => plan.clone(),
};
- Ok(Some(utils::optimize_children(
- self,
- &plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, &plan, config)?))
}
fn name(&self) -> &str {
@@ -263,6 +247,7 @@ mod test {
use super::*;
use crate::test::*;
+ use crate::OptimizerContext;
use datafusion_expr::{
col, exists,
logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan},
@@ -271,7 +256,7 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
let optimized_plan = PushDownLimit::new()
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 67a6c0d5c..01b089e42 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -49,7 +49,7 @@ impl OptimizerRule for PushDownProjection {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// set of all columns referred by the plan (and thus considered required by the root)
let required_columns = plan
@@ -63,7 +63,7 @@ impl OptimizerRule for PushDownProjection {
plan,
&required_columns,
false,
- optimizer_config,
+ config,
)?))
}
@@ -85,7 +85,7 @@ fn optimize_plan(
plan: &LogicalPlan,
required_columns: &HashSet<Column>, // set of columns required up to this step
has_projection: bool,
- _optimizer_config: &OptimizerConfig,
+ _config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
let mut new_required_columns = required_columns.clone();
let new_plan = match plan {
@@ -117,13 +117,8 @@ fn optimize_plan(
expr_to_columns(e, &mut new_required_columns)?
}
- let new_input = optimize_plan(
- _optimizer,
- input,
- &new_required_columns,
- true,
- _optimizer_config,
- )?;
+ let new_input =
+ optimize_plan(_optimizer, input, &new_required_columns, true,
_config)?;
let new_required_columns_optimized = new_input
.schema()
@@ -174,7 +169,7 @@ fn optimize_plan(
left,
&new_required_columns,
true,
- _optimizer_config,
+ _config,
)?);
let optimized_right = Arc::new(optimize_plan(
@@ -182,7 +177,7 @@ fn optimize_plan(
right,
&new_required_columns,
true,
- _optimizer_config,
+ _config,
)?);
let schema = build_join_schema(
@@ -229,7 +224,7 @@ fn optimize_plan(
input,
required_columns,
true,
- _optimizer_config,
+ _config,
)?)
.build();
};
@@ -245,7 +240,7 @@ fn optimize_plan(
input,
&new_required_columns,
true,
- _optimizer_config,
+ _config,
)?)
.window(new_window_expr)?
.build()
@@ -286,7 +281,7 @@ fn optimize_plan(
input,
&new_required_columns,
true,
- _optimizer_config,
+ _config,
)?),
group_expr.clone(),
new_aggr_expr,
@@ -316,7 +311,7 @@ fn optimize_plan(
&a.input,
&required_columns,
false,
- _optimizer_config,
+ _config,
)?),
verbose: a.verbose,
schema: a.schema.clone(),
@@ -348,7 +343,7 @@ fn optimize_plan(
input_plan,
&new_required_columns,
has_projection,
- _optimizer_config,
+ _config,
)
})
.collect::<Result<Vec<_>>>()?;
@@ -374,7 +369,7 @@ fn optimize_plan(
input,
&new_required_columns,
has_projection,
- _optimizer_config,
+ _config,
)?;
from_plan(plan, &plan.expressions(), &[child])
}
@@ -413,7 +408,7 @@ fn optimize_plan(
input_plan,
&new_required_columns,
has_projection,
- _optimizer_config,
+ _config,
)
})
.collect::<Result<Vec<_>>>()?;
@@ -527,6 +522,7 @@ fn push_down_scan(
mod tests {
use super::*;
use crate::test::*;
+ use crate::OptimizerContext;
use arrow::datatypes::{DataType, Schema};
use datafusion_expr::expr::Cast;
use datafusion_expr::{
@@ -1019,8 +1015,6 @@ mod tests {
fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
let rule = PushDownProjection::new();
- Ok(rule
- .try_optimize(plan, &mut OptimizerConfig::new())?
- .unwrap())
+ Ok(rule.try_optimize(plan, &OptimizerContext::new())?.unwrap())
}
}
diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
index 079046273..0f9ba3d37 100644
--- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
+++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
@@ -127,7 +127,7 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
@@ -136,16 +136,12 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
let rewritten_expr = normalize_predicate(rewritten_predicate);
Ok(Some(LogicalPlan::Filter(Filter::try_new(
rewritten_expr,
- self.try_optimize(filter.input(), optimizer_config)?
+ self.try_optimize(filter.input(), config)?
.map(Arc::new)
.unwrap_or_else(|| filter.input().clone()),
)?)))
}
- _ => Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?)),
+ _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
}
}
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 12092621b..0a6110541 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -47,7 +47,7 @@ impl ScalarSubqueryToJoin {
fn extract_subquery_exprs(
&self,
predicate: &Expr,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let filters = split_conjunction(predicate); // TODO: disjunctions
@@ -69,7 +69,7 @@ impl ScalarSubqueryToJoin {
_ => return Ok(()),
};
let subquery = self
- .try_optimize(&subquery.subquery,
optimizer_config)?
+ .try_optimize(&subquery.subquery, config)?
.map(Arc::new)
.unwrap_or_else(|| subquery.subquery.clone());
let subquery = Subquery { subquery };
@@ -93,17 +93,17 @@ impl OptimizerRule for ScalarSubqueryToJoin {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
// Apply optimizer rule to current input
let optimized_input = self
- .try_optimize(filter.input(), optimizer_config)?
+ .try_optimize(filter.input(), config)?
.unwrap_or_else(|| filter.input().as_ref().clone());
let (subqueries, other_exprs) =
- self.extract_subquery_exprs(filter.predicate(),
optimizer_config)?;
+ self.extract_subquery_exprs(filter.predicate(), config)?;
if subqueries.is_empty() {
// regular filter, no subquery exists clause here
@@ -116,12 +116,9 @@ impl OptimizerRule for ScalarSubqueryToJoin {
// iterate through all subqueries in predicate, turning each into a join
let mut cur_input = filter.input().as_ref().clone();
for subquery in subqueries {
- if let Some(optimized_subquery) = optimize_scalar(
- &subquery,
- &cur_input,
- &other_exprs,
- optimizer_config,
- )? {
+ if let Some(optimized_subquery) =
+ optimize_scalar(&subquery, &cur_input, &other_exprs,
config)?
+ {
cur_input = optimized_subquery;
} else {
// if we can't handle all of the subqueries then bail for now
@@ -135,11 +132,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
}
_ => {
// Apply the optimization to all inputs of the plan
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
@@ -189,7 +182,7 @@ fn optimize_scalar(
query_info: &SubqueryInfo,
filter_input: &LogicalPlan,
outer_others: &[Expr],
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let subquery = query_info.query.subquery.as_ref();
debug!(
@@ -258,7 +251,7 @@ fn optimize_scalar(
}
// Only operate if one column is present and the other closed upon from outside scope
- let subqry_alias = format!("__sq_{}", optimizer_config.next_id());
+ let subqry_alias = format!("__sq_{}", config.next_id());
let group_by: Vec<_> = subqry_cols
.iter()
.map(|it| Expr::Column(it.clone()))
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 7b7ecd0d0..6645284e3 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -46,11 +46,10 @@ impl OptimizerRule for SimplifyExpressions {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let mut execution_props = ExecutionProps::new();
- execution_props.query_execution_start_time =
- optimizer_config.query_execution_start_time();
+ execution_props.query_execution_start_time =
config.query_execution_start_time();
Ok(Some(Self::optimize_internal(plan, &execution_props)?))
}
}
@@ -128,6 +127,7 @@ mod tests {
use datafusion_common::ScalarValue;
use datafusion_expr::{or, Between, BinaryExpr, Cast, Operator};
+ use crate::OptimizerContext;
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
and, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder,
Expr,
@@ -172,7 +172,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
let rule = SimplifyExpressions::new();
let optimized_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
@@ -380,12 +380,11 @@ mod tests {
// expect optimizing will result in an error, returning the error string
fn get_optimized_plan_err(plan: &LogicalPlan, date_time: &DateTime<Utc>) -> String {
- let mut config =
- OptimizerConfig::new().with_query_execution_start_time(*date_time);
+ let config =
OptimizerContext::new().with_query_execution_start_time(*date_time);
let rule = SimplifyExpressions::new();
let err = rule
- .try_optimize(plan, &mut config)
+ .try_optimize(plan, &config)
.expect_err("expected optimization to fail");
err.to_string()
@@ -395,12 +394,11 @@ mod tests {
plan: &LogicalPlan,
date_time: &DateTime<Utc>,
) -> String {
- let mut config =
- OptimizerConfig::new().with_query_execution_start_time(*date_time);
+ let config =
OptimizerContext::new().with_query_execution_start_time(*date_time);
let rule = SimplifyExpressions::new();
let optimized_plan = rule
- .try_optimize(plan, &mut config)
+ .try_optimize(plan, &config)
.unwrap()
.expect("failed to optimize plan");
format!("{:?}", optimized_plan)
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 2230033f0..4ba2e1599 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -86,7 +86,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Aggregate(Aggregate {
@@ -156,7 +156,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
Vec::new(),
)?);
let inner_agg =
- utils::optimize_children(self, &grouped_aggr,
optimizer_config)?;
+ utils::optimize_children(self, &grouped_aggr, config)?;
let outer_aggr_schema =
Arc::new(DFSchema::new_with_metadata(
outer_group_exprs
@@ -200,18 +200,10 @@ impl OptimizerRule for SingleDistinctToGroupBy {
)?,
)))
} else {
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
- _ => Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?)),
+ _ => Ok(Some(utils::optimize_children(self, plan, config)?)),
}
}
fn name(&self) -> &str {
@@ -223,6 +215,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
mod tests {
use super::*;
use crate::test::*;
+ use crate::OptimizerContext;
use datafusion_expr::expr::GroupingSet;
use datafusion_expr::{
col, count, count_distinct, lit,
logical_plan::builder::LogicalPlanBuilder, max,
@@ -232,7 +225,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SingleDistinctToGroupBy::new();
let optimized_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
index 36c18ea74..436d478b9 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -52,13 +52,13 @@ impl OptimizerRule for SubqueryFilterToJoin {
fn try_optimize(
&self,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
// Apply optimizer rule to current input
let optimized_input = self
- .try_optimize(filter.input(), optimizer_config)?
+ .try_optimize(filter.input(), config)?
.unwrap_or_else(|| filter.input().as_ref().clone());
// Splitting filter expression into components by AND
@@ -98,7 +98,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
} => {
let right_input = self.try_optimize(
&subquery.subquery,
- optimizer_config
+ config
)?.unwrap_or_else(||subquery.subquery.as_ref().clone());
let right_schema = right_input.schema();
if right_schema.fields().len() != 1 {
@@ -168,11 +168,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
}
_ => {
// Apply the optimization to all inputs of the plan
- Ok(Some(utils::optimize_children(
- self,
- plan,
- optimizer_config,
- )?))
+ Ok(Some(utils::optimize_children(self, plan, config)?))
}
}
}
@@ -204,6 +200,7 @@ fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec<Expr>) -> Res
mod tests {
use super::*;
use crate::test::*;
+ use crate::OptimizerContext;
use datafusion_expr::{
and, binary_expr, col, in_subquery, lit,
logical_plan::LogicalPlanBuilder,
not_in_subquery, or, Operator,
@@ -212,7 +209,7 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = SubqueryFilterToJoin::new();
let optimized_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs
index 1f51b38f6..462b94dd0 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{OptimizerConfig, OptimizerRule};
+use crate::{OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::{col, logical_plan::table_scan, LogicalPlan, LogicalPlanBuilder};
@@ -107,7 +107,7 @@ pub fn assert_optimized_plan_eq(
expected: &str,
) {
let optimized_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
@@ -119,7 +119,7 @@ pub fn assert_optimizer_err(
plan: &LogicalPlan,
expected: &str,
) {
- let res = rule.try_optimize(plan, &mut OptimizerConfig::new());
+ let res = rule.try_optimize(plan, &OptimizerContext::new());
match res {
Ok(plan) => assert_eq!(format!("{}", plan.unwrap().display_indent()), "An error"),
Err(ref e) => {
@@ -133,7 +133,7 @@ pub fn assert_optimizer_err(
pub fn assert_optimization_skipped(rule: &dyn OptimizerRule, plan: &LogicalPlan) {
let new_plan = rule
- .try_optimize(plan, &mut OptimizerConfig::new())
+ .try_optimize(plan, &OptimizerContext::new())
.unwrap()
.unwrap();
assert_eq!(
diff --git a/datafusion/optimizer/src/type_coercion.rs b/datafusion/optimizer/src/type_coercion.rs
index 8436f9b57..d1655fe35 100644
--- a/datafusion/optimizer/src/type_coercion.rs
+++ b/datafusion/optimizer/src/type_coercion.rs
@@ -17,9 +17,10 @@
//! Optimizer rule for type validation and coercion
-use crate::utils::rewrite_preserving_name;
-use crate::{OptimizerConfig, OptimizerRule};
+use std::sync::Arc;
+
use arrow::datatypes::{DataType, IntervalUnit};
+
use datafusion_common::{
parse_interval, DFSchema, DFSchemaRef, DataFusionError, Result,
ScalarValue,
};
@@ -39,7 +40,9 @@ use datafusion_expr::{
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion_expr::{ExprSchemable, Signature};
-use std::sync::Arc;
+
+use crate::utils::rewrite_preserving_name;
+use crate::{OptimizerConfig, OptimizerRule};
#[derive(Default)]
pub struct TypeCoercion {}
@@ -58,7 +61,7 @@ impl OptimizerRule for TypeCoercion {
fn try_optimize(
&self,
plan: &LogicalPlan,
- _optimizer_config: &mut OptimizerConfig,
+ _: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
Ok(Some(optimize_internal(&DFSchema::empty(), plan)?))
}
@@ -584,9 +587,10 @@ fn coerce_agg_exprs_for_signature(
#[cfg(test)]
mod test {
- use crate::type_coercion::{TypeCoercion, TypeCoercionRewriter};
- use crate::{OptimizerConfig, OptimizerRule};
+ use std::sync::Arc;
+
use arrow::datatypes::DataType;
+
use datafusion_common::{DFField, DFSchema, Result, ScalarValue};
use datafusion_expr::expr::Like;
use datafusion_expr::expr_rewriter::ExprRewritable;
@@ -602,12 +606,14 @@ mod test {
Signature, Volatility,
};
use datafusion_physical_expr::expressions::AvgAccumulator;
- use std::sync::Arc;
+
+ use crate::type_coercion::{TypeCoercion, TypeCoercionRewriter};
+ use crate::{OptimizerContext, OptimizerRule};
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let rule = TypeCoercion::new();
- let mut config = OptimizerConfig::default();
- let plan = rule.try_optimize(plan, &mut config)?.unwrap();
+ let config = OptimizerContext::default();
+ let plan = rule.try_optimize(plan, &config)?.unwrap();
assert_eq!(expected, &format!("{:?}", plan));
Ok(())
}
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index 0d6a1265c..9f6d1a5ac 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -82,13 +82,13 @@ impl OptimizerRule for UnwrapCastInComparison {
fn try_optimize(
&self,
plan: &LogicalPlan,
- _optimizer_config: &mut OptimizerConfig,
+ _config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let new_inputs = plan
.inputs()
.into_iter()
.map(|input| {
- self.try_optimize(input, _optimizer_config)
+ self.try_optimize(input, _config)
.map(|o| o.unwrap_or_else(|| input.clone()))
})
.collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs
index e2d326c16..324f49e44 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -40,12 +40,12 @@ use std::sync::Arc;
pub fn optimize_children(
optimizer: &impl OptimizerRule,
plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
+ config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
let new_exprs = plan.expressions();
let mut new_inputs = Vec::with_capacity(plan.inputs().len());
for input in plan.inputs() {
- let new_input = optimizer.try_optimize(input, optimizer_config)?;
+ let new_input = optimizer.try_optimize(input, config)?;
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
from_plan(plan, &new_exprs, &new_inputs)
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index 701d1a84c..62a8f1ef2 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -20,7 +20,7 @@ use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
use datafusion_optimizer::optimizer::Optimizer;
-use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
+use datafusion_optimizer::{OptimizerContext, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
@@ -330,12 +330,12 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
// hard code the return value of now()
let ts = NaiveDateTime::from_timestamp_opt(1666615693, 0).unwrap();
let now_time = DateTime::<Utc>::from_utc(ts, Utc);
- let mut config = OptimizerConfig::new()
+ let config = OptimizerContext::new()
.with_skip_failing_rules(false)
.with_query_execution_start_time(now_time);
- let optimizer = Optimizer::new(&config);
+ let optimizer = Optimizer::new();
// optimize the logical plan
- optimizer.optimize(&plan, &mut config, &observe)
+ optimizer.optimize(&plan, &config, &observe)
}
struct MySchemaProvider {}