[ https://issues.apache.org/jira/browse/DRILL-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607732#comment-16607732 ]
Boaz Ben-Zvi commented on DRILL-6735:
-------------------------------------
Note for the planner side: while this change is good for most cases, there
could still be a few cases where the current implementation works better,
mainly when the build side is expected to contain very many duplicate join keys.
If the planner can predict such cases (based on statistics), it should
decide which of the two options to use.
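The comment leaves the choice to the planner. Below is a minimal sketch of such a statistics-based decision, assuming the planner has an estimated build-side row count and an estimated number of distinct join-key values (NDV). The class `SemiJoinPlanChooser`, its method, and the threshold of 10 duplicates per key are hypothetical and not part of Drill's planner; a real implementation would presumably fold this into the cost model rather than use a fixed cut-off.

```java
/**
 * Hypothetical planner heuristic (not Drill's actual API): choose between
 * (a) a HashAgg(DISTINCT) under a regular hash join, and
 * (b) a hash join that performs the semi-join itself.
 */
final class SemiJoinPlanChooser {

    enum Strategy { DISTINCT_THEN_JOIN, HASH_SEMI_JOIN }

    // Hypothetical cut-off: average number of build-side rows per distinct key
    // above which removing duplicates early is assumed to pay off.
    static final double DUPLICATES_PER_KEY_THRESHOLD = 10.0;

    static Strategy choose(double buildRowCount, double buildKeyNdv) {
        if (buildRowCount <= 0 || buildKeyNdv <= 0) {
            // No usable statistics: default to the new semi-join path.
            return Strategy.HASH_SEMI_JOIN;
        }
        double duplicatesPerKey = buildRowCount / buildKeyNdv;
        return duplicatesPerKey > DUPLICATES_PER_KEY_THRESHOLD
                ? Strategy.DISTINCT_THEN_JOIN
                : Strategy.HASH_SEMI_JOIN;
    }
}
```

For example, 1,000,000 build rows with 1,000 distinct keys (1,000 duplicates per key) selects `DISTINCT_THEN_JOIN`, while the same row count with 500,000 distinct keys selects `HASH_SEMI_JOIN`.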
> Enhance the Hash-Join Operator to perform Semi and Anti-Semi joins
> ------------------------------------------------------------------
>
> Key: DRILL-6735
> URL: https://issues.apache.org/jira/browse/DRILL-6735
> Project: Apache Drill
> Issue Type: Improvement
> Components: Execution - Relational Operators, Query Planning & Optimization
> Affects Versions: 1.14.0
> Reporter: Boaz Ben-Zvi
> Assignee: Boaz Ben-Zvi
> Priority: Major
> Fix For: 1.15.0
>
>
> Currently Drill implements Semi-Join (see DRILL-402) by using a regular join,
> with a DISTINCT operator on the build-side input to eliminate
> duplicates. Typically, the physical plan for a Semi-Join uses a hash-join, with a
> hash-aggr performing the DISTINCT (see example below).
> This effectively builds the same hash table(s) twice, a big waste of
> time and memory.
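To make the double work concrete, here is an illustrative sketch (not Drill code) of the current plan shape using plain Java collections: a `HashSet` stands in for the HashAgg computing DISTINCT `ss_customer_sk`, and a `HashMap` stands in for the inner hash join's own table over the same keys. The class, the nested `Customer` type, and the method name are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Illustrative sketch of the current plan shape: HashAgg(DISTINCT) on the
 * build side, then a regular inner hash join. Both steps hash the same
 * build keys, so two hash tables are built.
 */
final class DistinctThenJoinSketch {

    static final class Customer {
        final long customerSk;
        final String name;

        Customer(long customerSk, String name) {
            this.customerSk = customerSk;
            this.name = name;
        }
    }

    static List<String> semiJoinViaDistinct(List<Customer> probe, List<Long> buildKeys) {
        // Hash table #1: the HashAgg computing the DISTINCT build keys.
        Set<Long> distinctKeys = new HashSet<>(buildKeys);

        // Hash table #2: the inner hash join's table over the (now unique) keys.
        // A general inner join must keep every build row per key, hence the list.
        Map<Long, List<Long>> joinTable = new HashMap<>();
        for (Long key : distinctKeys) {
            joinTable.computeIfAbsent(key, k -> new ArrayList<>()).add(key);
        }

        // Probe: emit one output row per matching build row (exactly one here,
        // because the DISTINCT already removed the duplicates).
        List<String> out = new ArrayList<>();
        for (Customer c : probe) {
            List<Long> matches = joinTable.get(c.customerSk);
            if (matches != null) {
                for (int i = 0; i < matches.size(); i++) {
                    out.add(c.name);
                }
            }
        }
        return out;
    }
}
```

With customers 1, 2, 3 on the probe side and build keys 1, 1, 3, 3, 3, the result is each of customers 1 and 3 once, but only after the keys were hashed twice.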
> +Improvement+: Eliminate the Hash-Aggr from the plan, and notify the
> Hash-Join to perform a Semi-Join. The HJ would then simply skip the duplicates
> in its hash table(s), thus performing a Semi-Join.
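A minimal sketch of the improvement, again in plain Java collections rather than Drill's actual hash-join operator internals: one hash table is built over the build keys, duplicate keys are skipped on insert, and each probe row is emitted at most once. Since the issue title also covers Anti-Semi joins, the same table serves both: `SEMI` keeps probe rows whose key is found, `ANTI_SEMI` keeps those whose key is not. The class, enum, and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToLongFunction;

/**
 * Minimal sketch of a hash join that performs semi and anti-semi joins itself:
 * a single hash table over the build keys, duplicates skipped while building.
 */
final class HashSemiJoinSketch {

    enum Mode { SEMI, ANTI_SEMI }

    static <T> List<T> join(List<T> probe, ToLongFunction<T> probeKey,
                            List<Long> buildKeys, Mode mode) {
        // Build phase: Set.add ignores a key that is already present,
        // which is the "skip the duplicates" step.
        Set<Long> table = new HashSet<>();
        for (Long key : buildKeys) {
            table.add(key);
        }

        // Probe phase: each probe row is emitted at most once,
        // no matter how many build rows share its key.
        List<T> out = new ArrayList<>();
        for (T row : probe) {
            boolean found = table.contains(probeKey.applyAsLong(row));
            if (found == (mode == Mode.SEMI)) {
                out.add(row);
            }
        }
        return out;
    }
}
```

With probe keys 1, 2, 3 and build keys 1, 1, 3, 3, 3, `SEMI` returns 1 and 3 (each once) and `ANTI_SEMI` returns 2. The sketch uses non-null `long` keys, so it does not model SQL's NULL handling for `NOT IN`.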
> Example:
> {code}
> select c.c_first_name, c.c_last_name from dfs.`/data/json/s1/customer` c
> where c.c_customer_sk in (select s.ss_customer_sk from
> dfs.`/data/json/s1/store_sales` s) limit 4;
> {code}
> And the resulting plan (see the HJ at 01-03, and the Hash Agg at 01-05, with its first phase at 04-02):
> {code}
> 00-00    Screen : rowType = RecordType(ANY c_first_name, ANY c_last_name): rowcount = 4.0, cumulative cost = {4693752.96 rows, 2.3095576720000003E7 cpu, 0.0 io, 2.1598011392E9 network, 3.5895861760000005E7 memory}, id = 1320
> 00-01      Project(c_first_name=[$1], c_last_name=[$2]) : rowType = RecordType(ANY c_first_name, ANY c_last_name): rowcount = 4.0, cumulative cost = {4693752.56 rows, 2.3095576320000004E7 cpu, 0.0 io, 2.1598011392E9 network, 3.5895861760000005E7 memory}, id = 1319
> 00-02        Project(c_customer_sk=[$1], c_first_name=[$2], c_last_name=[$3], ss_customer_sk=[$0]) : rowType = RecordType(ANY c_customer_sk, ANY c_first_name, ANY c_last_name, ANY ss_customer_sk): rowcount = 4.0, cumulative cost = {4693748.56 rows, 2.3095568320000004E7 cpu, 0.0 io, 2.1598011392E9 network, 3.5895861760000005E7 memory}, id = 1318
> 00-03          SelectionVectorRemover : rowType = RecordType(ANY ss_customer_sk, ANY c_customer_sk, ANY c_first_name, ANY c_last_name): rowcount = 4.0, cumulative cost = {4693744.56 rows, 2.3095552320000004E7 cpu, 0.0 io, 2.1598011392E9 network, 3.5895861760000005E7 memory}, id = 1317
> 00-04            Limit(fetch=[4]) : rowType = RecordType(ANY ss_customer_sk, ANY c_customer_sk, ANY c_first_name, ANY c_last_name): rowcount = 4.0, cumulative cost = {4693740.56 rows, 2.3095548320000004E7 cpu, 0.0 io, 2.1598011392E9 network, 3.5895861760000005E7 memory}, id = 1316
> 00-05              UnionExchange : rowType = RecordType(ANY ss_customer_sk, ANY c_customer_sk, ANY c_first_name, ANY c_last_name): rowcount = 4.0, cumulative cost = {4693736.56 rows, 2.3095532320000004E7 cpu, 0.0 io, 2.1598011392E9 network, 3.5895861760000005E7 memory}, id = 1315
> 01-01                SelectionVectorRemover : rowType = RecordType(ANY ss_customer_sk, ANY c_customer_sk, ANY c_first_name, ANY c_last_name): rowcount = 4.0, cumulative cost = {4693732.56 rows, 2.3095500320000004E7 cpu, 0.0 io, 2.1597356032E9 network, 3.5895861760000005E7 memory}, id = 1314
> 01-02                  Limit(fetch=[4]) : rowType = RecordType(ANY ss_customer_sk, ANY c_customer_sk, ANY c_first_name, ANY c_last_name): rowcount = 4.0, cumulative cost = {4693728.56 rows, 2.3095496320000004E7 cpu, 0.0 io, 2.1597356032E9 network, 3.5895861760000005E7 memory}, id = 1313
> 01-03                    HashJoin(condition=[=($1, $0)], joinType=[inner]) : rowType = RecordType(ANY ss_customer_sk, ANY c_customer_sk, ANY c_first_name, ANY c_last_name): rowcount = 90182.8, cumulative cost = {4693724.56 rows, 2.3095480320000004E7 cpu, 0.0 io, 2.1597356032E9 network, 3.5895861760000005E7 memory}, id = 1312
> 01-05                      HashAgg(group=[{0}]) : rowType = RecordType(ANY ss_customer_sk): rowcount = 18036.56, cumulative cost = {4509140.0 rows, 2.1824237600000005E7 cpu, 0.0 io, 1.4775549952E9 network, 3.4918780160000004E7 memory}, id = 1309
> 01-06                        Project(ss_customer_sk=[$0]) : rowType = RecordType(ANY ss_customer_sk): rowcount = 180365.6, cumulative cost = {4328774.4 rows, 2.0381312800000004E7 cpu, 0.0 io, 1.4775549952E9 network, 3.17443456E7 memory}, id = 1308
> 01-07                          HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ss_customer_sk, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 180365.6, cumulative cost = {4148408.8000000003 rows, 2.0200947200000003E7 cpu, 0.0 io, 1.4775549952E9 network, 3.17443456E7 memory}, id = 1307
> 02-01                            UnorderedMuxExchange : rowType = RecordType(ANY ss_customer_sk, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 180365.6, cumulative cost = {3968043.2 rows, 1.73150976E7 cpu, 0.0 io, 0.0 network, 3.17443456E7 memory}, id = 1306
> 04-01                              Project(ss_customer_sk=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0, 1301011)]) : rowType = RecordType(ANY ss_customer_sk, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 180365.6, cumulative cost = {3787677.6 rows, 1.7134732E7 cpu, 0.0 io, 0.0 network, 3.17443456E7 memory}, id = 1305
> 04-02                                HashAgg(group=[{0}]) : rowType = RecordType(ANY ss_customer_sk): rowcount = 180365.6, cumulative cost = {3607312.0 rows, 1.6232904E7 cpu, 0.0 io, 0.0 network, 3.17443456E7 memory}, id = 1304
> 04-03                                  Scan(groupscan=[EasyGroupScan [selectionRoot=file:/data/json/s1/store_sales, numFiles=1, columns=[`ss_customer_sk`], files=[file:/data/json/s1/store_sales/0_0_0.json]]]) : rowType = RecordType(ANY ss_customer_sk): rowcount = 1803656.0, cumulative cost = {1803656.0 rows, 1803656.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1303
> 01-04                      BroadcastExchange : rowType = RecordType(ANY c_customer_sk, ANY c_first_name, ANY c_last_name): rowcount = 55516.0, cumulative cost = {111032.0 rows, 610676.0 cpu, 0.0 io, 6.82180608E8 network, 0.0 memory}, id = 1311
> 03-01                        Scan(groupscan=[EasyGroupScan [selectionRoot=file:/data/json/s1/customer, numFiles=1, columns=[`c_customer_sk`, `c_first_name`, `c_last_name`], files=[file:/data/json/s1/customer/0_0_0.json]]]) : rowType = RecordType(ANY c_customer_sk, ANY c_first_name, ANY c_last_name): rowcount = 55516.0, cumulative cost = {55516.0 rows, 166548.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1310
> {code}
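In this plan the DISTINCT runs in two phases: 04-02 HashAgg deduplicates `ss_customer_sk` inside each fragment, 01-07 HashToRandomExchange redistributes rows by a hash of the key, and 01-05 HashAgg deduplicates again on the receiving side. The sketch below models that shape with plain Java collections, assuming fragments and partitions can be represented as lists; it is not Drill code, and `TwoPhaseDistinctSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Illustrative two-phase DISTINCT: local dedup per fragment, hash
 * repartitioning on the key, then a final dedup per partition.
 */
final class TwoPhaseDistinctSketch {

    static List<Set<Long>> distinct(List<List<Long>> fragments, int numPartitions) {
        // Receivers of the exchange: one final hash table per partition.
        List<Set<Long>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new HashSet<>());
        }
        for (List<Long> fragment : fragments) {
            // Phase 1: local dedup, so a repeated key is sent once per fragment.
            Set<Long> local = new HashSet<>(fragment);
            for (Long key : local) {
                // Exchange: route by hash so equal keys from different
                // fragments land in the same partition.
                int p = Math.floorMod(key.hashCode(), numPartitions);
                // Phase 2: final dedup of keys arriving from all fragments.
                partitions.get(p).add(key);
            }
        }
        return partitions;
    }

    // Number of rows crossing the exchange once each fragment has deduplicated locally.
    static long rowsSent(List<List<Long>> fragments) {
        long sent = 0;
        for (List<Long> fragment : fragments) {
            sent += new HashSet<>(fragment).size();
        }
        return sent;
    }
}
```

For fragments [1, 1, 2] and [2, 3, 3], each key ends up in exactly one partition, and only 4 of the 6 input rows cross the exchange. The more the keys repeat, the more the local phase saves, which is one plausible reading of why the planner note singles out build sides with very many duplicates.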
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)