[
https://issues.apache.org/jira/browse/DRILL-5198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Paul Rogers updated DRILL-5198:
-------------------------------
Summary: Inefficient double exchange for single-slice sort query on large data  (was: Inefficient plan for single-slice sort query on large data)
> Inefficient double exchange for single-slice sort query on large data
> ---------------------------------------------------------------------
>
> Key: DRILL-5198
> URL: https://issues.apache.org/jira/browse/DRILL-5198
> Project: Apache Drill
> Issue Type: Bug
> Affects Versions: 1.9.0
> Reporter: Paul Rogers
> Priority: Minor
>
> Testing of the external sort revealed an unfortunate choice of plan for a
> somewhat pathological query.
> The query reads 17 GB of data from a single CSV file, sorts it, and then
> filters out all rows. (The purpose is to exercise the sort and then discard
> the results.) The query is deliberately structured to force a sort followed
> by a filter, rather than the simpler Top N operator.
> Query:
> {code}
> select * from (select * from dfs.`/big-csv-file.csv` order by columns[0])d
> where d.columns[0] = 'bogus value';
> {code}
> Since the goal was to test the sort, we limited execution width to a single
> slice on a single node. While this is an odd case, it does represent an
> extreme form of resource allocation: forcing a query to run in the minimum
> possible width, which may happen with admission control on a heavily loaded
> cluster.
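> For this test, the single-slice run was presumably forced with session options
> along the following lines (a sketch; the exact options and values used are an
> assumption, not taken from the original test setup):
> {code}
> -- Limit parallelism so the whole query runs as a single minor fragment.
> alter session set `planner.width.max_per_node` = 1;
> alter session set `planner.width.max_per_query` = 1;
> {code}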
> Regardless of the reason, the resulting plan contains two network exchanges,
> even though everything runs on a single machine. Each exchange must:
> * Copy vector data to a heap buffer
> * Send the data over the (loopback) network
> * Release the value vectors
> * Receive data into a heap buffer
> * Allocate new value vectors
> * Copy heap data into a value vector
> All of this work is very costly and contributes nothing to the query result.
> The expected plan would either:
> * Run the entire query in a single minor fragment (with no exchanges), or
> * Use a (not yet existing) operator to do a "mock" exchange that transfers
> ownership of vectors rather than making very expensive copies.
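> (As a point of comparison rather than a fix, the first option can be
> approximated by asking the planner to drop exchanges entirely. A sketch,
> assuming the `planner.disable_exchanges` session option behaves as its name
> suggests:)
> {code}
> -- Blunt workaround for comparison only: plan the query with no exchanges.
> alter session set `planner.disable_exchanges` = true;
> {code}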
> Other issues are present as well; those will be filed as separate JIRA
> tickets.
> The actual plan is below:
> {code}
> 00-00    Screen : rowType = RecordType(ANY *): rowcount = 2.691360795E7, cumulative cost = {1.6444214457450001E9 rows, 2.6992589029593388E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 459
> 00-01      Project(*=[$0]) : rowType = RecordType(ANY *): rowcount = 2.691360795E7, cumulative cost = {1.64173008495E9 rows, 2.698989766879839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 458
> 00-02        SelectionVectorRemover : rowType = RecordType(ANY T0¦¦*): rowcount = 2.691360795E7, cumulative cost = {1.64173008495E9 rows, 2.698989766879839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 457
> 00-03          Filter(condition=[=(ITEM(ITEM($0, 'columns'), 0), 'ljdfhwuehnoiueyf')]) : rowType = RecordType(ANY T0¦¦*): rowcount = 2.691360795E7, cumulative cost = {1.614816477E9 rows, 2.696298406084839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 456
> 00-04            Project(T0¦¦*=[$0]) : rowType = RecordType(ANY T0¦¦*): rowcount = 1.79424053E8, cumulative cost = {1.435392424E9 rows, 2.613763341704839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 455
> 00-05              SingleMergeExchange(sort0=[1 ASC]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {1.435392424E9 rows, 2.613763341704839E10 cpu, 0.0 io, 3.67460460544E12 network, 2.870784848E9 memory}, id = 454
> 01-01                SelectionVectorRemover : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {1.255968371E9 rows, 2.470224099304839E10 cpu, 0.0 io, 2.204762763264E12 network, 2.870784848E9 memory}, id = 453
> 01-02                  Sort(sort0=[$1], dir0=[ASC]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {1.076544318E9 rows, 2.452281694004839E10 cpu, 0.0 io, 2.204762763264E12 network, 2.870784848E9 memory}, id = 452
> 01-03                    Project(T0¦¦*=[$0], EXPR$1=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {8.97120265E8 rows, 4.844449431E9 cpu, 0.0 io, 2.204762763264E12 network, 0.0 memory}, id = 451
> 01-04                      HashToRandomExchange(dist0=[[$1]]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1.79424053E8, cumulative cost = {8.97120265E8 rows, 4.844449431E9 cpu, 0.0 io, 2.204762763264E12 network, 0.0 memory}, id = 450
> 02-01                        UnorderedMuxExchange : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1.79424053E8, cumulative cost = {7.17696212E8 rows, 1.973664583E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 449
> 03-01                          Project(T0¦¦*=[$0], EXPR$1=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($1)]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1.79424053E8, cumulative cost = {5.38272159E8 rows, 1.79424053E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 448
> 03-02                            Project(T0¦¦*=[$0], EXPR$1=[ITEM($1, 0)]) : rowType = RecordType(ANY T0¦¦*, ANY EXPR$1): rowcount = 1.79424053E8, cumulative cost = {3.58848106E8 rows, 1.076544318E9 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 447
> 03-03                              Project(T0¦¦*=[$0], columns=[$1]) : rowType = RecordType(ANY T0¦¦*, ANY columns): rowcount = 1.79424053E8, cumulative cost = {1.79424053E8 rows, 3.58848106E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 446
> 03-04                                Scan(groupscan=[EasyGroupScan [selectionRoot=maprfs:/drill/testdata/resource-manager/descending-col-length-8k.tbl, numFiles=1, columns=[`*`], files=[maprfs:///drill/testdata/resource-manager/descending-col-length-8k.tbl]]]) : rowType = (DrillRecordRow[*, columns]): rowcount = 1.79424053E8, cumulative cost = {1.79424053E8 rows, 3.58848106E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 445
> {code}
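> (For reference, a text plan like the one above is normally captured by
> prefixing the query with EXPLAIN PLAN FOR; a sketch using the simplified file
> path from the query earlier in this report:)
> {code}
> explain plan for
> select * from (select * from dfs.`/big-csv-file.csv` order by columns[0]) d
> where d.columns[0] = 'bogus value';
> {code}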
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)