[
https://issues.apache.org/jira/browse/HIVE-22666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Krisztian Kasa updated HIVE-22666:
----------------------------------
Description:
{code}
EXPLAIN EXTENDED
SELECT s_state, ranking
FROM (
SELECT s_state AS s_state,
rank() OVER (PARTITION BY s_state ORDER BY ss_net_profit) AS ranking
FROM testtable_n1000) tmp1
WHERE ranking <= 3;
{code}
{code}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Tez
#### A masked pattern was here ####
      Edges:
        Reducer 2 <- Map 1 (SIMPLE_EDGE)
#### A masked pattern was here ####
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: testtable_n1000
                  Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
                  GatherStats: false
                  Reduce Output Operator
                    key expressions: s_state (type: string), ss_net_profit (type: double)
                    null sort order: az
                    sort order: ++
                    Map-reduce partition columns: s_state (type: string)
                    Statistics: Num rows: 10 Data size: 940 Basic stats: COMPLETE Column stats: COMPLETE
                    tag: -1
                    TopN: 4
                    TopN Hash Memory Usage: 0.1
                    auto parallelism: true
            Execution mode: vectorized, llap
            LLAP IO: no inputs
            Path -> Alias:
#### A masked pattern was here ####
            Path -> Partition:
#### A masked pattern was here ####
                Partition
                  base file name: testtable_n1000
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
                    bucket_count -1
                    bucketing_version 2
                    column.name.delimiter ,
                    columns s_state,ss_net_profit
                    columns.comments
                    columns.types string:double
#### A masked pattern was here ####
                    name default.testtable_n1000
                    numFiles 1
                    numRows 10
                    rawDataSize 80
                    serialization.ddl struct testtable_n1000 { string s_state, double ss_net_profit}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 90
#### A masked pattern was here ####
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"s_state":"true","ss_net_profit":"true"}}
                      bucket_count -1
                      bucketing_version 2
                      column.name.delimiter ,
                      columns s_state,ss_net_profit
                      columns.comments
                      columns.types string:double
#### A masked pattern was here ####
                      name default.testtable_n1000
                      numFiles 1
                      numRows 10
                      rawDataSize 80
                      serialization.ddl struct testtable_n1000 { string s_state, double ss_net_profit}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      totalSize 90
#### A masked pattern was here ####
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: default.testtable_n1000
                  name: default.testtable_n1000
            Truncated Path -> Alias:
              /testtable_n1000 [testtable_n1000]
        Reducer 2
            Execution mode: vectorized, llap
            Needs Tagging: false
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: double)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 10 Data size: 3620 Basic stats: COMPLETE Column stats: COMPLETE
                PTF Operator
                  Function definitions:
                      Input definition
                        input alias: ptf_0
                        output shape: _col0: string, _col1: double
                        type: WINDOWING
                      Windowing table definition
                        input alias: ptf_1
                        name: windowingtablefunction
                        order by: _col1 ASC NULLS LAST
                        partition by: _col0
                        raw input shape:
                        window functions:
                            window function definition
                              alias: rank_window_0
                              arguments: _col1
                              name: rank
                              window function: GenericUDAFRankEvaluator
                              window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
                              isPivotResult: true
                  Statistics: Num rows: 10 Data size: 3620 Basic stats: COMPLETE Column stats: COMPLETE
                  Filter Operator
                    isSamplingPred: false
                    predicate: (rank_window_0 <= 3) (type: boolean)
                    Statistics: Num rows: 3 Data size: 1086 Basic stats: COMPLETE Column stats: COMPLETE
                    Select Operator
                      expressions: _col0 (type: string), rank_window_0 (type: int)
                      outputColumnNames: _col0, _col1
                      Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE
                      File Output Operator
                        compressed: false
                        GlobalTableId: 0
#### A masked pattern was here ####
                        NumFilesPerFileSink: 1
                        Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE
#### A masked pattern was here ####
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            properties:
                              columns _col0,_col1
                              columns.types string:int
                              escape.delim \
                              hive.serialization.extend.additional.nesting.levels true
                              serialization.escape.crlf true
                              serialization.format 1
                              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                        TotalFiles: 1
                        GatherStats: false
                        MultiFileSpray: false

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
{code}
In this case the topN value (3+1) is pushed down to the ReduceSink operator (the Reduce Output Operator in Map 1):
https://github.com/apache/hive/blob/520aa19b20381bfd2ed25c835443c013f6e6ebb9/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java#L257
The ReduceSink operator then uses PTFTopNHash to keep only the topN rows for each value of the partition key (s_state).
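For illustration, here is a minimal Java sketch of the per-partition top-n filtering described above. All names are hypothetical and this is not Hive's actual PTFTopNHash code; it simply keeps one bounded max-heap per partition key and reports whether a row can still be among the n smallest sort values of its partition (tie handling, as needed for rank(), is ignored for simplicity):
{code:java}
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch of a partitioned top-n filter; not Hive's PTFTopNHash.
public class PartitionedTopNFilter {
  private final int n;
  // One bounded max-heap per partition key; the heap root is the largest
  // of the (at most) n smallest sort values seen so far for that key.
  private final Map<String, PriorityQueue<Double>> heaps = new HashMap<>();

  public PartitionedTopNFilter(int n) {
    this.n = n;
  }

  // Returns true if a row with this (partitionKey, sortValue) can still be
  // among the n smallest sort values (ORDER BY ... ASC) of its partition;
  // rows returning false could be dropped before the shuffle.
  public boolean offer(String partitionKey, double sortValue) {
    PriorityQueue<Double> heap = heaps.computeIfAbsent(
        partitionKey, k -> new PriorityQueue<>(Comparator.reverseOrder()));
    if (heap.size() < n) {
      heap.add(sortValue);
      return true;                 // fewer than n rows kept so far
    }
    if (sortValue < heap.peek()) {
      heap.poll();                 // evict the current n-th smallest value
      heap.add(sortValue);
      return true;
    }
    return false;                  // cannot be in the top n of its partition
  }

  public static void main(String[] args) {
    PartitionedTopNFilter filter = new PartitionedTopNFilter(3);
    System.out.println(filter.offer("CA", 10.0)); // true
    System.out.println(filter.offer("CA", 5.0));  // true
    System.out.println(filter.offer("CA", 7.0));  // true
    System.out.println(filter.offer("CA", 20.0)); // false: CA already has 3 smaller values
    System.out.println(filter.offer("NY", 20.0)); // true: new partition
  }
}
{code}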
The goals of this jira are:
- implement partitioning support in TopNKeyOperator
- enable pushdown of the partitioned TopNKeyOperator (see the illustrative plan fragment below)
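With both goals in place, Map 1 would be expected to gain a Top N Key operator ahead of the Reduce Output Operator, along the lines of the following fragment. This is illustrative only, not actual EXPLAIN output for this query, and the exact field names (in particular "partition key expressions") are assumptions:
{code}
Top N Key Operator
  sort order: +
  keys: ss_net_profit (type: double)
  null sort order: z
  top n: 4
  partition key expressions: s_state (type: string)
{code}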
> Introduce TopNKey operator for PTF Reduce Sink
> ----------------------------------------------
>
> Key: HIVE-22666
> URL: https://issues.apache.org/jira/browse/HIVE-22666
> Project: Hive
> Issue Type: Improvement
> Reporter: Krisztian Kasa
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)