mingmwang commented on code in PR #5345:
URL: https://github.com/apache/arrow-datafusion/pull/5345#discussion_r1129015636
##########
datafusion/optimizer/src/decorrelate_where_exists.rs:
##########
@@ -142,69 +144,70 @@ fn optimize_exists(
query_info: &SubqueryInfo,
outer_input: &LogicalPlan,
) -> Result<Option<LogicalPlan>> {
- let maybe_subqury_filter = match query_info.query.subquery.as_ref() {
- LogicalPlan::Distinct(subqry_distinct) => match
subqry_distinct.input.as_ref() {
- LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
- _ => {
- return Ok(None);
- }
- },
- LogicalPlan::Projection(subqry_proj) => &subqry_proj.input,
- _ => {
- // Subquery currently only supports distinct or projection
- return Ok(None);
- }
- }
- .as_ref();
+ let subquery = query_info.query.subquery.as_ref();
+ if let Some((join_filter, optimized_subquery)) =
optimize_subquery(subquery)? {
+ // join our sub query into the main plan
+ let join_type = match query_info.negated {
+ true => JoinType::LeftAnti,
+ false => JoinType::LeftSemi,
+ };
+
+ let new_plan = LogicalPlanBuilder::from(outer_input.clone())
+ .join(
+ optimized_subquery,
+ join_type,
+ (Vec::<Column>::new(), Vec::<Column>::new()),
+ Some(join_filter),
+ )?
+ .build()?;
- // extract join filters
- let (join_filters, subquery_input) =
extract_join_filters(maybe_subqury_filter)?;
- // cannot optimize non-correlated subquery
- if join_filters.is_empty() {
- return Ok(None);
+ Ok(Some(new_plan))
+ } else {
+ Ok(None)
}
-
- let input_schema = subquery_input.schema();
- let subquery_cols: BTreeSet<Column> =
- join_filters
- .iter()
- .try_fold(BTreeSet::new(), |mut cols, expr| {
- let using_cols: Vec<Column> = expr
- .to_columns()?
+}
+/// Optimize the subquery and extract the possible join filter.
+/// This function can't optimize non-correlated subquery, and will return None.
+fn optimize_subquery(subquery: &LogicalPlan) -> Result<Option<(Expr,
LogicalPlan)>> {
+ match subquery {
+ LogicalPlan::Distinct(subqry_distinct) => {
+ let distinct_input = &subqry_distinct.input;
+ let optimized_plan =
+ optimize_subquery(distinct_input)?.map(|(filters, right)| {
+ (
+ filters,
+ LogicalPlan::Distinct(Distinct {
+ input: Arc::new(right),
+ }),
+ )
+ });
+ Ok(optimized_plan)
+ }
Review Comment:
> I am not sure the behavior of `Distinct` is correct, so do not handle
`Aggregate` here.
I will take a closer look at this PR tomorrow. If you do not know how to
handle `Aggregate` here, you can leave it unhandled and only handle the
`Distinct` case.
The simplest approach is to check whether any outer references are used by
the `Aggregate` expressions. If there are, return Err; otherwise add the
correlated columns to the group by columns. (Only allow outer reference columns
that are referenced in `Filter` or `Join` expressions. This limits the
supported `Subqueries` cases, but it is safer.)
For example, if the original inner aggregate is `group by inner_a` and there
is a correlation condition like `outer_b = inner_b`, then add `inner_b` to the
group by columns. It is not a perfect solution and might sometimes cause
bugs.
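A minimal sketch of that check, using made-up toy types rather than DataFusion's real `Expr` and `Aggregate` (`ToyExpr`, `ToyAggregate`, `has_outer_ref` and `add_correlated_group_by` are all hypothetical names). It also rejects outer references in the existing group by list, which is an assumption on my side:

```rust
/// Hypothetical, simplified expression type (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    /// Column produced by the subquery's own input.
    Inner(String),
    /// Column that belongs to the outer query (an outer reference).
    Outer(String),
    /// Aggregate call such as `count(arg)`.
    Agg(String, Box<ToyExpr>),
}

/// Hypothetical stand-in for an aggregate plan node.
#[derive(Debug, Clone, PartialEq)]
struct ToyAggregate {
    group_by: Vec<ToyExpr>,
    aggr_exprs: Vec<ToyExpr>,
}

/// Returns true if the expression mentions any outer column.
fn has_outer_ref(expr: &ToyExpr) -> bool {
    match expr {
        ToyExpr::Inner(_) => false,
        ToyExpr::Outer(_) => true,
        ToyExpr::Agg(_, arg) => has_outer_ref(arg),
    }
}

/// `correlated_inner_cols` are the inner-side columns of the correlation
/// predicates, e.g. `inner_b` for `outer_b = inner_b`.
fn add_correlated_group_by(
    agg: &ToyAggregate,
    correlated_inner_cols: &[String],
) -> Result<ToyAggregate, String> {
    // Bail out if the aggregate itself uses an outer reference.
    if agg.group_by.iter().chain(agg.aggr_exprs.iter()).any(has_outer_ref) {
        return Err("outer reference inside Aggregate is not supported".to_string());
    }
    // Otherwise extend the group by list with the correlated inner columns,
    // skipping columns that are already grouped on.
    let mut group_by = agg.group_by.clone();
    for col in correlated_inner_cols {
        let expr = ToyExpr::Inner(col.clone());
        if !group_by.contains(&expr) {
            group_by.push(expr);
        }
    }
    Ok(ToyAggregate {
        group_by,
        aggr_exprs: agg.aggr_exprs.clone(),
    })
}
```

With `group by inner_a` and the correlated column `inner_b`, the result groups by `inner_a, inner_b`; an aggregate such as `sum(outer_b)` is rejected with an error.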
SparkSQL has similar logic in the rule `pullOutCorrelatedPredicates`:
```scala
case a @ Aggregate(grouping, expressions, child) =>
val referencesToAdd = missingReferences(a)
if (referencesToAdd.nonEmpty) {
Aggregate(grouping ++ referencesToAdd, expressions ++
referencesToAdd, child)
} else {
a
}
```
The latest SparkSQL implementation also has a bug here (it needs to
distinguish whether the original Aggregate is a scalar aggregate or a vector
aggregate).
Here is SQL to reproduce the bug; you can try it on both PostgreSQL and SparkSQL:
```
-- SparkSQL syntax; PostgreSQL has no String type, so use TEXT there.
CREATE TABLE t1 (id INT, name String);
CREATE TABLE t2 (id INT, name String);
INSERT INTO t1 VALUES (11, 'a'), (11, 'a'), (22, 'b'), (33, 'c'), (44, 'd'), (null, 'e');
INSERT INTO t2 VALUES (111, 'z'), (111, 'z'), (222, 'y'), (444, 'x'), (555, 'w'), (null, 'v');
```
```
-- Should output all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (select 0);
-- Should output all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE
t2.id = t1.id);
-- Should output all t1
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT count(*) FROM t2 WHERE
t2.id = t1.id having count(*) = 0);
```
PostgreSQL does not support decorrelating `Subqueries` that include
`Aggregates`. SparkSQL supports some cases but not all of them, and its
implementation has a bug.
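To illustrate why the scalar versus vector distinction matters for the queries above, here is a small simulation in plain Rust (no query engine involved; `sql_eq`, `scalar_count_rows`, `grouped_count_rows` and `exists` are hypothetical helpers). It assumes the naive rewrite turns the correlated `count(*)` into a `GROUP BY t2.id` aggregate joined on `t2.id = t1.id`. A scalar aggregate always returns exactly one row, even over empty input, so `EXISTS` is always true; the grouped form returns no row when nothing matches:

```rust
use std::collections::BTreeMap;

/// SQL-style equality: a comparison involving NULL is never true.
fn sql_eq(a: Option<i32>, b: Option<i32>) -> bool {
    matches!((a, b), (Some(x), Some(y)) if x == y)
}

/// Rows produced by the correlated scalar aggregate
/// `SELECT count(*) FROM t2 WHERE t2.id = <outer_id>`:
/// always exactly one row, even when no t2 row matches.
fn scalar_count_rows(t2_ids: &[Option<i32>], outer_id: Option<i32>) -> Vec<usize> {
    vec![t2_ids.iter().filter(|id| sql_eq(**id, outer_id)).count()]
}

/// Rows produced after a naive rewrite to
/// `SELECT t2.id, count(*) FROM t2 GROUP BY t2.id` joined on `t2.id = <outer_id>`:
/// no matching group means no row at all.
fn grouped_count_rows(t2_ids: &[Option<i32>], outer_id: Option<i32>) -> Vec<usize> {
    let mut groups: BTreeMap<Option<i32>, usize> = BTreeMap::new();
    for id in t2_ids {
        *groups.entry(*id).or_insert(0) += 1;
    }
    groups
        .into_iter()
        .filter(|(id, _)| sql_eq(*id, outer_id))
        .map(|(_, n)| n)
        .collect()
}

/// `EXISTS` is true when the subquery returns at least one row.
fn exists(rows: &[usize]) -> bool {
    !rows.is_empty()
}
```

For the sample data, no `t1.id` matches any `t2.id`, so the scalar form keeps every `t1` row while the grouped form drops all of them.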
I'm going to implement a general subquery decorrelation rule based on
two well-known papers.
https://github.com/apache/arrow-datafusion/issues/5492
Orthogonal Optimization of Subqueries and Aggregation
https://dl.acm.org/doi/10.1145/375663.375748
Unnesting Arbitrary Queries
https://cs.emis.de/LNI/Proceedings/Proceedings241/383.pdf