irenjj commented on PR #17110:
URL: https://github.com/apache/datafusion/pull/17110#issuecomment-3172677607
# Decorrelate Subqueries
Complete decorrelation consists of three parts:
+ rewrite dependent join: rewrite_dependent_join.rs
+ decorrelate dependent join: decorrelate_dependent_join.rs
+ eliminate duplicated delim scan: deliminator.rs
## 1. Rewrite Dependent Join
The rewrite pass turns every plan_node that contains a subquery expression
into a DependentJoin. This gives the later passes a uniform operator to work
on; their ultimate goal is to eliminate every DependentJoin.
DependentJoinRewriter structure:
```rust
pub struct DependentJoinRewriter {
    // ID of the currently traversed node, incremented on each f_down(),
    // decremented on each f_up().
    current_id: usize,
    // Depth of the current DependentJoin,
    // incremented by 1 when encountering nested DependentJoin in f_down(),
    // decremented by 1 when processing DependentJoin in f_up().
    subquery_depth: usize,
    // current_id -> Node
    // Maintains an IndexMap structure that stores all plan_nodes currently
    // traversed.
    nodes: IndexMap<usize, Node>,
    // All the node ids from the root to the current node.
    stack: Vec<usize>,
    // For each outer reference column, tracks the plan nodes in the tree
    // that reference it.
    all_outer_ref_columns: IndexMap<Column, Vec<ColumnAccess>>,
}
```
It traverses the operator tree and rewrites the visited operators in f_down()
(called when a plan_node is first reached, before its children) and f_up()
(called after its children have been processed). It assigns a unique current_id
to each plan_node during the traversal.
### 1.1 f_down
f_down checks whether the expressions of each plan_node contain Subqueries and
marks every plan_node that does as is_dependent_join_node. Later, when the
traversal reaches the plan_node of the Subquery itself, it records the Subquery
type.
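The bookkeeping described above can be illustrated with a minimal, self-contained sketch. It is not the PR's code: `PlanNode`, `NodeInfo`, `Rewriter`, and `visit` are hypothetical stand-ins, `has_subquery` replaces the real expression inspection, and the id counter is simplified so that it only ever increases.

```rust
// Toy plan node. `has_subquery` stands in for "some expression of this
// node contains a subquery"; all names here are hypothetical.
struct PlanNode {
    name: &'static str,
    has_subquery: bool,
    children: Vec<PlanNode>,
}

#[derive(Debug, PartialEq)]
struct NodeInfo {
    id: usize,
    name: &'static str,
    is_dependent_join_node: bool,
    // Ids from the root down to this node, mirroring `stack`.
    path: Vec<usize>,
}

#[derive(Default)]
struct Rewriter {
    next_id: usize,
    stack: Vec<usize>,
    nodes: Vec<NodeInfo>,
}

impl Rewriter {
    // Called before the children of `node` are visited.
    fn f_down(&mut self, node: &PlanNode) {
        let id = self.next_id;
        self.next_id += 1; // simplification: the counter is never decremented
        self.stack.push(id);
        self.nodes.push(NodeInfo {
            id,
            name: node.name,
            is_dependent_join_node: node.has_subquery,
            path: self.stack.clone(),
        });
    }

    // Called after the children of `node` have been visited.
    fn f_up(&mut self, _node: &PlanNode) {
        self.stack.pop();
    }

    // Drives the traversal: f_down, then children, then f_up.
    fn visit(&mut self, node: &PlanNode) {
        self.f_down(node);
        for child in &node.children {
            self.visit(child);
        }
        self.f_up(node);
    }
}
```

After `visit` returns, `nodes` holds one entry per plan_node in the order f_down reached them, and `stack` is empty again.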
## 2. Decorrelate Dependent Join
This pass decorrelates the plan produced by the rewrite dependent join pass.
The implementation largely follows the code in the paper, with slight
differences. The entry function is `DecorrelateDependentJoin::rewrite()`, which
constructs a DependentJoinDecorrelator and calls its decorrelate() function.
decorrelate() performs a pre-order traversal of the plan_node tree until it
finds a DependentJoin:
```rust
fn decorrelate(plan: &LogicalPlan, ...) -> Result<LogicalPlan> {
    if let LogicalPlan::DependentJoin(djoin) = plan {
        // handle dependent join
    } else {
        Ok(plan
            .clone()
            .map_children(|n| Ok(Transformed::yes(self.decorrelate(&n, true, 0)?)))?
            .data)
    }
}
```
Processing DependentJoin consists of several major steps:
+ Handle Left Child
+ Handle Right Child
+ Construct Join Condition
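As a runnable illustration of this traversal shape, here is a sketch over a toy plan type. `Plan` and its variants are hypothetical, and the three handling steps are collapsed into a placeholder that simply emits a `Join`; the real right-child handling goes through push_down_dependent_join rather than a plain recursive call.

```rust
// Toy logical plan; a hypothetical stand-in for LogicalPlan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    Filter(Box<Plan>),
    DependentJoin { left: Box<Plan>, right: Box<Plan> },
    // Placeholder for the join produced once a DependentJoin is handled.
    Join { left: Box<Plan>, right: Box<Plan> },
}

fn decorrelate(plan: &Plan) -> Plan {
    match plan {
        Plan::DependentJoin { left, right } => {
            // 1. handle left child, 2. handle right child,
            // 3. construct join condition (elided in this sketch).
            let left = decorrelate(left);
            let right = decorrelate(right);
            Plan::Join { left: Box::new(left), right: Box::new(right) }
        }
        // Any other node: keep it and recurse into its children.
        Plan::Filter(input) => Plan::Filter(Box::new(decorrelate(input))),
        Plan::Join { left, right } => Plan::Join {
            left: Box::new(decorrelate(left)),
            right: Box::new(decorrelate(right)),
        },
        Plan::Scan(_) => plan.clone(),
    }
}
```

The point of the sketch is only the control flow: non-DependentJoin nodes are rebuilt around their transformed children, and the special handling starts at the first DependentJoin found.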
DependentJoinDecorrelator structure:
```rust
pub struct DependentJoinDecorrelator {
    // All correlated column information for the current depth.
    domains: IndexSet<CorrelatedColumnInfo>,
    // outer table column -> delim scan table column
    correlated_map: IndexMap<Column, Column>,
    // Whether the current DependentJoin is a nested DependentJoin
    is_initial: bool,
    // All correlated columns at the current depth and below.
    correlated_columns: Vec<CorrelatedColumnInfo>,
    // Used to check whether any COUNT aggregate has to be replaced with
    // "CASE WHEN X IS NULL THEN 0 ELSE COUNT END".
    // Stores a mapping between an expr and its original index in the
    // logical plan output.
    replacement_map: IndexMap<String, Expr>,
    // Set if, during the top-down traversal, we observe any operator that
    // requires joining all rows from the LHS with nullable rows on the RHS.
    any_join: bool,
    delim_scan_id: usize,
    // All columns of the previously constructed delim scan.
    dscan_cols: Vec<Column>,
}
```
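The COUNT replacement mentioned in the comment on replacement_map can be illustrated with a small sketch. It assumes the usual situation after decorrelation: the aggregate is computed per group and then outer-joined back to the outer rows, so an outer row with no matching inner rows sees NULL instead of 0. The function name and data layout are hypothetical.

```rust
use std::collections::HashMap;

// Returns (outer id, count of matching inner rows) for every outer row.
fn counts_per_outer_row(outer_ids: &[i64], inner_sids: &[i64]) -> Vec<(i64, i64)> {
    // Aggregate first: COUNT(*) grouped by sid. Ids without rows are absent.
    let mut grouped: HashMap<i64, i64> = HashMap::new();
    for sid in inner_sids {
        *grouped.entry(*sid).or_insert(0) += 1;
    }
    // Outer join back to the outer rows: a missing group is NULL (None).
    // "CASE WHEN X IS NULL THEN 0 ELSE COUNT END" maps that NULL to 0.
    outer_ids
        .iter()
        .map(|id| {
            let count: Option<i64> = grouped.get(id).copied();
            let fixed = match count {
                None => 0,
                Some(c) => c,
            };
            (*id, fixed)
        })
        .collect()
}
```

Without the `match`, the outer row with no inner rows would carry a NULL count, which is not what the original correlated COUNT subquery returns.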
### 2.1. Handle Left Child
First handle the left child, divided into two cases:
+ The currently processed DependentJoin is not a nested DependentJoin.
+ The currently processed DependentJoin is a nested DependentJoin.
Then based on the mapping relationship in self.correlated_map, rewrite outer
table columns to delim scan table columns.
#### 2.1.1. Non-Nested DependentJoin
For a non-nested DependentJoin, decorrelate(left) is called recursively so that
the left child inherits the state of the current DependentJoinDecorrelator.
Because the DependentJoin is not nested, the correlated column information
needs no changes.
#### 2.1.2. Nested DependentJoin
For a nested DependentJoin, the first step is to determine whether the LHS has
correlated columns at the current depth (corresponding to accessing in the
paper). The check is implemented in detect_correlated_expressions, which
traverses the plan, collects the outer ref expressions of each plan_node, and
checks whether they are in the domains of the current
DependentJoinDecorrelator. The domains record all correlated columns of the
current depth.
+ If the LHS has no correlated columns at the current depth, a new
DependentJoinDecorrelator can be constructed to decorrelate the left child,
because subsequent LHS processing is independent of the information (mainly
correlated columns) in the current DependentJoinDecorrelator.
+ If the LHS has correlated columns at the current depth, the DependentJoin
in the LHS needs to be pushed down to eliminate the DependentJoin of the LHS at
the current depth.
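The correlation check that drives this decision can be sketched as a recursive walk. This is an illustration only: `Node`, `outer_refs`, and `has_correlated_columns` are hypothetical names, and columns are plain strings rather than the real column type.

```rust
use std::collections::HashSet;

// Toy plan node carrying the outer reference columns found in its own
// expressions. Hypothetical stand-in for a plan_node.
struct Node {
    outer_refs: Vec<&'static str>,
    children: Vec<Node>,
}

// True if any node in the subtree references a column that belongs to the
// domains (the correlated columns of the current depth).
fn has_correlated_columns(plan: &Node, domains: &HashSet<&'static str>) -> bool {
    plan.outer_refs.iter().any(|col| domains.contains(col))
        || plan
            .children
            .iter()
            .any(|child| has_correlated_columns(child, domains))
}
```

A `false` result corresponds to the first bullet (a fresh decorrelator can be used for the left child); a `true` result corresponds to the second bullet (the dependent join has to be pushed down).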
### 2.2. Handle Right Child
Processing the RHS constructs a new DependentJoinDecorrelator. Its domains are
the correlated columns inherited from the parent level plus those of the
DependentJoin at the current level. Inheriting the parent's correlated columns
is what allows multi-level dependent joins to be handled.
Since the RHS of a DependentJoin must have correlated columns, the right child
is processed directly through push_down_dependent_join.
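How the domains of the new decorrelator are assembled can be sketched as an order-preserving union. The sketch uses a plain `Vec` to stay dependency-free, whereas the struct above uses an IndexSet; `child_domains` and the string columns are hypothetical.

```rust
// Builds the domains for the RHS decorrelator: the parent's correlated
// columns first, then the current level's, without duplicates.
fn child_domains(
    parent: &[&'static str],
    current: &[&'static str],
) -> Vec<&'static str> {
    let mut out: Vec<&'static str> = Vec::new();
    for col in parent.iter().chain(current.iter()) {
        if !out.contains(col) {
            out.push(*col);
        }
    }
    out
}
```

For example, a parent level correlated on `s.id` and a current level correlated on `e1.sid` and `s.id` would yield the domains `s.id`, `e1.sid`.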
### 2.3. Join Condition
delim_join_conditions chooses the join type based on the subquery type of the
DependentJoin, then builds one join condition per correlated column, of the
form correlated column IsNotDistinctFrom delim scan column.
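A short sketch of why a null-safe comparison matters here, assuming IsNotDistinctFrom has the usual SQL `IS NOT DISTINCT FROM` semantics. `None` models SQL NULL; the function names and the string rendering of the condition are hypothetical and only meant for illustration.

```rust
// Plain SQL equality: comparing anything with NULL yields NULL (None).
fn sql_eq(a: Option<i64>, b: Option<i64>) -> Option<bool> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x == y),
        _ => None,
    }
}

// Null-safe equality: NULL matches NULL, and the result is never NULL.
fn is_not_distinct_from(a: Option<i64>, b: Option<i64>) -> bool {
    a == b
}

// Renders one condition per (correlated column, delim scan column) pair
// and combines them with AND.
fn render_conditions(pairs: &[(&str, &str)]) -> String {
    pairs
        .iter()
        .map(|(outer, delim)| format!("{} IS NOT DISTINCT FROM {}", outer, delim))
        .collect::<Vec<_>>()
        .join(" AND ")
}
```

With plain equality, an outer row whose correlated column is NULL would never match its own row in the delim scan; the null-safe form keeps that row in the join result.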
### 2.4. Unnest Function
All Unnest logic is implemented in push_down_dependent_join, with different
processing for each type of plan_node.
## 3. Eliminate Duplicated DelimGet
The two passes above complete the decorrelation and introduce two new logical
operators, `DelimJoin` and `DelimGet`, to construct the logical plan. This pass
eliminates redundant `DelimJoin` and `DelimGet` operators.
The following example illustrates how this pass works:
```sql
SELECT s.name
FROM students s
WHERE s.grade > (
-- subquery 1
SELECT AVG(e1.score)
FROM exams e1
WHERE e1.sid = s.id
AND e1.score > (
-- subquery 2
SELECT MIN(e2.score)
FROM exams e2
WHERE e2.sid = s.id
AND e2.type = 'final'
)
);
```
After decorrelating the above SQL, the result is generated:
```text
     Projection
          |
        Filter
          |
     DelimJoin1
     /          \
  Get             Projection
                      |
                ComparisonJoin
                 /            \
            DelimGet1           Aggregate
                                    |
                                 Filter
                                    |
                               DelimJoin2
                           /               \
              CrossProduct                  Projection
               /        \                       |
            Get       DelimGet2           ComparisonJoin
                                           /            \
                                      DelimGet3           Aggregate
                                                              |
                                                           Filter
                                                              |
                                                        CrossProduct
                                                         /        \
                                                      Get       DelimGet4
```
First, every DelimJoin is collected as a candidate, together with the DelimGets
beneath it. The joins array of each candidate is sorted by depth in descending
order, so the deepest DelimGet comes first.
```text
candidate1: {
    DelimJoin2,
    joins: [DelimGet4(depth=3), DelimGet3(depth=1)],
}
candidate2: {
    DelimJoin1,
    joins: [DelimGet2(depth=5), DelimGet1(depth=1)],
}

     Projection
          |
        Filter
          |
     DelimJoin1
     /    ^     \
  Get     |       Projection
          |           |
          |     ComparisonJoin
          |      /            \
          | DelimGet1           Aggregate
          |     |                   |
          +-----+                Filter
          |                         |
          |                    DelimJoin2
          |                /        ^      \
          |   CrossProduct          |       Projection
          |    /        \           |           |
          | Get       DelimGet2     |     ComparisonJoin
          |               |         |      /            \
          +---------------+         | DelimGet3           Aggregate
                                    |     |                   |
                                    +-----+                Filter
                                    |                         |
                                    |                   CrossProduct
                                    |                    /        \
                                    |                 Get       DelimGet4
                                    |                               |
                                    +-------------------------------+
```
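The ordering of each candidate's joins array can be sketched as follows. `DelimGetRef`, `Candidate`, and `sort_deepest_first` are hypothetical names; only the descending sort by depth is taken from the description above.

```rust
// A DelimGet found under a DelimJoin, with its depth below that DelimJoin.
#[derive(Debug, Clone, PartialEq)]
struct DelimGetRef {
    name: &'static str,
    depth: usize,
}

// One candidate per DelimJoin.
struct Candidate {
    delim_join: &'static str,
    joins: Vec<DelimGetRef>,
}

// Sorts the joins by depth, largest first, so that joins[0] is the
// deepest DelimGet of the candidate.
fn sort_deepest_first(candidate: &mut Candidate) {
    candidate.joins.sort_by(|a, b| b.depth.cmp(&a.depth));
}
```

Applied to candidate2 with its joins collected in plan order (DelimGet1, then DelimGet2), the sort puts DelimGet2 (depth 5) first.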
Process the deduplication of DelimGet under each candidate separately.
Taking candidate1 as an example:
```text
candidate1: {
DelimJoin2,
joins: [DelimGet4(depth=3), DelimGet3(depth=1)],
}
```
There are two cases:
1. If the DelimJoin has selection conditions (filter/where clauses), one
DelimGet can be retained. Selection conditions can greatly reduce the amount of
data in the right subtree of the DelimJoin, so this is a deliberate trade-off:
it gives up some deduplication (one DelimGet remains) in exchange for better
query performance through early filtering. This is why the code retains the
deepest join with a DelimGet when there are selection conditions. In this
case:
+ Retaining the deepest join with a DelimGet is worthwhile because the
selection condition can filter out a large amount of data early.
+ Removing the other, shallower joins is still safe because the deepest one
already guarantees data correctness.
+ It also avoids duplicate join operations.
2. If the DelimJoin has no selection conditions, the pass goes on to determine
whether each DelimGet can be removed. The ultimate goal is to remove all
correlated columns.
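The first step of this case split can be sketched as a small selection function. It assumes the joins are already sorted deepest first and only decides which DelimGets are considered for removal; the follow-up checks of case 2 are not modeled. The function name and the use of plain names for DelimGets are hypothetical.

```rust
// Returns the DelimGets that are candidates for removal.
// `sorted_joins` must be ordered deepest first.
fn removal_candidates<'a>(
    sorted_joins: &'a [&'static str],
    has_selection: bool,
) -> &'a [&'static str] {
    if has_selection && !sorted_joins.is_empty() {
        // Case 1: keep the deepest join with a DelimGet, consider the rest.
        &sorted_joins[1..]
    } else {
        // Case 2: every DelimGet still has to be examined further.
        sorted_joins
    }
}
```

For candidate1, a DelimJoin2 with selection conditions would keep DelimGet4 and consider only DelimGet3 for removal.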