[ https://issues.apache.org/jira/browse/DRILL-5198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paul Rogers updated DRILL-5198:
-------------------------------
    Summary: Inefficient double exchange for single-slice sort query on large 
data (was: Inefficient plan for single-slice sort query on large data)

> Inefficient double exchange for single-slice sort query on large data
> ---------------------------------------------------------------------
>
>                 Key: DRILL-5198
>                 URL: https://issues.apache.org/jira/browse/DRILL-5198
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.9.0
>            Reporter: Paul Rogers
>            Priority: Minor
>
> Testing of the external sort revealed an unfortunate plan choice for a
> somewhat pathological query.
> The query reads 17 GB of data from a single CSV file. It does nothing
> other than sort the data and then filter out all rows. (The purpose is to
> exercise the sort and discard the results.) The query was contrived to
> force a full sort followed by a filter, rather than the simpler Top N
> operator.
> Query:
> {code}
> select * from (select * from dfs.`/big-csv-file.csv` order by columns[0]) d
> where d.columns[0] = 'bogus value';
> {code}
> Since the goal was to test the sort, we limited the execution width to a
> single slice on a single node. While this is an odd case, it does represent an 
> extreme form of resource allocation: forcing a query to run in the minimum 
> possible width, which may happen with admission control on a heavily loaded 
> cluster.
> Regardless of the reason, the resulting plan has two network exchanges (a
> HashToRandomExchange and a SingleMergeExchange) on a single machine. Each
> exchange must:
> * Copy vector data to a heap buffer
> * Send the data over the (loopback) network
> * Release the value vectors
> * Receive data into a heap buffer
> * Allocate new value vectors
> * Copy heap data into a value vector
> These steps are very costly and add nothing to the query's result.
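The per-batch cost of those steps can be made concrete with a small model. This is a hypothetical sketch, not Drill's exchange code. It makes three assumptions: value vectors are modeled as direct ByteBuffers, the loopback network is modeled as an in-memory queue of heap arrays, and the class and method names are illustrative. It counts the bytes copied while one batch goes through the six steps in the order listed.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of a copy-based exchange (not Drill's real code path).
// Vectors are modeled as direct ByteBuffers; the loopback network is
// modeled as an in-memory queue of heap byte arrays.
class CopyingExchangeSketch {
    // Running total of bytes copied, to make the per-batch cost visible.
    static long bytesCopied = 0;

    // Moves one batch through the six steps and returns the receiver's
    // newly allocated copy of the data.
    static ByteBuffer exchange(ByteBuffer vector, Queue<byte[]> loopback) {
        int n = vector.remaining();

        // 1. Sender copies vector data to a heap buffer (copy 1).
        byte[] sendBuf = new byte[n];
        vector.get(sendBuf);
        bytesCopied += n;

        // 2. Sender "sends" the heap buffer over the simulated loopback link.
        loopback.add(sendBuf);

        // 3. Sender releases the value vector. The sketch just stops using
        //    it; a real engine would return the memory to its allocator.

        // 4. Receiver receives the data into its own heap buffer (copy 2).
        byte[] recvBuf = new byte[n];
        System.arraycopy(loopback.remove(), 0, recvBuf, 0, n);
        bytesCopied += n;

        // 5. Receiver allocates a new value vector (here, a direct buffer).
        ByteBuffer newVector = ByteBuffer.allocateDirect(n);

        // 6. Receiver copies heap data into the new vector (copy 3).
        newVector.put(recvBuf);
        newVector.flip();
        bytesCopied += n;
        return newVector;
    }

    public static void main(String[] args) {
        ByteBuffer vector = ByteBuffer.allocateDirect(1024);
        for (int i = 0; i < 1024; i++) {
            vector.put((byte) i);
        }
        vector.flip();
        ByteBuffer received = exchange(vector, new ArrayDeque<>());
        System.out.println(received.remaining() + " bytes received, "
                + bytesCopied + " bytes copied");
    }
}
```

Running main prints "1024 bytes received, 3072 bytes copied". In this model, every byte is copied three times per exchange. That count excludes any copies made by the operating system's network stack, and the plan below pays the cost twice.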
> The expected plan would either:
> * Run the entire query in a single minor fragment (with no exchanges), or
> * Use a (not yet existing) operator to do a "mock" exchange that transfers 
> ownership of vectors rather than making very expensive copies.
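The second alternative, a "mock" exchange that transfers ownership instead of copying, might look roughly like the sketch below. It is hypothetical, not an existing Drill operator. OwnedBatch, transferTo, send and receive are illustrative names, and a batch is modeled as a single direct ByteBuffer. A ConcurrentLinkedQueue stands in for the channel between the two fragments, because the fragments would normally run on different threads.

```java
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical "mock" exchange: the batch is handed over by reference and
// only its ownership changes. All names here are illustrative.
class OwnershipTransferSketch {

    // A batch's memory plus the fragment that currently owns it.
    static final class OwnedBatch {
        private final ByteBuffer data;
        private String owner;

        OwnedBatch(ByteBuffer data, String owner) {
            this.data = data;
            this.owner = owner;
        }

        ByteBuffer data() { return data; }
        String owner() { return owner; }

        // Change the owner only; no bytes are touched.
        void transferTo(String newOwner) { owner = newOwner; }
    }

    // Sender side: hand ownership to the receiver, then enqueue the same
    // object. When channel is a ConcurrentLinkedQueue, its happens-before
    // guarantee makes the new owner visible to the receiving thread.
    static void send(OwnedBatch batch, Queue<OwnedBatch> channel, String receiver) {
        batch.transferTo(receiver);
        channel.add(batch);
    }

    // Receiver side: dequeue the batch it now owns (null if none was sent).
    static OwnedBatch receive(Queue<OwnedBatch> channel) {
        return channel.poll();
    }

    public static void main(String[] args) {
        Queue<OwnedBatch> channel = new ConcurrentLinkedQueue<>();
        ByteBuffer buf = ByteBuffer.allocateDirect(1024);
        OwnedBatch batch = new OwnedBatch(buf, "sender");

        send(batch, channel, "receiver");
        OwnedBatch received = receive(channel);

        System.out.println("owner=" + received.owner()
                + ", same memory=" + (received.data() == buf));
    }
}
```

The only per-batch work is a reference handoff, so the cost no longer grows with the volume of data being sorted. The sketch does not enforce that the sender stops using a batch after sending it. A real operator would need that rule.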
> Other issues are present as well; those will be filed as separate JIRA
> tickets.
> The actual plan is below:
> {code}
> 00-00    Screen : rowType = RecordType(ANY *): rowcount = 2.691360795E7, cumulative cost = {1.6444214457450001E9 rows, 2.6992589029593388E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 459
> 00-01      Project(*=[$0]) : rowType = RecordType(ANY *): rowcount = 2.691360795E7, cumulative cost = {1.64173008495E9 rows, 2.698989766879839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 458
> 00-02        SelectionVectorRemover : rowType = RecordType(ANY T0¦¦*): rowcount = 2.691360795E7, cumulative cost = {1.64173008495E9 rows, 2.698989766879839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 457
> 00-03          Filter(condition=[=(ITEM(ITEM($0, 'columns'), 0), 'ljdfhwuehnoiueyf')]) : rowType = RecordType(ANY T0¦¦*): rowcount = 2.691360795E7, cumulative cost = {1.614816477E9 rows, 2.696298406084839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 456
> 00-04            Project(T0¦¦*=[$0]) : rowType = RecordType(ANY T0¦¦*): rowcount = 1.79424053E8, cumulative cost = {1.435392424E9 rows, 2.613763341704839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 455
> 00-05              SingleMergeExchange(sort0=[1 ASC]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {1.435392424E9 rows, 2.613763341704839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 454
> 01-01                SelectionVectorRemover : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {1.255968371E9 rows, 2.470224099304839E10 cpu, 0.0 io, 2.204762763264E12 network, 2.870784848E9 memory}, id = 453
> 01-02                  Sort(sort0=[$1], dir0=[ASC]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {1.076544318E9 rows, 2.452281694004839E10 cpu, 0.0 io, 2.204762763264E12 network, 2.870784848E9 memory}, id = 452
> 01-03                    Project(T0¦¦*=[$0], EXPR$1=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {8.97120265E8 rows, 4.844449431E9 cpu, 0.0 io, 2.204762763264E12 network, 0.0 memory}, id = 451
> 01-04                      HashToRandomExchange(dist0=[[$1]]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1.79424053E8, cumulative cost = {8.97120265E8 rows, 4.844449431E9 cpu, 0.0 io, 2.204762763264E12 network, 0.0 memory}, id = 450
> 02-01                        UnorderedMuxExchange : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1.79424053E8, cumulative cost = {7.17696212E8 rows, 1.973664583E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 449
> 03-01                          Project(T0¦¦*=[$0], EXPR$1=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($1)]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1.79424053E8, cumulative cost = {5.38272159E8 rows, 1.79424053E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 448
> 03-02                            Project(T0¦¦*=[$0], EXPR$1=[ITEM($1, 0)]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {3.58848106E8 rows, 1.076544318E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 447
> 03-03                              Project(T0¦¦*=[$0], columns=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY columns): rowcount = 1.79424053E8, cumulative cost = {1.79424053E8 rows, 3.58848106E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 446
> 03-04                                Scan(groupscan=[EasyGroupScan [selectionRoot=maprfs:/drill/testdata/resource-manager/descending-col-length-8k.tbl, numFiles=1, columns=[`*`], files=[maprfs:///drill/testdata/resource-manager/descending-col-length-8k.tbl]]]) : rowType = (DrillRecordRow[*, columns]): rowcount = 1.79424053E8, cumulative cost = {1.79424053E8 rows, 3.58848106E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 445
> {code}
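The planner's own numbers show where the network cost comes from. For each exchange, subtract its input's cumulative network cost from the exchange's own cumulative network cost; the result is the network cost that exchange adds. Every other operator in the plan adds none. The sketch below only performs that arithmetic on values copied from the plan. The units are the planner's cost units, which are not necessarily bytes. The class, constant, and method names are illustrative.

```java
// Arithmetic on the cumulative network costs shown in the plan above.
// Names are illustrative; the numbers are copied from the plan.
class ExchangeCostSketch {
    static final double ROWS = 1.79424053E8;                 // rowcount at 01-04 and 00-05
    static final double NET_BELOW_HASH = 0.0;                // 02-01 UnorderedMuxExchange
    static final double NET_AT_HASH = 2.204762763264E12;     // 01-04 HashToRandomExchange
    static final double NET_BELOW_MERGE = 2.204762763264E12; // 01-01 SelectionVectorRemover
    static final double NET_AT_MERGE = 3.67460460544E12;     // 00-05 SingleMergeExchange

    // Network cost an operator adds on top of its input's cumulative cost.
    static double added(double cumulative, double inputCumulative) {
        return cumulative - inputCumulative;
    }

    public static void main(String[] args) {
        double hash = added(NET_AT_HASH, NET_BELOW_HASH);
        double merge = added(NET_AT_MERGE, NET_BELOW_MERGE);
        System.out.printf("HashToRandomExchange adds %.0f per row%n", hash / ROWS);
        System.out.printf("SingleMergeExchange adds %.0f per row%n", merge / ROWS);
    }
}
```

The results are exactly 12288 and 8192 per row. These equal 3 x 4096 for the hash exchange, whose row type has three columns, and 2 x 4096 for the merge exchange, which has two. That pattern is read off the numbers here, not taken from Drill's cost model. Either way, the two exchanges account for all of the 3.67460460544E12 network cost in a plan that runs on one machine.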



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
