Hanumath Rao Maduri created DRILL-6158:
------------------------------------------

             Summary: Create a mux operator for union exchange to enable two 
phase merging instead of foreman merging all the batches.
                 Key: DRILL-6158
                 URL: https://issues.apache.org/jira/browse/DRILL-6158
             Project: Apache Drill
          Issue Type: Bug
          Components: Query Planning & Optimization
    Affects Versions: 1.12.0
            Reporter: Hanumath Rao Maduri
            Assignee: Hanumath Rao Maduri
             Fix For: Future


Consider the following simple query:

{code}
select zz1,zz2,a11 from dfs.tmp.viewtmp limit 100000 offset 10000000
{code}

The following plan is generated for this query:
{code}
00-00    Screen : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {1.06048844E8 rows, 5.54015404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 787
00-01      Project(zz1=[$0], zz2=[$1], a11=[$2]) : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {1.05038844E8 rows, 5.53005404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 786
00-02        SelectionVectorRemover : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {1.05038844E8 rows, 5.53005404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 785
00-03          Limit(offset=[10000000], fetch=[100000]) : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {9.4938844E7 rows, 5.42905404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 784
00-04            UnionExchange : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {8.4838844E7 rows, 5.02505404E8 cpu, 0.0 io, 1.56569100288E11 network, 4.64926176E7 memory}, id = 783
01-01              SelectionVectorRemover : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {7.4738844E7 rows, 4.21705404E8 cpu, 0.0 io, 3.2460300288E10 network, 4.64926176E7 memory}, id = 782
01-02                Limit(fetch=[10100000]) : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 1.01E7, cumulative cost = {6.4638844E7 rows, 4.11605404E8 cpu, 0.0 io, 3.2460300288E10 network, 4.64926176E7 memory}, id = 781
01-03                  Project(zz1=[$0], zz2=[$2], a11=[$1]) : rowType = RecordType(ANY zz1, ANY zz2, ANY a11): rowcount = 2.3306983E7, cumulative cost = {5.4538844E7 rows, 3.71205404E8 cpu, 0.0 io, 3.2460300288E10 network, 4.64926176E7 memory}, id = 780
01-04                    HashJoin(condition=[=($0, $2)], joinType=[left]) : rowType = RecordType(ANY ZZ1, ANY A, ANY ZZ2): rowcount = 2.3306983E7, cumulative cost = {5.4538844E7 rows, 3.71205404E8 cpu, 0.0 io, 3.2460300288E10 network, 4.64926176E7 memory}, id = 779
01-06                      Scan(groupscan=[EasyGroupScan [selectionRoot=maprfs:/tmp/csvd1, numFiles=3, columns=[`ZZ1`, `A`], files=[maprfs:/tmp/csvd1/D1111aamulti11random2.csv, maprfs:/tmp/csvd1/D1111aamulti11random21.csv, maprfs:/tmp/csvd1/D1111aamulti11random211.csv]]]) : rowType = RecordType(ANY ZZ1, ANY A): rowcount = 2.3306983E7, cumulative cost = {2.3306983E7 rows, 4.6613966E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 776
01-05                      BroadcastExchange : rowType = RecordType(ANY ZZ2): rowcount = 2641626.0, cumulative cost = {5283252.0 rows, 2.3774634E7 cpu, 0.0 io, 3.2460300288E10 network, 0.0 memory}, id = 778
02-01                        Scan(groupscan=[EasyGroupScan [selectionRoot=maprfs:/tmp/csvd2, numFiles=1, columns=[`ZZ2`], files=[maprfs:/tmp/csvd2/D222random2.csv]]]) : rowType = RecordType(ANY ZZ2): rowcount = 2641626.0, cumulative cost = {2641626.0 rows, 2641626.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 777
{code}

When there are many minor fragments on a large cluster, all of the minor fragments feeding into the UnionExchange (operator 00-04 above, in the foreman's fragment 00) are merged only at the foreman. Even though the UnionExchange is not a CPU bottleneck, it creates significant memory pressure at the foreman.
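
To make the imbalance concrete, here is a minimal sketch (plain Java, not Drill code; the drillbit count, parallelism, receiver queue depth, and batch size are all hypothetical) of how receive-side buffering concentrates on the foreman when every minor fragment sends its stream directly to it:

{code}
/**
 * Illustrative sketch only, not Drill code: with a plain union exchange,
 * the foreman's receiver has to keep some batches in flight per sending
 * minor fragment, so its buffering grows with the total number of minor
 * fragments in the cluster. All counts and sizes below are hypothetical.
 */
public class ForemanBufferingSketch {
  public static void main(String[] args) {
    int drillbits = 20;                      // hypothetical cluster size
    int minorFragmentsPerDrillbit = 16;      // hypothetical parallelism per drillbit
    int bufferedBatchesPerSender = 3;        // hypothetical in-flight batches per sender
    long batchSizeBytes = 8L * 1024 * 1024;  // hypothetical 8 MB record batches

    int senders = drillbits * minorFragmentsPerDrillbit;
    long foremanBufferBytes = (long) senders * bufferedBatchesPerSender * batchSizeBytes;

    System.out.println("Streams converging on the foreman: " + senders);
    System.out.println("Approx. receive-side buffering at the foreman: "
        + (foremanBufferBytes / (1024 * 1024)) + " MB");
  }
}
{code}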

Because of this, the foreman has been observed to run out of memory, mostly on large clusters with many minor fragments.

In this scenario it is better to locally merge the minor fragments belonging to each drillbit and send a single stream per drillbit to the foreman. This distributes the memory consumption across all the drillbits and reduces the memory pressure at the foreman.
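
A minimal sketch of the proposed two-phase shape, assuming a mux that simply concatenates streams (illustrative Java only, not Drill's operator code; all class, method, and drillbit names are made up): each drillbit first merges the streams of its own minor fragments, and the foreman then merges just one stream per drillbit.

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustrative sketch only, not Drill operator code: two-phase merging for
 * a union exchange. Phase 1 runs a local "mux" on each drillbit that folds
 * that drillbit's minor-fragment streams into one stream; phase 2 lets the
 * foreman merge only one stream per drillbit. Rows are plain strings here;
 * real Drill batches are value vectors.
 */
public class LocalMuxSketch {

  /** Phase 1: merge all minor-fragment streams of one drillbit into a single stream. */
  static List<String> localMux(List<List<String>> minorFragmentStreams) {
    List<String> merged = new ArrayList<>();
    for (List<String> stream : minorFragmentStreams) {
      // A union exchange imposes no ordering, so simple concatenation is enough.
      merged.addAll(stream);
    }
    return merged;
  }

  /** Phase 2: the foreman merges only one incoming stream per drillbit. */
  static List<String> foremanMerge(Map<String, List<String>> streamPerDrillbit) {
    List<String> result = new ArrayList<>();
    for (List<String> stream : streamPerDrillbit.values()) {
      result.addAll(stream);
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical topology: 2 drillbits, each running 2 minor fragments.
    Map<String, List<String>> muxedPerDrillbit = new TreeMap<>();
    muxedPerDrillbit.put("drillbit-1",
        localMux(Arrays.asList(Arrays.asList("r1", "r2"), Arrays.asList("r3"))));
    muxedPerDrillbit.put("drillbit-2",
        localMux(Arrays.asList(Arrays.asList("r4"), Arrays.asList("r5", "r6"))));

    // The foreman now merges 2 streams instead of 4.
    System.out.println(foremanMerge(muxedPerDrillbit));
  }
}
{code}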

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
