[ 
https://issues.apache.org/jira/browse/DRILL-2236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinfeng Ni updated DRILL-2236:
------------------------------
    Attachment: 0001-DRILL-2236-Optimize-hash-inner-join-by-swapping-inpu.patch

> Optimize hash inner join by swapping inputs based on row count comparison. 
> ---------------------------------------------------------------------------
>
>                 Key: DRILL-2236
>                 URL: https://issues.apache.org/jira/browse/DRILL-2236
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>            Reporter: Jinfeng Ni
>            Assignee: Jinfeng Ni
>         Attachments: 
> 0001-DRILL-2236-Optimize-hash-inner-join-by-swapping-inpu.patch, 
> 0001-DRILL-2236-Optimize-hash-inner-join-by-swapping-inpu.patch
>
>
> Currently, Drill's planner does not consider all possible join orders 
> during the planning phase, because one particular optimizer rule 
> (SwapJoinRule) is not enabled.  The rule is disabled because enabling it 
> would increase the planning time significantly.
> This means that the join order for some queries might not be optimal: the 
> order of the tables in the FROM clause affects the final join order the 
> planner produces. For example, 
> {code}
> select c.c_custkey, c.c_name, n.n_name 
> from nation n, customer c 
> where n.n_nationkey = c.c_nationkey;
> {code}
> The "nation" table contains 25 rows, while the "customer" table contains 1.5 
> million rows.  The optimal plan should put "customer" on the left side of 
> the hash inner join and "nation" on the right side, since the hash table is 
> built on the right side and we would like to build it on the smaller dataset. 
> {code}
> select count(*) from customer;
> +------------+
> | EXPR$0     |
> +------------+
> | 1500000    |
> +------------+
> select count(*) from nation;
> +------------+
> | EXPR$0     |
> +------------+
> | 25         |
> +------------+
> {code}
> However, the Drill planner currently produces the following join order: 
> NATION --> CUSTOMER.
> {code}
> 00-01      Project(c_custkey=[$0], c_name=[$1], n_name=[$2])
> 00-02        Project(c_custkey=[$3], c_name=[$4], n_name=[$1])
> 00-03          HashJoin(condition=[=($0, $2)], joinType=[inner])
> 00-05            Project(n_nationkey=[$1], n_name=[$0])
> 00-06              Scan(groupscan=[ParquetGroupScan 
> [entries=[ReadEntryWithPath 
> [path=file:/Users/jni/work/data/tpch-sf10/nation]], 
> selectionRoot=/Users/jni/work/data/tpch-sf10/nation, numFiles=1, 
> columns=[`n_nationkey`, `n_name`]]])
> 00-04            Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath 
> [path=file:/Users/jni/work/data/tpch-sf10/customer]], 
> selectionRoot=/Users/jni/work/data/tpch-sf10/customer, numFiles=1, 
> columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
> {code}
> Notice that in the above plan, LEFT is the "nation" table, while RIGHT is 
> the "customer" table. 
> Until we resolve the increased planning time caused by "SwapJoinRule", I 
> would like to propose, as a workaround, that we swap the inputs of the hash 
> inner join after the planner finishes planning. That is, when we build 
> the physical plan to be run on Drill's execution engine, we swap the inputs 
> of hash inner join physical operators, based on a row count comparison. 
> The proposed workaround could cause performance regressions for some queries, 
> in particular because the estimated row count is not accurate (especially 
> after Filter / Join / Aggregation) due to the lack of complete statistics. 
> To mitigate that risk, we will add a new planner option for this 
> feature, so that users can turn it off if they see a performance 
> regression. 
> With this feature enabled, the above query gets a plan like:
> {code}
> 00-01      Project(c_custkey=[$0], c_name=[$1], n_name=[$2])
> 00-02        Project(c_custkey=[$3], c_name=[$4], n_name=[$1])
> 00-03          HashJoin(condition=[=($0, $2)], joinType=[inner])
> 00-04            Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath 
> [path=file:/Users/jni/work/data/tpch-sf10/customer]], 
> selectionRoot=/Users/jni/work/data/tpch-sf10/customer, numFiles=1, 
> columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
> 00-05            Project(n_nationkey=[$1], n_name=[$0])
> 00-06              Scan(groupscan=[ParquetGroupScan 
> [entries=[ReadEntryWithPath 
> [path=file:/Users/jni/work/data/tpch-sf10/nation]], 
> selectionRoot=/Users/jni/work/data/tpch-sf10/nation, numFiles=1, 
> columns=[`n_nationkey`, `n_name`]]])
> {code}
> Please note that once we resolve the SwapJoinRule issue, we should remove 
> this workaround from Drill's code.
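
For illustration only, the row-count-based swap described above might be sketched as follows. This is a minimal sketch and not the attached patch: the class HashJoinSketch, its fields, and the swapEnabled flag (standing in for the proposed planner option) are all hypothetical names. It assumes, as the description states, that the hash table is built on the right input.

```java
/** Hypothetical stand-in for a physical hash inner join; not Drill's actual class. */
final class HashJoinSketch {
    final String left;      // probe side
    final long leftRows;    // estimated row count of the left input
    final String right;     // build side: the hash table is built on this input
    final long rightRows;   // estimated row count of the right input

    HashJoinSketch(String left, long leftRows, String right, long rightRows) {
        this.left = left;
        this.leftRows = leftRows;
        this.right = right;
        this.rightRows = rightRows;
    }

    /**
     * Returns a join whose inputs are swapped when the build (right) side is
     * estimated to be larger than the probe (left) side. Only meant for inner
     * joins; a real implementation would also have to remap column references.
     * swapEnabled is an assumed on/off switch, standing in for the planner option.
     */
    HashJoinSketch swapIfBeneficial(boolean swapEnabled) {
        if (swapEnabled && rightRows > leftRows) {
            return new HashJoinSketch(right, rightRows, left, leftRows);
        }
        return this; // keep the order chosen by the planner
    }
}
```

With the row counts from the example (nation: 25, customer: 1500000), a join planned as NATION --> CUSTOMER would come back with "customer" on the left and "nation" on the right. With the switch off, or when the right input is already the smaller one, the join is returned unchanged.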



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
