avantgardnerio commented on code in PR #2885:
URL: https://github.com/apache/arrow-datafusion/pull/2885#discussion_r926953876
##########
datafusion/optimizer/src/utils.rs:
##########
@@ -82,6 +119,199 @@ pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr])
-> LogicalPlan {
})
}
+/// Looks for correlating expressions: equality expressions with one field
from the subquery, and
+/// one not in the subquery (closed upon from outer scope)
+///
+/// # Arguments
+///
+/// * `exprs` - List of expressions that may or may not be joins
+/// * `fields` - HashSet of fully qualified (table.col) fields in subquery
schema
+///
+/// # Return value
+///
+/// Tuple of (expressions containing joins, remaining non-join expressions)
+pub fn find_join_exprs(
+ exprs: Vec<&Expr>,
+ schema: &DFSchemaRef,
+) -> Result<(Vec<Expr>, Vec<Expr>)> {
+ let fields: HashSet<_> = schema
+ .fields()
+ .iter()
+ .map(|it| it.qualified_name())
+ .collect();
+
+ let mut joins = vec![];
+ let mut others = vec![];
+ for filter in exprs.iter() {
+ let (left, op, right) = match filter {
+ Expr::BinaryExpr { left, op, right } => (*left.clone(), *op,
*right.clone()),
+ _ => {
+ others.push((*filter).clone());
+ continue;
+ }
+ };
+ let left = match left {
+ Expr::Column(c) => c,
+ _ => {
+ others.push((*filter).clone());
+ continue;
+ }
+ };
+ let right = match right {
+ Expr::Column(c) => c,
+ _ => {
+ others.push((*filter).clone());
+ continue;
+ }
+ };
+ if fields.contains(&left.flat_name()) &&
fields.contains(&right.flat_name()) {
+ others.push((*filter).clone());
+ continue; // both columns present (none closed-upon)
+ }
+ if !fields.contains(&left.flat_name()) &&
!fields.contains(&right.flat_name()) {
+ others.push((*filter).clone());
+ continue; // neither column present (syntax error?)
+ }
+ match op {
+ Operator::Eq => {}
+ Operator::NotEq => {}
+ _ => {
+ plan_err!(format!("can't optimize {} column comparison", op))?;
+ }
+ }
+
+ joins.push((*filter).clone())
+ }
+
+ Ok((joins, others))
+}
+
+/// Extracts correlating columns from expressions
+///
+/// # Arguments
+///
+/// * `exprs` - List of expressions that correlate a subquery to an outer scope
+/// * `fields` - HashSet of fully qualified (table.col) fields in subquery
schema
+///
+/// # Return value
+///
+/// Tuple of tuples ((outer-scope cols, subquery cols), non-equal expressions)
+pub fn exprs_to_join_cols(
+ exprs: &[Expr],
+ schema: &DFSchemaRef,
+ include_negated: bool,
+) -> Result<(Vec<Column>, Vec<Column>, Option<Expr>)> {
+ let fields: HashSet<_> = schema
+ .fields()
+ .iter()
+ .map(|it| it.qualified_name())
+ .collect();
+
+ let mut joins: Vec<(String, String)> = vec![];
+ let mut others: Vec<Expr> = vec![];
+ for filter in exprs.iter() {
+ let (left, op, right) = match filter {
+ Expr::BinaryExpr { left, op, right } => (*left.clone(), *op,
*right.clone()),
+ _ => plan_err!("Invalid expression!".to_string())?,
+ };
+ match op {
+ Operator::Eq => {}
+ Operator::NotEq => {
+ if !include_negated {
+ others.push((*filter).clone());
+ continue;
+ }
+ }
+ _ => plan_err!("Invalid expression!".to_string())?,
+ }
+ let left = match left {
+ Expr::Column(c) => c,
+ _ => plan_err!("Invalid expression!".to_string())?,
+ };
+ let right = match right {
+ Expr::Column(c) => c,
+ _ => plan_err!("Invalid expression!".to_string())?,
+ };
+ let sorted = if fields.contains(&left.flat_name()) {
+ (right.flat_name(), left.flat_name())
+ } else {
+ (left.flat_name(), right.flat_name())
+ };
+ joins.push(sorted);
+ }
+
+ let (left_cols, right_cols): (Vec<_>, Vec<_>) = joins
+ .into_iter()
+ .map(|(l, r)| (Column::from(l.as_str()), Column::from(r.as_str())))
+ .unzip();
+ let pred = combine_filters(&others);
+
+ Ok((left_cols, right_cols, pred))
+}
+
+/// Returns the first (and only) element in a slice, or an error
+///
+/// # Arguments
+///
+/// * `slice` - The slice to extract from
+///
+/// # Return value
+///
+/// The first element, or an error
+pub fn only_or_err<T>(slice: &[T]) -> Result<&T> {
+ match slice {
+ [it] => Ok(it),
+ [] => Err(DataFusionError::Plan("Empty slice!".to_string())),
Review Comment:
This could be user-facing in cases like:
```
select * from customer where balance < (select subtotal, total from orders
where XXX)
```
That query would result in the error:
```
exactly one expression should be projected at
decorrelate_scalar_subquery.rs:168 caused by More than one item found!
```
(I updated the inner error message in my last commit.)
If there's disagreement, I can break this PR in two: return a generic
error for now, and open a separate PR that adds the context back in. In the
future I will try harder to keep PRs smaller.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]