This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e3487eee13 Stop copying LogicalPlan and Exprs in
`DecorrelatePredicateSubquery` (#10318)
e3487eee13 is described below
commit e3487eee1365a37b8651bf5a393ae8d2cc16e653
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed May 1 17:41:59 2024 -0400
Stop copying LogicalPlan and Exprs in `DecorrelatePredicateSubquery`
(#10318)
* Fix build with missing `use`
* Avoid clones in `DecorrelatePredicateSubquery`
---
.../src/decorrelate_predicate_subquery.rs | 168 ++++++++++++---------
1 file changed, 97 insertions(+), 71 deletions(-)
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 2e72632170..b2650ac933 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -26,17 +26,18 @@ use crate::utils::replace_qualified_name;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::alias::AliasGenerator;
-use datafusion_common::tree_node::{TransformedResult, TreeNode};
-use datafusion_common::{plan_err, Result};
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{internal_err, plan_err, Result};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
-use datafusion_expr::utils::{conjunction, split_conjunction};
+use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
use datafusion_expr::{
exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
LogicalPlan, LogicalPlanBuilder, Operator,
};
+use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use log::debug;
/// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins
@@ -49,6 +50,16 @@ impl DecorrelatePredicateSubquery {
Self::default()
}
+ fn rewrite_subquery(
+ &self,
+ mut subquery: Subquery,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Subquery> {
+ subquery.subquery =
+ Arc::new(self.rewrite(unwrap_arc(subquery.subquery), config)?.data);
+ Ok(subquery)
+ }
+
/// Finds expressions that have the predicate subqueries (and recurses when found)
///
/// # Arguments
@@ -59,40 +70,32 @@ impl DecorrelatePredicateSubquery {
/// Returns a tuple (subqueries, non-subquery expressions)
fn extract_subquery_exprs(
&self,
- predicate: &Expr,
+ predicate: Expr,
config: &dyn OptimizerConfig,
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
- let filters = split_conjunction(predicate); // TODO: add ExistenceJoin to support disjunctions
+ let filters = split_conjunction_owned(predicate); // TODO: add ExistenceJoin to support disjunctions
let mut subqueries = vec![];
let mut others = vec![];
- for it in filters.iter() {
+ for it in filters.into_iter() {
match it {
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
}) => {
- let subquery_plan = self
- .try_optimize(&subquery.subquery, config)?
- .map(Arc::new)
- .unwrap_or_else(|| subquery.subquery.clone());
- let new_subquery = subquery.with_plan(subquery_plan);
+ let new_subquery = self.rewrite_subquery(subquery, config)?;
subqueries.push(SubqueryInfo::new_with_in_expr(
new_subquery,
- (**expr).clone(),
- *negated,
+ *expr,
+ negated,
));
}
Expr::Exists(Exists { subquery, negated }) => {
- let subquery_plan = self
- .try_optimize(&subquery.subquery, config)?
- .map(Arc::new)
- .unwrap_or_else(|| subquery.subquery.clone());
- let new_subquery = subquery.with_plan(subquery_plan);
- subqueries.push(SubqueryInfo::new(new_subquery, *negated));
+ let new_subquery = self.rewrite_subquery(subquery, config)?;
+ subqueries.push(SubqueryInfo::new(new_subquery, negated));
}
- _ => others.push((*it).clone()),
+ expr => others.push(expr),
}
}
@@ -103,62 +106,85 @@ impl DecorrelatePredicateSubquery {
impl OptimizerRule for DecorrelatePredicateSubquery {
fn try_optimize(
&self,
- plan: &LogicalPlan,
- config: &dyn OptimizerConfig,
+ _plan: &LogicalPlan,
+ _config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
- match plan {
- LogicalPlan::Filter(filter) => {
- let (subqueries, mut other_exprs) =
- self.extract_subquery_exprs(&filter.predicate, config)?;
- if subqueries.is_empty() {
- // regular filter, no subquery exists clause here
- return Ok(None);
- }
+ internal_err!("Should have called DecorrelatePredicateSubquery::rewrite")
+ }
- // iterate through all exists clauses in predicate, turning each into a join
- let mut cur_input = filter.input.as_ref().clone();
- for subquery in subqueries {
- if let Some(plan) =
- build_join(&subquery, &cur_input, config.alias_generator())?
- {
- cur_input = plan;
- } else {
- // If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
- let sub_query_expr = match subquery {
- SubqueryInfo {
- query,
- where_in_expr: Some(expr),
- negated: false,
- } => in_subquery(expr, query.subquery.clone()),
- SubqueryInfo {
- query,
- where_in_expr: Some(expr),
- negated: true,
- } => not_in_subquery(expr, query.subquery.clone()),
- SubqueryInfo {
- query,
- where_in_expr: None,
- negated: false,
- } => exists(query.subquery.clone()),
- SubqueryInfo {
- query,
- where_in_expr: None,
- negated: true,
- } => not_exists(query.subquery.clone()),
- };
- other_exprs.push(sub_query_expr);
- }
- }
+ fn supports_rewrite(&self) -> bool {
+ true
+ }
- let expr = conjunction(other_exprs);
- if let Some(expr) = expr {
- let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
- cur_input = LogicalPlan::Filter(new_filter);
- }
- Ok(Some(cur_input))
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>> {
+ let LogicalPlan::Filter(filter) = plan else {
+ return Ok(Transformed::no(plan));
+ };
+
+ // if there are no subqueries in the predicate, return the original plan
+ let has_subqueries = split_conjunction(&filter.predicate)
+ .iter()
+ .any(|expr| matches!(expr, Expr::InSubquery(_) | Expr::Exists(_)));
+ if !has_subqueries {
+ return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+ }
+
+ let Filter {
+ predicate, input, ..
+ } = filter;
+ let (subqueries, mut other_exprs) =
+ self.extract_subquery_exprs(predicate, config)?;
+ if subqueries.is_empty() {
+ return internal_err!(
+ "can not find expected subqueries in DecorrelatePredicateSubquery"
+ );
+ }
+
+ // iterate through all exists clauses in predicate, turning each into a join
+ let mut cur_input = unwrap_arc(input);
+ for subquery in subqueries {
+ if let Some(plan) =
+ build_join(&subquery, &cur_input, config.alias_generator())?
+ {
+ cur_input = plan;
+ } else {
+ // If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
+ let sub_query_expr = match subquery {
+ SubqueryInfo {
+ query,
+ where_in_expr: Some(expr),
+ negated: false,
+ } => in_subquery(expr, query.subquery),
+ SubqueryInfo {
+ query,
+ where_in_expr: Some(expr),
+ negated: true,
+ } => not_in_subquery(expr, query.subquery),
+ SubqueryInfo {
+ query,
+ where_in_expr: None,
+ negated: false,
+ } => exists(query.subquery),
+ SubqueryInfo {
+ query,
+ where_in_expr: None,
+ negated: true,
+ } => not_exists(query.subquery),
+ };
+ other_exprs.push(sub_query_expr);
}
- _ => Ok(None),
}
+
+ let expr = conjunction(other_exprs);
+ if let Some(expr) = expr {
+ let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
+ cur_input = LogicalPlan::Filter(new_filter);
+ }
+ Ok(Transformed::yes(cur_input))
}
fn name(&self) -> &str {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]