LorrensP-2158466 commented on issue #17719: URL: https://github.com/apache/datafusion/issues/17719#issuecomment-3529349758
Alright, sorry for the delay, school is much busier than expected :).

I've implemented the part about projections/filters/... between subsequent joins on top of @JanKaul's PR. You can find it here: https://github.com/apache/datafusion/compare/main...LorrensP-2158466:datafusion:reorder_join

I introduced a `WrappedJoin` struct, which is used as the edge data in the `QueryGraph`. It contains the `Join` itself and also the operators that live between the join and its parent join. When we construct a particular `QueryGraph` with the join order algorithm, we re-apply those wrappers on top of the new join.

I don't have much time to test it heavily, so I just enabled `OptimizeProjections` in @JanKaul's test. This is the result:

```rust
// the tables, second parameter is num_rows:
let customer = scan_tpch_table_with_stats("customer", 150);
let orders = scan_tpch_table_with_stats("orders", 1_500);
let lineitem = scan_tpch_table_with_stats("lineitem", 6_000);
```

These are the plans printed out:

```
UnoptimizedPlan:
Sort: orders.o_totalprice DESC NULLS FIRST
  Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_totalprice]], aggr=[[sum(lineitem.l_quantity)]]
    Filter: orders.o_orderkey IN (<subquery>)
      Subquery:
        Projection: lineitem.l_orderkey
          Filter: sum(lineitem.l_quantity) > Int32(300)
            Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[sum(lineitem.l_quantity)]]
              TableScan: lineitem
      Inner Join: orders.o_orderkey = lineitem.l_orderkey
        Inner Join: customer.c_custkey = orders.o_custkey
          TableScan: customer
          TableScan: orders
        TableScan: lineitem

After standard optimization:
Limit: skip=0, fetch=100
  Sort: orders.o_totalprice DESC NULLS FIRST
    Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_totalprice]], aggr=[[sum(lineitem.l_quantity)]]
      LeftSemi Join: orders.o_orderkey = __correlated_sq_1.l_orderkey
        Projection: customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, lineitem.l_quantity
          Inner Join: orders.o_orderkey = lineitem.l_orderkey
            Projection: customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice
              Inner Join: customer.c_custkey = orders.o_custkey
                TableScan: customer projection=[c_custkey, c_name]
                TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]
            TableScan: lineitem projection=[l_orderkey, l_quantity]
        SubqueryAlias: __correlated_sq_1
          Filter: sum(lineitem.l_quantity) > Int32(300)
            Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[]]
              TableScan: lineitem projection=[l_orderkey]

Optimized Plan with Join Ordering:
Limit: skip=0, fetch=100
  Sort: orders.o_totalprice DESC NULLS FIRST
    Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_totalprice]], aggr=[[sum(lineitem.l_quantity)]]
      Projection: customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, lineitem.l_quantity
        Inner Join: orders.o_orderkey = lineitem.l_orderkey
          Projection: customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice
            Inner Join: customer.c_custkey = orders.o_custkey
              RightSemi Join: __correlated_sq_1.l_orderkey = orders.o_orderkey
                SubqueryAlias: __correlated_sq_1
                  Filter: sum(lineitem.l_quantity) > Int32(300)
                    Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[]]
                      TableScan: lineitem projection=[l_orderkey]
                TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice]
              TableScan: customer projection=[c_custkey, c_name]
          TableScan: lineitem projection=[l_orderkey, l_quantity]
```

It seems to work (I don't know if it's optimal). It's the same join order as with `OptimizeProjections` turned off:

```
Optimized Plan:
Limit: skip=0, fetch=100
  Sort: orders.o_totalprice DESC NULLS FIRST
    Aggregate: groupBy=[[customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_totalprice]], aggr=[[sum(lineitem.l_quantity)]]
      Inner Join: orders.o_orderkey = lineitem.l_orderkey
        Inner Join: orders.o_custkey = customer.c_custkey
          RightSemi Join: __correlated_sq_1.l_orderkey = orders.o_orderkey
            SubqueryAlias: __correlated_sq_1
              Projection: lineitem.l_orderkey
                Filter: sum(lineitem.l_quantity) > Int32(300)
                  Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[sum(lineitem.l_quantity)]]
                    TableScan: lineitem
            TableScan: orders
          TableScan: customer
        TableScan: lineitem
```
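For readers who don't want to dig through the branch, here is a rough sketch of the `WrappedJoin` idea described above: peel the unary operators off a join, keep them as edge data, and re-apply them once the join has been rebuilt over new inputs. Everything in it is a simplified stand-in and an assumption for illustration: `Plan`, `Wrapper`, `peel` and `rebuild` are hypothetical names, not DataFusion's `LogicalPlan` API and not necessarily how the branch is structured.

```rust
// Simplified stand-in for a logical plan; NOT DataFusion's `LogicalPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Join { on: String, left: Box<Plan>, right: Box<Plan> },
    Projection { columns: Vec<String>, input: Box<Plan> },
    Filter { predicate: String, input: Box<Plan> },
}

// A unary operator with its input removed, so it can be re-applied later.
#[derive(Debug, Clone, PartialEq)]
enum Wrapper {
    Projection(Vec<String>),
    Filter(String),
}

// Hypothetical edge data: the join condition plus the operators that sat
// between this join and its parent join, stored outermost first.
#[derive(Debug, Clone, PartialEq)]
struct WrappedJoin {
    on: String,
    wrappers: Vec<Wrapper>,
}

impl WrappedJoin {
    /// Build the join over the given inputs and re-apply the wrappers on top.
    fn rebuild(&self, left: Plan, right: Plan) -> Plan {
        let join = Plan::Join {
            on: self.on.clone(),
            left: Box::new(left),
            right: Box::new(right),
        };
        // Wrappers are stored outermost first, so apply them in reverse.
        self.wrappers.iter().rev().fold(join, |input, wrapper| match wrapper {
            Wrapper::Projection(columns) => Plan::Projection {
                columns: columns.clone(),
                input: Box::new(input),
            },
            Wrapper::Filter(predicate) => Plan::Filter {
                predicate: predicate.clone(),
                input: Box::new(input),
            },
        })
    }
}

/// Peel projections and filters off the top of a plan until something else
/// (for example a join) is reached. Returns the wrappers, outermost first,
/// together with the remaining plan.
fn peel(plan: Plan) -> (Vec<Wrapper>, Plan) {
    let mut wrappers = Vec::new();
    let mut current = plan;
    loop {
        match current {
            Plan::Projection { columns, input } => {
                wrappers.push(Wrapper::Projection(columns));
                current = *input;
            }
            Plan::Filter { predicate, input } => {
                wrappers.push(Wrapper::Filter(predicate));
                current = *input;
            }
            other => return (wrappers, other),
        }
    }
}
```

In this sketch, peeling `Filter(Projection(Join(customer, orders)))` yields the wrappers `[Filter, Projection]` and the bare join; calling `rebuild` with the inputs swapped then gives `Filter(Projection(Join(orders, customer)))`, so the operators end up in their original order above the reordered join.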
