[
https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
liyunzhang_intel updated HIVE-16600:
------------------------------------
Attachment: HIVE-16600.4.patch
[~lirui]: uploaded HIVE-16600.4.patch.
Changes:
1. added {{-- SORT_QUERY_RESULTS}} to the test.
Regarding the question you mentioned:
bq. But why we disable parallel order by when there's a limit is to avoid an extra stage (see this [comment|https://issues.apache.org/jira/browse/HIVE-10458?focusedCommentId=14539299&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14539299]). If the extra stage is needed anyway, then it makes no sense to disable parallel order by.
I guess [~xuefuz] means the common order by + limit case like the following:
{code}
set hive.mapred.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=256;
set hive.optimize.sampling.orderby=true;
select key, value from src order by key limit 10;
{code}
The plan is:
{code}
STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (SORT, 1)
      DagName: root_20170515101633_2093e4cb-a060-452c-aa21-201132ea0819:1
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: src
                  Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                  GatherStats: false
                  Select Operator
                    expressions: key (type: string), value (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      null sort order: a
                      sort order: +
                      Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                      tag: -1
                      TopN: 10
                      TopN Hash Memory Usage: 0.1
                      value expressions: _col1 (type: string)
                      auto parallelism: false
            Path -> Alias:
              hdfs://bdpe41:8020/user/hive/warehouse/src [src]
            Path -> Partition:
              hdfs://bdpe41:8020/user/hive/warehouse/src
                Partition
                  base file name: src
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    bucket_count -1
                    column.name.delimiter ,
                    columns key,value
                    columns.comments 'default','default'
                    columns.types string:string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/src
                    name default.src
                    numFiles 1
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct src { string key, string value}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 5812
                    transient_lastDdlTime 1493960133
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      bucket_count -1
                      column.name.delimiter ,
                      columns key,value
                      columns.comments 'default','default'
                      columns.types string:string
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      location hdfs://bdpe41:8020/user/hive/warehouse/src
                      name default.src
                      numFiles 1
                      numRows 0
                      rawDataSize 0
                      serialization.ddl struct src { string key, string value}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      totalSize 5812
                      transient_lastDdlTime 1493960133
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: default.src
                  name: default.src
            Truncated Path -> Alias:
              /src [src]
        Reducer 2
            Needs Tagging: false
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                Limit
                  Number of rows: 10
                  Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    GlobalTableId: 0
                    directory: hdfs://bdpe41:8020/tmp/hive/root/92c15c56-8c37-4ec3-8547-a6912d0ad483/hive_2017-05-15_10-16-33_083_7741809362938040844-1/-mr-10001/.hive-staging_hive_2017-05-15_10-16-33_083_7741809362938040844-1/-ext-10002
                    NumFilesPerFileSink: 1
                    Statistics: Num rows: 10 Data size: 2000 Basic stats: COMPLETE Column stats: NONE
                    Stats Publishing Key Prefix: hdfs://bdpe41:8020/tmp/hive/root/92c15c56-8c37-4ec3-8547-a6912d0ad483/hive_2017-05-15_10-16-33_083_7741809362938040844-1/-mr-10001/.hive-staging_hive_2017-05-15_10-16-33_083_7741809362938040844-1/-ext-10002/
                    table:
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        properties:
                          columns _col0,_col1
                          columns.types string:string
                          escape.delim \
                          hive.serialization.extend.additional.nesting.levels true
                          serialization.escape.crlf true
                          serialization.format 1
                          serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    TotalFiles: 1
                    GatherStats: false
                    MultiFileSpray: false

  Stage: Stage-0
    Fetch Operator
      limit: 10
      Processor Tree:
        ListSink
{code}
We can see that the parallelism of the Sort is 1 even when parallel order by is enabled ({{set hive.optimize.sampling.orderby=true}}). This is because we use [rdd.sortByKey|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java#L51] to implement the sort. If the parallelism of the sort were not 1, order by + limit would be executed in different tasks and the result might not be correct (e.g. if we divided the input into 10 tasks, took the top 1 of each task, and collected the results, those rows would not necessarily be the top 10 of the whole input). The sketch below illustrates this.
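
To make the correctness issue concrete, here is a minimal, self-contained Spark 2.x demo in Java (hypothetical class and data names, not Hive code): with 3 sorted partitions, applying the limit inside each task and then collecting returns 6 rows instead of the global top 2, which is why a correct parallel order by + limit needs an extra merge stage.
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

// Hypothetical standalone demo, not Hive code: shows why a per-task limit
// on a parallel sort does not yield the global top-N.
public class ParallelSortLimitDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("demo").setMaster("local[2]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      List<Tuple2<Integer, String>> rows = Arrays.asList(
          new Tuple2<>(5, "e"), new Tuple2<>(1, "a"), new Tuple2<>(3, "c"),
          new Tuple2<>(4, "d"), new Tuple2<>(2, "b"), new Tuple2<>(6, "f"));

      // Parallel sort: range-partitions the keys into 3 sorted partitions,
      // analogous to the rdd.sortByKey call in SortByShuffler.
      JavaPairRDD<Integer, String> sorted =
          sc.parallelizePairs(rows).sortByKey(true, 3);

      // Per-task limit: each reduce task applies LIMIT 2 on its own slice
      // and the driver collects the union. 3 partitions x 2 rows = 6 rows,
      // NOT the global top 2.
      List<Tuple2<Integer, String>> perTask = sorted.mapPartitions(it -> {
        List<Tuple2<Integer, String>> head = new ArrayList<>();
        while (it.hasNext() && head.size() < 2) {
          head.add(it.next());
        }
        return head.iterator();
      }).collect();
      System.out.println("per-task limit: " + perTask);

      // Correct answer: a single sort task (what Hive does today), or an
      // extra stage that merges the per-task heads before the final LIMIT.
      System.out.println("global limit:   " + sorted.take(2));
    }
  }
}
{code}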
> Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel
> order by in multi_insert cases
> --------------------------------------------------------------------------------------------------------
>
> Key: HIVE-16600
> URL: https://issues.apache.org/jira/browse/HIVE-16600
> Project: Hive
> Issue Type: Sub-task
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Attachments: HIVE-16600.1.patch, HIVE-16600.2.patch,
> HIVE-16600.3.patch, HIVE-16600.4.patch, mr.explain, mr.explain.log.HIVE-16600
>
>
> multi_insert_gby.case.q
> {code}
> set hive.exec.reducers.bytes.per.reducer=256;
> set hive.optimize.sampling.orderby=true;
> drop table if exists e1;
> drop table if exists e2;
> create table e1 (key string, value string);
> create table e2 (key string);
> FROM (select key, cast(key as double) as keyD, value from src order by key) a
> INSERT OVERWRITE TABLE e1
> SELECT key, value
> INSERT OVERWRITE TABLE e2
> SELECT key;
> select * from e1;
> select * from e2;
> {code}
> The parallelism of the Sort is 1 even when parallel order by is enabled
> ("hive.optimize.sampling.orderby" is set to "true"). This is not
> reasonable because the parallelism should be calculated by
> [Utilities.estimateReducers|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L170].
> This happens because SetSparkReducerParallelism#needSetParallelism returns false
> when the [children size of the
> RS|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207]
> is greater than 1. In this case, {{RS[2]}} has two children.
> The logical plan of the case is:
> {code}
> TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5]
>                   -SEL[6]-FS[7]
> {code}
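
For reference, a simplified sketch of the check the quoted description points at (hypothetical and condensed, not the actual Hive source; the real SetSparkReducerParallelism#needSetParallelism examines more state):
{code}
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

// Hypothetical simplification: a multi-insert plan gives the RS more than
// one child (RS[2] feeds both SEL[3] and SEL[6]), so the check returns
// false and the sort keeps parallelism 1 even with
// hive.optimize.sampling.orderby=true.
public class NeedSetParallelismSketch {
  static boolean needSetParallelism(ReduceSinkOperator reduceSink) {
    List<Operator<? extends OperatorDesc>> children =
        reduceSink.getChildOperators();
    if (children != null && children.size() > 1) {
      return false; // bail out: parallel order by is not applied
    }
    return true;
  }
}
{code}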