[
https://issues.apache.org/jira/browse/HIVE-27788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17776189#comment-17776189
]
Krisztian Kasa commented on HIVE-27788:
---------------------------------------
Another repro with 3 records and an inner join:
{code}
set hive.optimize.semijoin.conversion = false;
CREATE TABLE tbl1_n5(key int, value string) CLUSTERED BY (key) SORTED BY (key)
INTO 2 BUCKETS;
insert into tbl1_n5(key, value)
values
(0, 'val_0'),
(2, 'val_2'),
(9, 'val_9');
explain
SELECT t1.key from
(SELECT key , row_number() over(partition by key order by value desc) as rk
from tbl1_n5) t1
join
( SELECT key,count(distinct value) as cp_count from tbl1_n5 group by key) t2
on t1.key = t2.key where rk = 1;
{code}
{code}
POSTHOOK: query: explain
SELECT t1.key from
(SELECT key , row_number() over(partition by key order by value desc) as rk
from tbl1_n5) t1
join
( SELECT key,count(distinct value) as cp_count from tbl1_n5 group by key) t2
on t1.key = t2.key where rk = 1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl1_n5
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: tbl1_n5
filterExpr: key is not null (type: boolean)
Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE
Column stats: COMPLETE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 3 Data size: 279 Basic stats:
COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: key (type: int), value (type: string)
null sort order: aa
sort order: +-
Map-reduce partition columns: key (type: int)
Statistics: Num rows: 3 Data size: 279 Basic stats:
COMPLETE Column stats: COMPLETE
Execution mode: vectorized, llap
LLAP IO: all inputs
Map 3
Map Operator Tree:
TableScan
alias: tbl1_n5
filterExpr: key is not null (type: boolean)
Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE
Column stats: COMPLETE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 3 Data size: 279 Basic stats:
COMPLETE Column stats: COMPLETE
Group By Operator
keys: key (type: int), value (type: string)
minReductionHashAggr: 0.4
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 279 Basic stats:
COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: int), _col1 (type: string)
null sort order: zz
sort order: ++
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 3 Data size: 279 Basic stats:
COMPLETE Column stats: COMPLETE
Execution mode: vectorized, llap
LLAP IO: all inputs
Reducer 2
Reduce Operator Tree:
Group By Operator
keys: KEY._col0 (type: int), KEY._col1 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE
Column stats: COMPLETE
Select Operator
expressions: _col0 (type: int)
outputColumnNames: _col0
Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE
Column stats: COMPLETE
Group By Operator
keys: _col0 (type: int)
mode: complete
outputColumnNames: _col0
Statistics: Num rows: 3 Data size: 12 Basic stats: COMPLETE
Column stats: COMPLETE
Dummy Store
Execution mode: llap
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1
(type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE
Column stats: COMPLETE
PTF Operator
Function definitions:
Input definition
input alias: ptf_0
output shape: _col0: int, _col1: string
type: WINDOWING
Windowing table definition
input alias: ptf_1
name: windowingtablefunction
order by: _col1 DESC NULLS FIRST
partition by: _col0
raw input shape:
window functions:
window function definition
alias: row_number_window_0
name: row_number
window function: GenericUDAFRowNumberEvaluator
window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
isPivotResult: true
Statistics: Num rows: 3 Data size: 279 Basic stats: COMPLETE
Column stats: COMPLETE
Filter Operator
predicate: (row_number_window_0 = 1) (type: boolean)
Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE
Column stats: COMPLETE
Select Operator
expressions: _col0 (type: int)
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 4 Basic stats:
COMPLETE Column stats: COMPLETE
Merge Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: int)
1 _col0 (type: int)
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 4 Basic stats:
COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 4 Basic stats:
COMPLETE Column stats: COMPLETE
table:
input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde:
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
{code}
The plan has a {{Merge Join Operator}} in {{Reducer 2}}. {{Reducer 2}} has two
operator trees: one for each join branch.
The first tree
{code}
GBY-SEL-GBY-DS
{code}
belongs to the subquery
{code}
SELECT key,count(distinct value) as cp_count from tbl1_n5 group by key
{code}
The aggregate call
{code}
count(distinct value)
{code}
needs two Group By operators:
* the first one keeps only the distinct values of the {{value}} column,
* the second one aggregates by the {{key}} column.
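This two-phase evaluation can be sketched outside Hive as a plain simulation (illustrative only, not Hive code; a duplicate row is added just to make the dedup step visible):

```python
from collections import Counter

# Simulation (not Hive code) of evaluating count(distinct value) group by key
# with two Group By operators.
rows = [(0, 'val_0'), (2, 'val_2'), (9, 'val_9'), (9, 'val_9')]

# 1st GBY: group by (key, value) -- keeps only distinct (key, value) pairs.
distinct_pairs = sorted(set(rows))

# 2nd GBY: group by key, counting the surviving rows per key.
cp_count = Counter(key for key, _ in distinct_pairs)

print(dict(cp_count))  # {0: 1, 2: 1, 9: 1}
```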
The second tree handles the other branch of the join with the PTF operator and
performs the join itself.
Currently the Merge Join Operator doesn't support more than one Group By
operator in the same branch and in the same Reducer/Mapper instance, because
Group By operators don't forward records to their child operator immediately:
each one buffers one output row and forwards it only when an input row arrives
with a key value different from the one currently buffered.
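To illustrate, a deliberately simplified sketch of this buffering behavior (not the actual {{GroupByOperator}} code): the buffered row is forwarded only when the incoming key changes, so the output always lags one group behind the input:

```python
# Simplified sketch (not Hive code) of a streaming Group By operator's
# one-row buffering: forward on key change, drain on flush.
class BufferingGroupBy:
    def __init__(self, child):
        self.child = child          # downstream operator (a callable)
        self.buffered_key = None    # key of the currently buffered row

    def process(self, key):
        # Forward the buffered row only when a new key arrives.
        if self.buffered_key is not None and key != self.buffered_key:
            self.child(self.buffered_key)
        self.buffered_key = key

    def flush(self):
        # Called when the source runs out of records.
        if self.buffered_key is not None:
            self.child(self.buffered_key)
            self.buffered_key = None

out = []
gby = BufferingGroupBy(out.append)
for k in [0, 2, 9]:
    gby.process(k)
# Only 0 and 2 have been forwarded; 9 is still buffered.
print(out)   # [0, 2]
gby.flush()
print(out)   # [0, 2, 9]
```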
Since we have two GBYs in this case, 2 rows are buffered. When the underlying
data source runs out of records, the buffered records in the GBYs have to be
flushed:
https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java#L268-L270
In this case both buffered records are forwarded to the Merge Join Operator,
but the operator cannot process any records from the other side of the join in
the meantime, which leads to the exception mentioned in the description.
Example key values from both sides of the join:
{code}
left 0 2 9
right 0 2 9
{code}
1. The join reads 0 from the right.
2. The join [starts reading
records|https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java#L484-L488]
from the left, but the GBY operators buffer the records with key values 0 and
2, so no records are forwarded to the join.
3. The join keeps reading from the left because the join [key is
matching|https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java#L268-L269].
When 9 is read, 0 is forwarded to the join from the left tree (the 2nd GBY
forwards its buffered record) and it is still a match.
4. The join tries to read from the left again, but the source is empty, hence
the [left operator tree is
flushed|https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java#L268-L270].
5. The 1st GBY forwards 9, which makes the 2nd GBY forward 2 to the join; the
join finds a key mismatch and [sets the flag that the left side has a
mismatch|https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java#L271-L276].
6. The 2nd GBY flushes its buffered record with key 9 (flush is a recursive
call), so the join gets 9 from the left without ever having had a chance to
read anything from the right. The exception is thrown.
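The lag caused by chaining two buffering operators can be reproduced with a simplified simulation (not Hive code): with keys 0, 2, 9 nothing reaches the join until 9 is read, and the flush then drains both buffers back-to-back:

```python
# Simplified simulation (not Hive code) of the left branch: two chained
# buffering Group By operators feeding the merge join.
class BufferingGroupBy:
    def __init__(self, child):
        self.child = child
        self.buffered_key = None

    def process(self, key):
        # Forward the buffered row only when a new key arrives.
        if self.buffered_key is not None and key != self.buffered_key:
            self.child(self.buffered_key)
        self.buffered_key = key

    def flush(self):
        # Drain the buffer when the source is exhausted.
        if self.buffered_key is not None:
            self.child(self.buffered_key)
            self.buffered_key = None

reached_join = []
gby2 = BufferingGroupBy(reached_join.append)  # 2nd GBY (group by key)
gby1 = BufferingGroupBy(gby2.process)         # 1st GBY (distinct values)

seen_at_each_step = []
for k in [0, 2, 9]:
    gby1.process(k)
    seen_at_each_step.append(list(reached_join))
# While 0 and 2 are read, nothing reaches the join; 0 arrives only with 9.
print(seen_at_each_step)   # [[], [], [0]]

gby1.flush()  # in Hive the flush recurses into the child operator
gby2.flush()
print(reached_join)        # [0, 2, 9] -- 2 and 9 arrive back-to-back
```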
IMHO an operator tree in a mapper/reducer that contains a merge join shouldn't
have a branch with more than one GBY operator.
A possible solution is to disable SMB join conversion in such cases:
https://github.com/apache/hive/blob/b6847ed38b7d32586ab22e224904867f159b510e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java#L498
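A minimal sketch of what such a guard could look like (hypothetical helper names and a toy tree representation, not the actual {{ConvertJoinMapJoin}} code): walk each branch of the candidate SMB join and count the Group By operators; more than one in a branch would veto the conversion.

```python
# Hypothetical sketch (not the actual ConvertJoinMapJoin code): count Group By
# operators per join branch and veto SMB conversion when a branch has > 1.
def count_group_bys(op):
    """Count GBY operators in the operator subtree rooted at `op`."""
    total = 1 if op["type"] == "GBY" else 0
    return total + sum(count_group_bys(c) for c in op.get("children", []))

def smb_conversion_safe(join_branches):
    return all(count_group_bys(b) <= 1 for b in join_branches)

# Left branch of the repro: GBY -> SEL -> GBY -> Dummy Store
left = {"type": "GBY", "children": [
    {"type": "SEL", "children": [
        {"type": "GBY", "children": [{"type": "DS"}]}]}]}
# Right branch: SEL -> PTF -> FIL -> SEL (no GBY)
right = {"type": "SEL", "children": [
    {"type": "PTF", "children": [
        {"type": "FIL", "children": [{"type": "SEL"}]}]}]}

print(smb_conversion_safe([left, right]))  # False -> keep the non-SMB plan
```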
> Exception in Sort Merge join with Group By + PTF Operator
> ---------------------------------------------------------
>
> Key: HIVE-27788
> URL: https://issues.apache.org/jira/browse/HIVE-27788
> Project: Hive
> Issue Type: Bug
> Components: Operators
> Affects Versions: 4.0.0-beta-1
> Reporter: Riju Trivedi
> Priority: Major
> Attachments: auto_sortmerge_join_17.q
>
>
> Sort-merge join with Group By + PTF operator leads to a Runtime exception
> {code:java}
> Caused by: java.lang.RuntimeException:
> org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while
> processing row
> at
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:313)
> at
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.run(ReduceRecordProcessor.java:291)
> at
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:293)
> ... 15 more
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime
> Error while processing row
> at
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:387)
> at
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:303)
> ... 17 more
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
> org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException:
> org.apache.hadoop.hive.ql.metadata.HiveException:
> org.apache.hadoop.hive.ql.metadata.HiveException: Attempting to overwrite
> nextKeyWritables[1]
> at
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.joinOneGroup(CommonMergeJoinOperator.java:392)
> at
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.joinOneGroup(CommonMergeJoinOperator.java:372)
> at
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.process(CommonMergeJoinOperator.java:316)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:888)
> at
> org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:94)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:888)
> at
> org.apache.hadoop.hive.ql.exec.FilterOperator.process(FilterOperator.java:127)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:888)
> at
> org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.handleOutputRows(PTFOperator.java:337)
> at
> org.apache.hadoop.hive.ql.exec.PTFOperator$PTFInvocation.processRow(PTFOperator.java:325)
> at
> org.apache.hadoop.hive.ql.exec.PTFOperator.process(PTFOperator.java:139)
> at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:888)
> at
> org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:94)
> at
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:372)
> ... 18 more
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException:
> org.apache.hadoop.hive.ql.metadata.HiveException: Attempting to overwrite
> nextKeyWritables[1]
> at
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.fetchOneRow(CommonMergeJoinOperator.java:534)
> at
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.fetchNextGroup(CommonMergeJoinOperator.java:488)
> at
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.joinOneGroup(CommonMergeJoinOperator.java:390)
> ... 31 more
> Caused by: java.lang.RuntimeException:
> org.apache.hadoop.hive.ql.metadata.HiveException:
> org.apache.hadoop.hive.ql.metadata.HiveException: Attempting to overwrite
> nextKeyWritables[1]
> at
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:313)
> at
> org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator.fetchOneRow(CommonMergeJoinOperator.java:522)
> ... 33 more {code}
> Issue can be reproduced with [^auto_sortmerge_join_17.q]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)