[
https://issues.apache.org/jira/browse/HIVE-15474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15765176#comment-15765176
]
Jesus Camacho Rodriguez edited comment on HIVE-15474 at 12/20/16 8:35 PM:
--------------------------------------------------------------------------
[~xuefuz], thanks for the feedback. It is indeed true for MR and Tez. I am not
as familiar with Spark as a backend. If it does not return ordered groups, are
you sure the sequence of operators is still RS-GB-RS? That would mean that
other optimizations, such as ReduceSinkDeDuplication, do not work properly (?).
In turn, I would argue that RS is a physical operator with certain semantics
that should be respected, regardless of the backend. However, if the sequence
of operators is indeed still RS-GB-RS, I can simply disable the optimization
for Hive on Spark.
--
Given this query:
{code:sql}
explain
select key, value, count(key + 1) as agg1 from src
group by key, value
order by key, value, agg1 limit 20;
{code}
Explain plan without patch:
{code}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: src
            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: key (type: string), value (type: string), (UDFToDouble(key) + 1.0) (type: double)
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(_col2)
                keys: _col0 (type: string), _col1 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string)
                  sort order: ++
                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col2 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string), KEY._col1 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint)
              sort order: +++
              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
              TopN Hash Memory Usage: 0.3
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: bigint)
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
          Limit
            Number of rows: 20
            Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 20
      Processor Tree:
        ListSink
{code}
Explain plan with patch:
{code}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: src
            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: key (type: string), value (type: string), (UDFToDouble(key) + 1.0) (type: double)
              outputColumnNames: _col0, _col1, _col2
              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(_col2)
                keys: _col0 (type: string), _col1 (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string)
                  sort order: ++
                  Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                  TopN Hash Memory Usage: 0.3
                  value expressions: _col2 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string), KEY._col1 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: bigint)
              sort order: +++
              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
              TopN Hash Memory Usage: 0.3
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: bigint)
          outputColumnNames: _col0, _col1, _col2
          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
          Limit
            Number of rows: 20
            Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 20 Data size: 200 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 20
      Processor Tree:
        ListSink
{code}
Observe that the only difference is the TopN Hash Memory Usage annotation on the
Reduce Output Operator of the first stage, which prevents shuffling all the data.
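To see why pushing the limit into the first RS is safe for this query: the ORDER BY keys (key, value, agg1) start with the GROUP BY keys (key, value), and agg1 is fully determined by its group, so the final top 20 rows come exactly from the 20 smallest (key, value) groups. The following standalone Python sketch (illustrative only, not Hive code; the function names are made up) checks that equivalence on toy data:
{code:python}
from collections import defaultdict

def full_pipeline(rows, limit):
    """Aggregate every group, then sort and apply the limit (no pushdown)."""
    counts = defaultdict(int)
    for key, value in rows:
        counts[(key, value)] += 1
    agg = [(k, v, c) for (k, v), c in counts.items()]
    return sorted(agg)[:limit]

def pushed_limit_pipeline(rows, limit):
    """Keep only the `limit` smallest grouping keys before aggregating,
    mimicking what the TopN hash in the first ReduceSink achieves."""
    top_keys = set(sorted({(k, v) for k, v in rows})[:limit])
    counts = defaultdict(int)
    for key, value in rows:
        if (key, value) in top_keys:
            counts[(key, value)] += 1
    agg = [(k, v, c) for (k, v), c in counts.items()]
    return sorted(agg)[:limit]

rows = [(str(i % 7), "val_%d" % (i % 5)) for i in range(100)]
assert full_pipeline(rows, 20) == pushed_limit_pipeline(rows, 20)
{code}
Because the grouping key uniquely identifies each output row and is a prefix of the sort key, dropping non-top-N grouping keys early can never change the final top-N result; it only reduces the data shuffled into the aggregation.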
> Extend limit propagation for chain of RS-GB-RS operators
> --------------------------------------------------------
>
> Key: HIVE-15474
> URL: https://issues.apache.org/jira/browse/HIVE-15474
> Project: Hive
> Issue Type: Bug
> Components: Physical Optimizer
> Affects Versions: 2.2.0
> Reporter: Jesus Camacho Rodriguez
> Assignee: Jesus Camacho Rodriguez
> Attachments: HIVE-15474.patch
>
>
> The goal is to extend the work started in HIVE-14002.
> For instance, given the following query:
> {code:sql}
> explain
> select key, value, count(key + 1) as agg1 from src
> group by key, value
> order by key, value, agg1 limit 20;
> {code}
> We can push the limit to the GBy operator. However, currently we do not do it.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)