duongcongtoai opened a new pull request, #21555: URL: https://github.com/apache/datafusion/pull/21555
## Which issue does this PR close?

https://github.com/apache/datafusion/issues/16059 has been completed, but the result was never persisted into DataFusion, and some sqllogictest cases are also failing. This PR brings back all the changes from the GSoC work as a complete POC. I'll try to break it down into smaller components and bring them into DataFusion.

## Prerequisites

- logical/physical operator for DelimGet
- left single join support

## Major parts

- implement `DependentJoinRewriter`
- implement `DependentJoinDecorrelator`
- implement `Deliminator`

## Rationale for this change

### DependentJoin

As per paper 2:

> We split the algorithm into three parts: First, a preparatory phase that identifies all non-trivial dependent joins and annotates them with information that the main algorithm needs. Second, the logic to eliminate dependent joins, which will be called for all non-trivial dependent joins in top-to-bottom order and which is the main algorithm, and third, the unnesting rules for individual operators. Note that we do not include a formalization of this approach due to space constraints, formal definitions and a proof of correctness can be found in a technical report [Ne24].

There is a need to detect non-trivial dependent joins (i.e. dependent joins where the RHS accesses columns provided by the LHS) and annotate them with metadata before decorrelation begins. The paper suggests the usage of an indexed algebra; however, for now we go forward without such a data structure:

> "Using an indexed algebra is ideal for this phase because it can do every LCA computation in 𝑂(log 𝑛) without any additional data structures.
> If the DBMS does not support that functionality, the same information can be computed with worse asymptotic complexity by keeping track of the column sets that are available in the different parts of the tree."

Given this query:

```sql
SELECT *
FROM customer
WHERE c_mktsegment = 'AUTOMOBILE'
  AND (SELECT COUNT(*)
       FROM orders
       WHERE o_custkey = c_custkey
         AND (SELECT SUM(l_extendedprice)
              FROM lineitem
              WHERE l_orderkey = o_orderkey) > 300000) > 5
```

According to the paper, this query is constructed into the following tree with some additional annotations:

<img width="826" height="394" alt="image" src="https://github.com/user-attachments/assets/e79531c3-bee5-49f2-9add-fe5cdba3bbe2" />

> "we consider every column access and compute the lowest common ancestor (LCA) of the operator o1 that accesses the column and the operator o2 that provides the column. If the LCA is not o1, it must be a dependent join ⋈ and we annotate ⋈ with the fact that o1 is accessing the left-hand side of ⋈"

Explanation:

- Node 9 is a filter with expression `T3.a = T1.a`, where `T1.a` is a column provided by some operator/relation outside its current context (in DataFusion we call these `OuterRefColumn`). We need to annotate this access with extra information:
  - Where the dependent join node for this access should be (i.e. Node 5, Node 4, or Node 1).
  - Assuming we have already detected D as the dependent join node, which descendant of D "provides" the column `T1.a`. In this case Node 2 (not Node 3) is the provider of column `T1.a`.

We introduce a new struct in DataFusion to contain these annotations:

```rust
pub struct CorrelatedColumnInfo {
    pub col: Column,
    // TODO: is data_type necessary?
    pub field: FieldRef,
    pub depth: usize,
    // the reference to the delim scan node map;
    // this is useful to construct the delim scan operator later
    pub delim_scan_node_id: usize,
}
```

To implement this annotation similarly to the paper, in DataFusion we use the tree traversal API on the root `LogicalPlan` node, specifically the method `rewrite_with_subqueries`.
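The provider-detection step above (Node 2, not Node 3, provides `T1.a`) can be sketched with the column-set fallback the paper mentions. This is an illustrative sketch, not DataFusion API: the function name `providing_child` and the plain `(id, column set)` representation are made up for the example.

```rust
/// Illustrative sketch (not DataFusion code): given the children of a
/// candidate dependent join and the set of columns each child's subtree
/// provides, find which child provides an outer-referenced column.
fn providing_child(children: &[(usize, Vec<&str>)], col: &str) -> Option<usize> {
    children
        .iter()
        .find(|(_, cols)| cols.iter().any(|c| *c == col))
        .map(|(id, _)| *id)
}

fn main() {
    // hypothetical column sets for the subtrees rooted at Node 2 and Node 4
    let children = vec![(2usize, vec!["T1.a", "T1.b"]), (4, vec!["T2.a"])];
    // Node 2's subtree is the one that provides T1.a
    assert_eq!(providing_child(&children, "T1.a"), Some(2));
}
```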
This method ensures that the RHS of any potential dependent join node is visited first:

```rust
macro_rules! handle_transform_recursion {
    ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
        $F_DOWN?
            .transform_children(|n| {
                n.map_subqueries($F_CHILD)?
                    .transform_sibling(|n| n.map_children($F_CHILD))
            })?
            .transform_parent($F_UP)
    }};
}
```

The goal of this traversal is to link accessors with providers. Intermediate state is persisted during the traversal:

```rust
pub struct DependentJoinRewriter {
    // each logical plan visited during traversal is assigned an integer id
    current_id: usize,
    subquery_depth: usize,
    // each newly visited `LogicalPlan` is inserted into this map for tracking
    nodes: IndexMap<usize, Node>,
    // all the node ids from root to the current node;
    // this is mutated during traversal
    stack: Vec<usize>,
    // tracks, for each column, the nodes/logical plans that reference it
    // within the tree but are not yet resolved;
    // during the tree traversal these entries will be resolved
    // by matching the column provider and accessor
    unresolved_outer_ref_columns: IndexMap<Column, Vec<ColumnAccess>>,
    // used to generate unique aliases for subqueries appearing in the logical plan
    alias_generator: Arc<AliasGenerator>,
    // this is used by the decorrelation optimizer later
    // to construct the delim scan node
    pub domain_columns_provider_nodes: IndexMap<usize, LogicalPlan>,
}
```

Let's walk through the logical plan of the example query above.
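As a minimal, self-contained sketch of how `stack` and `unresolved_outer_ref_columns` cooperate (simplified here to `String` columns and a plain `HashMap`, with no real `LogicalPlan`; the struct and method names mirror the real ones only loosely):

```rust
use std::collections::HashMap;

/// Simplified sketch: an outer-ref column access is parked together with a
/// snapshot of the traversal stack; once a node that provides the column is
/// visited, the pending access is resolved into an
/// (accessor stack, provider stack) pair.
#[derive(Default)]
struct Rewriter {
    stack: Vec<usize>,
    unresolved: HashMap<String, Vec<Vec<usize>>>,
    resolved: Vec<(String, Vec<usize>, Vec<usize>)>,
}

impl Rewriter {
    fn mark_outer_column_access(&mut self, col: &str) {
        self.unresolved
            .entry(col.to_string())
            .or_default()
            .push(self.stack.clone());
    }

    fn check_matching_column_provider(&mut self, col: &str) {
        if let Some(accessor_stacks) = self.unresolved.remove(col) {
            for acc in accessor_stacks {
                self.resolved
                    .push((col.to_string(), acc, self.stack.clone()));
            }
        }
    }
}

fn main() {
    let mut r = Rewriter::default();
    // f_down(6): outer_ref(customer.c_custkey) is seen; stack is [1,2,3,4,5,6]
    r.stack = vec![1, 2, 3, 4, 5, 6];
    r.mark_outer_column_access("customer.c_custkey");
    // later, TableScan: customer provides the column; stack is [1,2,12]
    r.stack = vec![1, 2, 12];
    r.check_matching_column_provider("customer.c_custkey");
    assert_eq!(r.resolved.len(), 1);
    assert_eq!(r.resolved[0].1, vec![1, 2, 3, 4, 5, 6]);
    assert_eq!(r.resolved[0].2, vec![1, 2, 12]);
}
```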
The execution sequence happens as below:

```mermaid
flowchart TD
    A["(1)Projection: ..."] -->|"f_down(1)"| B["(2): customer.c_mktsegment = 'AUTOMOBILE' AND (subquery) > 5"]
    B -->|"f_up(1)"| A
    B --> |"f_down(2)"| D["(3)Subquery #1"]
    D --> |"f_up(2)"| B
    D --> E["(4)Projection: count(1)"]
    E --> D
    E --> F["(5)Aggregate: count(*)"]
    F --> G["(6)Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND (subquery) > 300000"]
    G --> |"f_down(6): mark_outer_column_access(customer.c_custkey)"| I["(7)Subquery #2"]
    I --> J["(8)Projection: sum(lineitem.l_extendedprice)"]
    J --> K["(9)Aggregate: sum(lineitem.l_extendedprice)"]
    K --> L["(10)Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey)"]
    L --> |"f_down(10): mark_outer_column_access(orders.o_orderkey)"| M["(11)TableScan: lineitem"]
    B --> |"f_down(2):check_matching_column_provider -> resolve column access for customer.c_custkey"| C["(12)TableScan: customer"]
    G --> |"f_down(13):check_matching_column_provider -> resolve column access for orders.o_orderkey"| H["(13)TableScan: orders"]
```

Now pay attention to `f_down(6)` and `f_down(2)`. `f_down(6)` marks an appearance of `outer_ref(customer.c_custkey)`; at that point the traversal stack is `[1,2,3,4,5,6]`. `f_down(2)` marks the first logical plan that knows about the expression `customer.c_custkey` and resolves the previous column access; when this happens the traversal stack is `[1,2,12]`. The LCA (lowest common ancestor) of the two stacks, according to the algorithm, is `2`, and thus node 2 should be converted into a dependent join logical plan later on. The same applies to the pair `f_down(10)` and `f_down(13)`.
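With root-to-node stacks, the LCA is simply the deepest element of the common prefix of the two stacks. A sketch (not the actual DataFusion code; the stacks for the inner pair are inferred from the diagram):

```rust
/// LCA of two root-to-node stacks: the last element of their common prefix.
fn lca_of_stacks(a: &[usize], b: &[usize]) -> Option<usize> {
    a.iter()
        .zip(b.iter())
        .take_while(|(x, y)| x == y)
        .last()
        .map(|(x, _)| *x)
}

fn main() {
    // accessor stack at f_down(6) vs provider stack at TableScan: customer
    assert_eq!(lca_of_stacks(&[1, 2, 3, 4, 5, 6], &[1, 2, 12]), Some(2));
    // inferred stacks for the inner pair f_down(10) / f_down(13): LCA is node 6
    assert_eq!(
        lca_of_stacks(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &[1, 2, 3, 4, 5, 6, 13]),
        Some(6)
    );
}
```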
Correlated subqueries are rewritten into dependent join nodes as follows:

```mermaid
flowchart TD
    A["Projection: customer.t3_id, customer.c_mktsegment, customer.c_custkey"] --> B["Projection: customer.t3_id, customer.c_mktsegment, customer.c_custkey"]
    B --> C["Filter: customer.c_mktsegment = 'AUTOMOBILE' AND __scalar_sq_2 > 5"]
    C --> D["DependentJoin depth=1 on customer.c_custkey"]
    D --> E["TableScan: customer"]
    D --> F["Projection: count(1)"]
    F --> G["Aggregate: count(*)"]
    G --> H["Projection: orders.t2_id, orders.o_custkey, orders.o_orderkey"]
    H --> I["Filter: orders.o_custkey = outer_ref(customer.c_custkey) AND __scalar_sq_1 > 300000"]
    I --> J["DependentJoin depth=2 on orders.o_orderkey"]
    J --> K["TableScan: orders"]
    J --> L["Projection: sum(lineitem.l_extendedprice)"]
    L --> M["Aggregate: sum(lineitem.l_extendedprice)"]
    M --> N["Filter: lineitem.l_orderkey = outer_ref(orders.o_orderkey)"]
    N --> O["TableScan: lineitem"]
```

[NOTE TO SELF]: it looks like the provider of column `c_custkey` was not correctly detected (per the paper it should be the filter node above the table scan of `customer`), but in the current implementation it is the table scan. This difference will have a significant impact on the performance of the delim scan later on.

The collection of nodes that provide the columns will be persisted and passed to the next round of the decorrelation optimizer (to construct `delim_get`). More details on this [TBU].
