[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16011763#comment-16011763 ]
liyunzhang_intel commented on HIVE-16600:
-----------------------------------------
[~lirui]:
bq. For orderBy + Limit query, we have two choices: use multiple reducers to
get global order, and then shuffle to get global limit. Or we can use single
reducer to do the order and limit together. I think the latter is better
because we don't actually need to get a global order.
Agreed.
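For intuition, a minimal sketch of the single-reducer case (my own query, not from this ticket): with a standalone order by + limit, one reducer can keep a bounded top-N buffer and emit the rows already in order, so a globally ordered parallel shuffle buys nothing.
{code}
-- Illustrative only: standalone order by + limit over src.
-- A single reducer can satisfy the ordering and the limit in one pass,
-- keeping at most 10 rows in memory (TopN optimization).
SELECT key, value
FROM src
ORDER BY key
LIMIT 10;
{code}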
bq.In our multi insert example, it seems we cannot choose the latter. I suspect
that's because there's only one limit. liyunzhang_intel, could you try adding
limit to both inserts and see what happens?
It seems that MR also uses an extra shuffle in the two-limit case:
{code}
set hive.mapred.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=256;
set hive.optimize.sampling.orderby=true;
drop table if exists e1;
drop table if exists e2;
create table e1 (key string, value string);
create table e2 (key string);
FROM (select key,value from src order by key) a
INSERT OVERWRITE TABLE e1
SELECT key, value limit 5
INSERT OVERWRITE TABLE e2
SELECT key limit 10;
select * from e1;
select * from e2;
{code}
The explain output:
{code}
STAGE DEPENDENCIES:
  Stage-2 is a root stage [MAPRED]
  Stage-3 depends on stages: Stage-2 [MAPRED]
  Stage-0 depends on stages: Stage-3 [MOVE]
  Stage-4 depends on stages: Stage-0 [STATS]
  Stage-5 depends on stages: Stage-2 [MAPRED]
  Stage-1 depends on stages: Stage-5 [MOVE]
  Stage-6 depends on stages: Stage-1 [STATS]

STAGE PLANS:
  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: src
            GatherStats: false
            Select Operator
              expressions: key (type: string), value (type: string)
              outputColumnNames: _col0, _col1
              Reduce Output Operator
                key expressions: _col0 (type: string)
                null sort order: a
                sort order: +
                tag: -1
                value expressions: _col1 (type: string)
                auto parallelism: false
      Path -> Alias:
        hdfs://bdpe41:8020/user/hive/warehouse/src [a:src]
      Path -> Partition:
        hdfs://bdpe41:8020/user/hive/warehouse/src
          Partition
            base file name: src
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              column.name.delimiter ,
              columns key,value
              columns.comments 'default','default'
              columns.types string:string
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs://bdpe41:8020/user/hive/warehouse/src
              name default.src
              numFiles 1
              numRows 0
              rawDataSize 0
              serialization.ddl struct src { string key, string value}
              serialization.format 1
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              totalSize 5812
              transient_lastDdlTime 1493960133
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
              bucket_count -1
              column.name.delimiter ,
              columns key,value
              columns.comments 'default','default'
              columns.types string:string
              file.inputformat org.apache.hadoop.mapred.TextInputFormat
              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              location hdfs://bdpe41:8020/user/hive/warehouse/src
              name default.src
              numFiles 1
              numRows 0
              rawDataSize 0
              serialization.ddl struct src { string key, string value}
              serialization.format 1
              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              totalSize 5812
              transient_lastDdlTime 1493960133
            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            name: default.src
          name: default.src
      Truncated Path -> Alias:
        /src [a:src]
      Needs Tagging: false
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
          outputColumnNames: _col0, _col1
          Limit
            Number of rows: 5
            File Output Operator
              compressed: false
              GlobalTableId: 0
              directory: hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004
              NumFilesPerFileSink: 1
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  properties:
                    column.name.delimiter ,
                    columns _col0,_col1
                    columns.types string,string
                    escape.delim \
                    serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
              TotalFiles: 1
              GatherStats: false
              MultiFileSpray: false
          Select Operator
            expressions: _col0 (type: string)
            outputColumnNames: _col0
            Limit
              Number of rows: 10
              File Output Operator
                compressed: false
                GlobalTableId: 0
                directory: hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005
                NumFilesPerFileSink: 1
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                    properties:
                      column.name.delimiter ,
                      columns _col0
                      columns.types string
                      escape.delim \
                      serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
                TotalFiles: 1
                GatherStats: false
                MultiFileSpray: false

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            GatherStats: false
            Reduce Output Operator
              null sort order:
              sort order:
              tag: -1
              TopN: 5
              TopN Hash Memory Usage: 0.1
              value expressions: _col0 (type: string), _col1 (type: string)
              auto parallelism: false
      Path -> Alias:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004 [hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004]
      Path -> Partition:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004
          Partition
            base file name: -mr-10004
            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
            properties:
              column.name.delimiter ,
              columns _col0,_col1
              columns.types string,string
              escape.delim \
              serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
            properties:
              column.name.delimiter ,
              columns _col0,_col1
              columns.types string,string
              escape.delim \
              serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Truncated Path -> Alias:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004 [hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10004]
      Needs Tagging: false
      Reduce Operator Tree:
        Select Operator
          expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
          outputColumnNames: _col0, _col1
          Limit
            Number of rows: 5
            File Output Operator
              compressed: false
              GlobalTableId: 1
              directory: hdfs://bdpe41:8020/user/hive/warehouse/e1/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10000
              NumFilesPerFileSink: 1
              Stats Publishing Key Prefix: hdfs://bdpe41:8020/user/hive/warehouse/e1/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10000/
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                    bucket_count -1
                    column.name.delimiter ,
                    columns key,value
                    columns.comments
                    columns.types string:string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/e1
                    name default.e1
                    numFiles 0
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct e1 { string key, string value}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 0
                    transient_lastDdlTime 1494911492
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  name: default.e1
              TotalFiles: 1
              GatherStats: true
              MultiFileSpray: false

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          source: hdfs://bdpe41:8020/user/hive/warehouse/e1/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10000
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                bucket_count -1
                column.name.delimiter ,
                columns key,value
                columns.comments
                columns.types string:string
                file.inputformat org.apache.hadoop.mapred.TextInputFormat
                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                location hdfs://bdpe41:8020/user/hive/warehouse/e1
                name default.e1
                numFiles 0
                numRows 0
                rawDataSize 0
                serialization.ddl struct e1 { string key, string value}
                serialization.format 1
                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                totalSize 0
                transient_lastDdlTime 1494911492
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.e1

  Stage: Stage-4
    Stats-Aggr Operator
      Stats Aggregation Key Prefix: hdfs://bdpe41:8020/user/hive/warehouse/e1/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10000/

  Stage: Stage-5
    Map Reduce
      Map Operator Tree:
          TableScan
            GatherStats: false
            Reduce Output Operator
              null sort order:
              sort order:
              tag: -1
              TopN: 10
              TopN Hash Memory Usage: 0.1
              value expressions: _col0 (type: string)
              auto parallelism: false
      Path -> Alias:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005 [hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005]
      Path -> Partition:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005
          Partition
            base file name: -mr-10005
            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
            properties:
              column.name.delimiter ,
              columns _col0
              columns.types string
              escape.delim \
              serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
            properties:
              column.name.delimiter ,
              columns _col0
              columns.types string
              escape.delim \
              serialization.lib org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Truncated Path -> Alias:
        hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005 [hdfs://bdpe41:8020/tmp/hive/root/f86a772c-2902-4908-88aa-fe6f793b06ef/hive_2017-05-16_13-11-32_896_1505166932595220976-1/-mr-10005]
      Needs Tagging: false
      Reduce Operator Tree:
        Select Operator
          expressions: VALUE._col0 (type: string)
          outputColumnNames: _col0
          Limit
            Number of rows: 10
            File Output Operator
              compressed: false
              GlobalTableId: 2
              directory: hdfs://bdpe41:8020/user/hive/warehouse/e2/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10002
              NumFilesPerFileSink: 1
              Stats Publishing Key Prefix: hdfs://bdpe41:8020/user/hive/warehouse/e2/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10002/
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                    bucket_count -1
                    column.name.delimiter ,
                    columns key
                    columns.comments
                    columns.types string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/e2
                    name default.e2
                    numFiles 0
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct e2 { string key}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 0
                    transient_lastDdlTime 1494911492
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  name: default.e2
              TotalFiles: 1
              GatherStats: true
              MultiFileSpray: false

  Stage: Stage-1
    Move Operator
      tables:
          replace: true
          source: hdfs://bdpe41:8020/user/hive/warehouse/e2/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10002
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              properties:
                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                bucket_count -1
                column.name.delimiter ,
                columns key
                columns.comments
                columns.types string
                file.inputformat org.apache.hadoop.mapred.TextInputFormat
                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                location hdfs://bdpe41:8020/user/hive/warehouse/e2
                name default.e2
                numFiles 0
                numRows 0
                rawDataSize 0
                serialization.ddl struct e2 { string key}
                serialization.format 1
                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                totalSize 0
                transient_lastDdlTime 1494911492
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.e2

  Stage: Stage-6
    Stats-Aggr Operator
      Stats Aggregation Key Prefix: hdfs://bdpe41:8020/user/hive/warehouse/e2/.hive-staging_hive_2017-05-16_13-11-32_896_1505166932595220976-1/-ext-10002/
{code}
bq.Ideally, we should be able to use single reducer if all the inserts have
limit (with same or similar number perhaps). If not, we shouldn't disable
parallel order by for multi insert.
Agreed, so I have updated this logic in HIVE-16600.5.patch. There is a big
difference between order by + limit and order by + multi insert (containing
limit): the final result must be in order in the former, while it need not be
in order in the latter.
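To make that difference concrete, a hedged side-by-side sketch (the queries are adapted from the examples above; the comments are my reading of the plans):
{code}
-- Case 1: the query result itself is the 5 smallest keys, in sorted order,
-- so the final output must come out of an ordered stream.
SELECT key, value FROM src ORDER BY key LIMIT 5;

-- Case 2: each insert needs the correct limited row *set*, but a table
-- retains no row order, so the shared order by can use parallel (sampled)
-- reducers as long as a later TopN stage trims each branch, as in the
-- Stage-3/Stage-5 plans above.
FROM (select key, value from src order by key) a
INSERT OVERWRITE TABLE e1 SELECT key, value limit 5
INSERT OVERWRITE TABLE e2 SELECT key limit 10;
{code}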
> Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel
> order by in multi_insert cases
> --------------------------------------------------------------------------------------------------------
>
> Key: HIVE-16600
> URL: https://issues.apache.org/jira/browse/HIVE-16600
> Project: Hive
> Issue Type: Sub-task
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Attachments: HIVE-16600.1.patch, HIVE-16600.2.patch,
> HIVE-16600.3.patch, HIVE-16600.4.patch, mr.explain, mr.explain.log.HIVE-16600
>
>
> multi_insert_gby.case.q
> {code}
> set hive.exec.reducers.bytes.per.reducer=256;
> set hive.optimize.sampling.orderby=true;
> drop table if exists e1;
> drop table if exists e2;
> create table e1 (key string, value string);
> create table e2 (key string);
> FROM (select key, cast(key as double) as keyD, value from src order by key) a
> INSERT OVERWRITE TABLE e1
> SELECT key, value
> INSERT OVERWRITE TABLE e2
> SELECT key;
> select * from e1;
> select * from e2;
> {code}
> the parallelism of Sort is 1 even when we enable parallel order by
> ("hive.optimize.sampling.orderby" is set to "true"). This is not
> reasonable because the parallelism should be calculated by
> [Utilities.estimateReducers|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L170].
> This is because SetSparkReducerParallelism#needSetParallelism returns false
> when the [children size of
> RS|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207]
> is greater than 1. In this case, the children size of {{RS[2]}} is two.
> The logical plan of the case:
> {code}
> TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5]
>                   -SEL[6]-FS[7]
> {code}