[
https://issues.apache.org/jira/browse/HIVE-26111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Youjun Yuan updated HIVE-26111:
-------------------------------
Description:
We hit a query that FULL JOINs two tables and for which Hive produces incorrect results: for a single join-key value, it returns two records, each with a valid value from one table and NULL for the other.
The query is (tableA and tableB correspond to airflow.rds_users_delta and default.users in the plans below):
{code:java}
SET mapreduce.job.reduces=2;
SELECT d.id, u.id
FROM (
  SELECT id
  FROM airflow.tableA rud
  WHERE rud.dt = '2022-04-02-1row'
) d
FULL JOIN (
  SELECT id
  FROM default.tableB
  WHERE dt = '2022-04-01' AND device_token = 'blabla'
) u
ON u.id = d.id;
{code}
According to the job log, each of the two reducers receives one input record and outputs one record, so the query returns two records for id=350570497:
{code:java}
350570497 NULL
NULL 350570497
Time taken: 62.692 seconds, Fetched: 2 row(s) {code}
I am certain that tableB has only one row with device_token='blabla'.
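For illustration only, the symptom can be modeled in miniature under one stated assumption: the two join inputs pick reducers with different partition functions. The class ShuffledFullJoinSketch and the hash lambdas are hypothetical and are not Hive's ReduceSinkOperator logic. The sketch does not explain why the rows might be routed apart; it only shows that a key split across two reducers yields exactly one (value, NULL) row and one (NULL, value) row, while a single reducer cannot split it.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical model of a shuffled FULL OUTER JOIN on an int key (not Hive code).
// Each input picks a reducer with its own partition function; the join is only
// correct if both inputs send equal keys to the same reducer.
class ShuffledFullJoinSketch {

    // Returns output rows as {leftKey, rightKey}; null marks the NULL-extended side.
    static List<Integer[]> fullJoin(List<Integer> left, List<Integer> right, int numReducers,
                                    IntUnaryOperator leftHash, IntUnaryOperator rightHash) {
        List<List<Integer>> leftParts = new ArrayList<>();
        List<List<Integer>> rightParts = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {
            leftParts.add(new ArrayList<>());
            rightParts.add(new ArrayList<>());
        }
        // Route each row: non-negative hash modulo the reducer count.
        for (int k : left) {
            leftParts.get((leftHash.applyAsInt(k) & Integer.MAX_VALUE) % numReducers).add(k);
        }
        for (int k : right) {
            rightParts.get((rightHash.applyAsInt(k) & Integer.MAX_VALUE) % numReducers).add(k);
        }
        List<Integer[]> out = new ArrayList<>();
        // Each reducer runs a full outer join over only the rows it received.
        for (int r = 0; r < numReducers; r++) {
            List<Integer> ls = leftParts.get(r);
            List<Integer> rs = rightParts.get(r);
            boolean[] rightMatched = new boolean[rs.size()];
            for (Integer l : ls) {
                boolean matched = false;
                for (int i = 0; i < rs.size(); i++) {
                    if (l.equals(rs.get(i))) {
                        out.add(new Integer[] {l, rs.get(i)});
                        rightMatched[i] = true;
                        matched = true;
                    }
                }
                if (!matched) {
                    out.add(new Integer[] {l, null});
                }
            }
            for (int i = 0; i < rs.size(); i++) {
                if (!rightMatched[i]) {
                    out.add(new Integer[] {null, rs.get(i)});
                }
            }
        }
        return out;
    }
}
{code}

With 2 reducers, routing id=350570497 through k -> k on one side and k -> k + 1 on the other sends the rows to reducers 1 and 0, which produces one (NULL, 350570497) row and one (350570497, NULL) row. With 1 reducer, both rows land together and a single matched row comes back, which is consistent with the mapreduce.job.reduces=1 result below.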
We also tried the following:
1. With SET mapreduce.job.reduces=1, the query produces the correct result.
2. -With SET hive.execution.engine=mr, the query produces the correct result.- MR also has the issue.
3. An inner JOIN (instead of FULL JOIN) works as expected.
4. In subquery u, replacing the filter device_token='blabla' with id=350570497 makes the query return the correct result.
5. Flattening the subqueries also produces the correct result, as below:
{code:java}
SELECT d.id, u.id
FROM airflow.rds_users_delta d
FULL JOIN default.users u
  ON (u.id = d.id)
WHERE d.dt = '2022-04-02-1row'
  AND u.dt = '2022-04-01'
  AND u.device_token = 'blabla'
{code}
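Note that the flattened query in item 5 is not equivalent to the original one: its WHERE clause compares columns from both d and u with literals, and a comparison with NULL is never TRUE, so every NULL-extended row is dropped and the FULL JOIN behaves like the inner JOIN of item 3. Neither variant can emit the (value, NULL)/(NULL, value) pair; a mis-routed key would show up as a missing row instead. A minimal model of that WHERE step follows, using a hypothetical NullRejectingWhereSketch class (not Hive code) whose rows carry only d.dt and u.dt, with the device_token predicate omitted for brevity.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of item 5: each FULL JOIN output row carries d.dt and u.dt,
// with null standing for the NULL-extended side (device_token omitted for brevity).
class NullRejectingWhereSketch {

    // SQL "col = 'literal'" is UNKNOWN when col is NULL, and WHERE keeps a row
    // only when the predicate is TRUE, so a NULL column means "row not kept".
    static boolean sqlEquals(String col, String literal) {
        return col != null && col.equals(literal);
    }

    // Applies: WHERE d.dt = '2022-04-02-1row' AND u.dt = '2022-04-01'
    static List<String[]> applyWhere(List<String[]> fullJoinRows) {
        List<String[]> kept = new ArrayList<>();
        for (String[] row : fullJoinRows) {
            if (sqlEquals(row[0], "2022-04-02-1row") && sqlEquals(row[1], "2022-04-01")) {
                kept.add(row);
            }
        }
        return kept;
    }
}
{code}

Fed one matched row and the two half-NULL rows, applyWhere keeps only the matched row, which is what an inner join over the same filtered inputs would return.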
Below is the EXPLAIN output of the original (failing) query:
{code:java}
Plan optimized by CBO.

Vertex dependency in root stage
Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)

Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 3
      File Output Operator [FS_10]
        Map Join Operator [MAPJOIN_13] (rows=2 width=8)
          Conds:RS_6.KEY.reducesinkkey0=RS_7.KEY.reducesinkkey0(Outer),DynamicPartitionHashJoin:true,Output:["_col0","_col1"]
        <-Map 1 [CUSTOM_SIMPLE_EDGE]
          PARTITION_ONLY_SHUFFLE [RS_6]
            PartitionCols:_col0
            Select Operator [SEL_2] (rows=1 width=4)
              Output:["_col0"]
              TableScan [TS_0] (rows=1 width=4)
                airflow@rds_users_delta,rud,Tbl:COMPLETE,Col:COMPLETE,Output:["id"]
        <-Map 2 [CUSTOM_SIMPLE_EDGE]
          PARTITION_ONLY_SHUFFLE [RS_7]
            PartitionCols:_col0
            Select Operator [SEL_5] (rows=1 width=4)
              Output:["_col0"]
              Filter Operator [FIL_12] (rows=1 width=110)
                predicate:(device_token = 'blabla')
                TableScan [TS_3] (rows=215192362 width=109)
                  default@users,users,Tbl:COMPLETE,Col:COMPLETE,Output:["id","device_token"]
{code}
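In this plan the join runs in Reducer 3 as a Map Join Operator with DynamicPartitionHashJoin:true: both inputs are shuffled on _col0, and, roughly speaking, each reducer task builds a hash table from its share of the small input (Map 1) and streams its share of the big input (Map 2, Position of Big Table: 1) past it. The following is a simplified, hypothetical model of a hash FULL OUTER join inside a single task; HashFullOuterJoinSketch is invented for illustration, and Hive's actual operator is considerably more involved.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one Reducer 3 task (not Hive's MapJoinOperator): the
// small input (Map 1, tag 0) is loaded into a hash table and the big input
// (Map 2, "Position of Big Table: 1") is streamed past it.
class HashFullOuterJoinSketch {

    // Returns rows as {smallKey, bigKey}, i.e. (d.id, u.id); null marks the NULL-extended side.
    static List<Integer[]> join(List<Integer> smallSide, List<Integer> bigSide) {
        // key -> number of small-side rows carrying that key
        Map<Integer, Integer> table = new HashMap<>();
        for (Integer k : smallSide) {
            table.merge(k, 1, Integer::sum);
        }
        Set<Integer> matchedKeys = new HashSet<>();
        List<Integer[]> out = new ArrayList<>();
        for (Integer k : bigSide) {
            Integer count = table.get(k);
            if (count == null) {
                out.add(new Integer[] {null, k});      // big-side row with no partner
            } else {
                matchedKeys.add(k);
                for (int i = 0; i < count; i++) {
                    out.add(new Integer[] {k, k});     // one row per small-side match
                }
            }
        }
        // Only after the big side is exhausted can unmatched small-side rows be emitted.
        for (Map.Entry<Integer, Integer> e : table.entrySet()) {
            if (!matchedKeys.contains(e.getKey())) {
                for (int i = 0; i < e.getValue(); i++) {
                    out.add(new Integer[] {e.getKey(), null});
                }
            }
        }
        return out;
    }
}
{code}

Given the key on both sides, join returns one matched row; given it on only one side, it returns a single half-NULL row. Within this model, the reported output requires the two rows to reach different tasks, which matches the job log's observation that each reducer received one input record.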
I have not been able to build a small enough dataset to reproduce the issue. I have reduced tableA to a single row, but tableB still has ~200M rows, and if I shrink tableB any further, the issue no longer reproduces.
Any suggestions would be highly appreciated, whether about the root cause of the issue, how to work around it, or how to reproduce it with a smaller dataset.
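When trying candidate repros (for example while shrinking tableB or varying mapreduce.job.reduces), the output can be checked mechanically: for a FULL JOIN on u.id = d.id with non-NULL keys, a key that appears both as (k, NULL) and as (NULL, k) should instead have produced a single matched row. A minimal sketch with a hypothetical SplitKeyDetector class, assuming the query selects the join key from both sides (as SELECT d.id, u.id does) and the exported rows have already been parsed into pairs:

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for checking FULL JOIN output on an equi-join key:
// a key seen both as (k, NULL) and as (NULL, k) should have been one (k, k) row.
class SplitKeyDetector {

    // rows are {d.id, u.id}; null marks the NULL-extended side.
    static Set<Integer> splitKeys(List<Integer[]> rows) {
        Set<Integer> leftOnly = new HashSet<>();
        Set<Integer> rightOnly = new HashSet<>();
        for (Integer[] row : rows) {
            if (row[0] != null && row[1] == null) {
                leftOnly.add(row[0]);
            }
            if (row[0] == null && row[1] != null) {
                rightOnly.add(row[1]);
            }
        }
        // Keys present in both half-NULL sets are the wrongly split ones.
        Set<Integer> split = new TreeSet<>(leftOnly);
        split.retainAll(rightOnly);
        return split;
    }
}
{code}

Applied to the two rows in the output above, splitKeys returns {350570497}; applied to a correct result, it returns an empty set.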
Below is the corresponding log from hive.log:
{code:java}
220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17 : STAGE DEPENDENCIES:
  Stage-1 is a root stage [MAPRED]
  Stage-0 depends on stages: Stage-1 [FETCH]

STAGE PLANS:
  Stage: Stage-1
    Tez
      DagId: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
      Edges:
        Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)
      DagName: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: rud
                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
                  GatherStats: false
                  Select Operator
                    expressions: id (type: int)
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
                    Reduce Output Operator
                      key expressions: _col0 (type: int)
                      null sort order: a
                      sort order: +
                      Map-reduce partition columns: _col0 (type: int)
                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
                      tag: 0
                      auto parallelism: true
            Path -> Alias:
              s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
            Path -> Partition:
              s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00
                Partition
                  base file name: hh=00
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  partition values:
                    dt 2022-04-02-1row
                    hh 00
                  properties:
                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"code":"true","creation_timestamp":"true","device_token":"true","general_configuration":"true","id":"true","last_delivery_timestamp":"true","last_survey_timestamp":"true","profile":"true","push_token":"true","server_configuration":"true","update_timestamp":"true"}}
                    bucket_count -1
                    column.name.delimiter ,
                    columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
                    columns.comments
                    columns.types int:string:string:string:int:int:string:string:string:int:int
                    field.delim
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    line.delim
                    location s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00
                    name airflow.rds_users_delta
                    numFiles 1
                    numRows 1
                    partition_columns dt/hh
                    partition_columns.types string:string
                    rawDataSize 2507
                    serialization.ddl struct rds_users_delta { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
                    serialization.format
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 888
                    transient_lastDdlTime 1649039842
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      EXTERNAL TRUE
                      bucket_count -1
                      bucketing_version 2
                      column.name.delimiter ,
                      columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
                      columns.comments
                      columns.types int:string:string:string:int:int:string:string:string:int:int
                      field.delim
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      line.delim
                      location s3a://.../tmp/rds_users_delta
                      name airflow.rds_users_delta
                      partition_columns dt/hh
                      partition_columns.types string:string
                      serialization.ddl struct rds_users_delta { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
                      serialization.format
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      transient_lastDdlTime 1648974877
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: airflow.rds_users_delta
                  name: airflow.rds_users_delta
            Truncated Path -> Alias:
              s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
        Map 2
            Map Operator Tree:
                TableScan
                  alias: users
                  Statistics: Num rows: 215192362 Data size: 23671159812 Basic stats: COMPLETE Column stats: COMPLETE
                  GatherStats: false
                  Filter Operator
                    isSamplingPred: false
                    predicate: (device_token = 'blabla') (type: boolean)
                    Statistics: Num rows: 1 Data size: 110 Basic stats: COMPLETE Column stats: COMPLETE
                    Select Operator
                      expressions: id (type: int)
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
                      Reduce Output Operator
                        key expressions: _col0 (type: int)
                        null sort order: a
                        sort order: +
                        Map-reduce partition columns: _col0 (type: int)
                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
                        tag: 1
                        auto parallelism: true
            Path -> Alias:
              s3://.../default/users/dt=2022-04-01 [users]
            Path -> Partition:
              s3://.../default/users/dt=2022-04-01
                Partition
                  base file name: dt=2022-04-01
                  input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                  partition values:
                    dt 2022-04-01
                  properties:
                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                    bucket_count -1
                    column.name.delimiter ,
                    columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
                    columns.comments
                    columns.types int:string:string:string:int:int:string:string:string:int:int:string:string
                    file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                    location s3://.../default/users/dt=2022-04-01
                    name default.users
                    numFiles 256
                    numRows 215192362
                    partition_columns dt
                    partition_columns.types string
                    rawDataSize 389210158991
                    serialization.ddl struct users { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string virtual_id, string account_id}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
                    totalSize 50190916884
                    transient_lastDdlTime 1648905779
                  serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
                    input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                    properties:
                      EXTERNAL TRUE
                      bucket_count -1
                      column.name.delimiter ,
                      columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
                      columns.comments
                      columns.types int:string:string:string:int:int:string:string:string:int:int:string:string
                      file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                      last_modified_by hadoop
                      last_modified_time 1629880363
                      location s3://.../default/users
                      name default.users
                      partition_columns dt
                      partition_columns.types string
                      serialization.ddl struct users { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string virtual_id, string account_id}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
                      transient_lastDdlTime 1629880363
                    serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
                    name: default.users
                  name: default.users
            Truncated Path -> Alias:
              /users/dt=2022-04-01 [users]
        Reducer 3
            Needs Tagging: false
            Reduce Operator Tree:
              Map Join Operator
                condition map:
                     Full Outer Join 0 to 1
                Estimated key counts: Map 1 => 1
                keys:
                  0 KEY.reducesinkkey0 (type: int)
                  1 KEY.reducesinkkey0 (type: int)
                outputColumnNames: _col0, _col1
                input vertices:
                  0 Map 1
                Position of Big Table: 1
                Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
                DynamicPartitionHashJoin: true
                File Output Operator
                  compressed: true
                  GlobalTableId: 0
                  directory: hdfs://.../-ext-10002
                  NumFilesPerFileSink: 1
                  Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
                  Stats Publishing Key Prefix: hdfs://.../-ext-10002/
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      properties:
                        columns _col0,_col1
                        columns.types int:int
                        escape.delim \
                        hive.serialization.extend.additional.nesting.levels true
                        serialization.escape.crlf true
                        serialization.format 1
                        serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  TotalFiles: 1
                  GatherStats: false
                  MultiFileSpray: false
{code}
> FULL JOIN returns incorrect result
> ----------------------------------
>
> Key: HIVE-26111
> URL: https://issues.apache.org/jira/browse/HIVE-26111
> Project: Hive
> Issue Type: Bug
> Environment: aws EMR (hive 3.1.2 + Tez 0.10.1)
> Reporter: Youjun Yuan
> Priority: Blocker
>
> Below is the log found in hive.log:
> {code:java}
> 220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17 : STAGE DEPENDENCIES:
> Stage-1 is a root stage [MAPRED]
> Stage-0 depends on stages: Stage-1 [FETCH]STAGE PLANS:
> Stage: Stage-1
> Tez
> DagId: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
> Edges:
> Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)
> DagName: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
> Vertices:
> Map 1
> Map Operator Tree:
> TableScan
> alias: rud
> Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE
> Column stats: COMPLETE
> GatherStats: false
> Select Operator
> expressions: id (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 4 Basic stats:
> COMPLETE Column stats: COMPLETE
> Reduce Output Operator
> key expressions: _col0 (type: int)
> null sort order: a
> sort order: +
> Map-reduce partition columns: _col0 (type: int)
> Statistics: Num rows: 1 Data size: 4 Basic stats:
> COMPLETE Column stats: COMPLETE
> tag: 0
> auto parallelism: true
> Path -> Alias:
> s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
> Path -> Partition:
> s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00
> Partition
> base file name: hh=00
> input format: org.apache.hadoop.mapred.TextInputFormat
> output format:
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> partition values:
> dt 2022-04-02-1row
> hh 00
> properties:
> COLUMN_STATS_ACCURATE
> {"BASIC_STATS":"true","COLUMN_STATS":{"code":"true","creation_timestamp":"true","device_token":"true","general_configuration":"true","id":"true","last_delivery_timestamp":"true","last_survey_timestamp":"true","profile":"true","push_token":"true","server_configuration":"true","update_timestamp":"true"}}
> bucket_count -1
> column.name.delimiter ,
> columns
> id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
> columns.comments
> columns.types
> int:string:string:string:int:int:string:string:string:int:int
> field.delim
> file.inputformat org.apache.hadoop.mapred.TextInputFormat
> file.outputformat
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> line.delim location
> s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00
> name airflow.rds_users_delta
> numFiles 1
> numRows 1
> partition_columns dt/hh
> partition_columns.types string:string
> rawDataSize 2507
> serialization.ddl struct rds_users_delta { i32 id, string
> device_token, string code, string push_token, i32 creation_timestamp, i32
> update_timestamp, string general_configuration, string server_configuration,
> string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
> serialization.format
> serialization.lib
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> totalSize 888
> transient_lastDdlTime 1649039842
> serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> input format: org.apache.hadoop.mapred.TextInputFormat
> output format:
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> properties:
> EXTERNAL TRUE
> bucket_count -1
> bucketing_version 2
> column.name.delimiter ,
> columns
> id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
> columns.comments
> columns.types
> int:string:string:string:int:int:string:string:string:int:int
> field.delim
> file.inputformat
> org.apache.hadoop.mapred.TextInputFormat
> file.outputformat
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
> line.delim location
> s3a://.../tmp/rds_users_delta
> name airflow.rds_users_delta
> partition_columns dt/hh
> partition_columns.types string:string
> serialization.ddl struct rds_users_delta { i32 id,
> string device_token, string code, string push_token, i32 creation_timestamp,
> i32 update_timestamp, string general_configuration, string
> server_configuration, string profile, i32 last_delivery_timestamp, i32
> last_survey_timestamp}
> serialization.format
> serialization.lib
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> transient_lastDdlTime 1648974877
> serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> name: airflow.rds_users_delta
> name: airflow.rds_users_delta
> Truncated Path -> Alias:
> s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
> Map 2
> Map Operator Tree:
> TableScan
> alias: users
> Statistics: Num rows: 215192362 Data size: 23671159812
> Basic stats: COMPLETE Column stats: COMPLETE
> GatherStats: false
> Filter Operator
> isSamplingPred: false
> predicate: (device_token = 'blabla') (type: boolean)
> Statistics: Num rows: 1 Data size: 110 Basic stats:
> COMPLETE Column stats: COMPLETE
> Select Operator
> expressions: id (type: int)
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 4 Basic stats:
> COMPLETE Column stats: COMPLETE
> Reduce Output Operator
> key expressions: _col0 (type: int)
> null sort order: a
> sort order: +
> Map-reduce partition columns: _col0 (type: int)
> Statistics: Num rows: 1 Data size: 4 Basic stats:
> COMPLETE Column stats: COMPLETE
> tag: 1
> auto parallelism: true
> Path -> Alias:
> s3://.../default/users/dt=2022-04-01 [users]
> Path -> Partition:
> s3://.../default/users/dt=2022-04-01
> Partition
> base file name: dt=2022-04-01
> input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
> output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
> partition values:
> dt 2022-04-01
> properties:
> COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
> bucket_count -1
> column.name.delimiter ,
> columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
> columns.comments
> columns.types int:string:string:string:int:int:string:string:string:int:int:string:string
> file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
> file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
> location s3://.../default/users/dt=2022-04-01
> name default.users
> numFiles 256
> numRows 215192362
> partition_columns dt
> partition_columns.types string
> rawDataSize 389210158991
> serialization.ddl struct users { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string virtual_id, string account_id}
> serialization.format 1
> serialization.lib org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
> totalSize 50190916884
> transient_lastDdlTime 1648905779
> serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
> input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
> output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
> properties:
> EXTERNAL TRUE
> bucket_count -1
> column.name.delimiter ,
> columns id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
> columns.comments
> columns.types int:string:string:string:int:int:string:string:string:int:int:string:string
> file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
> file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
> last_modified_by hadoop
> last_modified_time 1629880363
> location s3://.../default/users
> name default.users
> partition_columns dt
> partition_columns.types string
> serialization.ddl struct users { i32 id, string device_token, string code, string push_token, i32 creation_timestamp, i32 update_timestamp, string general_configuration, string server_configuration, string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string virtual_id, string account_id}
> serialization.format 1
> serialization.lib org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
> transient_lastDdlTime 1629880363
> serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
> name: default.users
> name: default.users
> Truncated Path -> Alias:
> /users/dt=2022-04-01 [users]
> Reducer 3
> Needs Tagging: false
> Reduce Operator Tree:
> Map Join Operator
> condition map:
> Full Outer Join 0 to 1
> Estimated key counts: Map 1 => 1
> keys:
> 0 KEY.reducesinkkey0 (type: int)
> 1 KEY.reducesinkkey0 (type: int)
> outputColumnNames: _col0, _col1
> input vertices:
> 0 Map 1
> Position of Big Table: 1
> Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
> DynamicPartitionHashJoin: true
> File Output Operator
> compressed: true
> GlobalTableId: 0
> directory: hdfs://.../-ext-10002
> NumFilesPerFileSink: 1
> Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
> Stats Publishing Key Prefix: hdfs://.../-ext-10002/
> table:
> input format: org.apache.hadoop.mapred.SequenceFileInputFormat
> output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
> properties:
> columns _col0,_col1
> columns.types int:int
> escape.delim \
> hive.serialization.extend.additional.nesting.levels true
> serialization.escape.crlf true
> serialization.format 1
> serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
> TotalFiles: 1
> GatherStats: false
> MultiFileSpray: false {code}
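The plan above runs the FULL JOIN as a dynamically partitioned hash join in Reducer 3 (`DynamicPartitionHashJoin: true`), with the map side shuffling on `_col0`. The reported symptom (two reducers, each emitting one half-NULL row for id=350570497, and a correct result with `mapreduce.job.reduces=1`) has the same shape as a shuffle-based full outer join whose two inputs disagree on which reducer a key belongs to. The Python sketch below is a toy model of that failure mode only, assuming hypothetical per-side routing functions (`modulo_part`, `shifted_part`); it is not Hive's implementation and not a confirmed root cause. Each simulated reducer joins only the rows it received, so a key split across reducers surfaces as one unmatched row from each side.

```python
from collections import defaultdict

# Toy model of a reduce-side FULL OUTER JOIN (hypothetical, not Hive's implementation).


def route(keys, num_reducers, part_fn):
    """Group join keys by the reducer index that part_fn assigns them to."""
    buckets = defaultdict(list)
    for key in keys:
        buckets[part_fn(key, num_reducers)].append(key)
    return buckets


def local_full_outer_join(left, right):
    """Full outer join of the keys one reducer received; None stands in for SQL NULL."""
    out = []
    for lk in left:
        matches = [rk for rk in right if rk == lk]
        if matches:
            out.extend((lk, rk) for rk in matches)
        else:
            out.append((lk, None))
    left_set = set(left)
    out.extend((None, rk) for rk in right if rk not in left_set)
    return out


def shuffle_full_outer_join(left, right, num_reducers, left_part, right_part):
    """Route each side independently, then concatenate every reducer's local output."""
    left_buckets = route(left, num_reducers, left_part)
    right_buckets = route(right, num_reducers, right_part)
    rows = []
    for reducer in range(num_reducers):
        rows.extend(local_full_outer_join(left_buckets.get(reducer, []),
                                          right_buckets.get(reducer, [])))
    return rows


def modulo_part(key, n):
    # Consistent routing: the same key always maps to the same reducer.
    return key % n


def shifted_part(key, n):
    # Hypothetical disagreeing routing, used only to illustrate the symptom.
    return (key + 1) % n


if __name__ == "__main__":
    key = 350570497
    # Both sides agree: one matched row.
    print(shuffle_full_outer_join([key], [key], 2, modulo_part, modulo_part))
    # -> [(350570497, 350570497)]
    # Sides disagree with 2 reducers: two half-NULL rows, as in the report.
    print(shuffle_full_outer_join([key], [key], 2, modulo_part, shifted_part))
    # -> [(None, 350570497), (350570497, None)]
    # A single reducer hides the disagreement.
    print(shuffle_full_outer_join([key], [key], 1, modulo_part, shifted_part))
    # -> [(350570497, 350570497)]
```

In this model only the single-reducer run masks the routing disagreement, which is consistent with, but does not prove, the report's `mapreduce.job.reduces=1` observation.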