[ https://issues.apache.org/jira/browse/HIVE-26111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Youjun Yuan updated HIVE-26111:
-------------------------------
    Description: 
We hit a query that FULL JOINs two tables and Hive produces incorrect results: 
for a single value of the join key, it produces two records, each having a 
valid value from one table and NULL for the other.

The query is:
{code:java}
SET mapreduce.job.reduces=2;
SELECT d.id, u.id
FROM (
       SELECT id
       FROM   airflow.tableA rud
       WHERE  rud.dt = '2022-04-02-1row'
) d
FULL JOIN (
       SELECT id
       FROM   default.tableB
       WHERE  dt = '2022-04-01' and device_token='blabla'
 ) u
ON u.id = d.id
; {code}
According to the job log, the two reducers each get one input record and each 
output one record, producing two records for id=350570497:
{code:java}
350570497    NULL
NULL    350570497
Time taken: 62.692 seconds, Fetched: 2 row(s) {code}
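For reference, a full outer join of these two single-row inputs should produce a single matched row, not two NULL-padded ones. A minimal sketch of the expected semantics (plain Python over hypothetical id sets, not Hive code):

```python
# Expected FULL OUTER JOIN semantics for the two single-row inputs above
# (illustrative only; the id sets are stand-ins for the subquery outputs).
left = {350570497}   # d: ids from the tableA partition
right = {350570497}  # u: ids from tableB after the device_token filter

rows = []
for k in sorted(left | right):
    d = k if k in left else None   # NULL-pad when key missing on the left
    u = k if k in right else None  # NULL-pad when key missing on the right
    rows.append((d, u))

print(rows)  # expected: [(350570497, 350570497)] - one matched row
```

The buggy output instead behaves as if each side saw the key without its counterpart.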
I am sure tableB has only one row where device_token='blabla'.

And we tried:

1. SET mapreduce.job.reduces=1; then it produces the correct result.

-2. SET hive.execution.engine=mr; then it produces the correct result;- mr also 
has the issue.

3. JOIN (instead of FULL JOIN) worked as expected.

4. In sub query u, changing the filter device_token='blabla' to id=350570497 
worked ok.

5. Flattening the sub queries works ok, like below:
{code:java}
SELECT  d.id, u.id 
from airflow.rds_users_delta d full join default.users u
on (u.id = d.id)
where d.dt = '2022-04-02-1row' and u.dt = '2022-04-01' and 
u.device_token='blabla' {code}
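The symptom (each reducer receiving one input record and emitting one NULL-padded row) suggests the two sides of the join were routed to different reducers for the same key. For a shuffle join to be correct, identical join keys must land in the same reducer partition regardless of which input they come from. A hedged sketch of that invariant, assuming simple modulo hash partitioning (not Hive's actual partitioner):

```python
# Illustrative partitioner invariant for a shuffle join (not Hive internals):
# both sides of the join must compute the same partition for the same key,
# otherwise each side is joined against nothing and gets NULL-padded.
NUM_REDUCERS = 2

def partition(key: int, num_reducers: int = NUM_REDUCERS) -> int:
    """Route a join key to a reducer partition."""
    return hash(key) % num_reducers

k = 350570497
# Map 1 (side d) and Map 2 (side u) must agree on the target partition:
assert partition(k) == partition(k)
```

If either vertex partitions the key differently (as the 2-reducer behavior here suggests), the match is never seen by a single reducer.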
Below is the explain output of the query:
{code:java}
Plan optimized by CBO.Vertex dependency in root stage
Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Reducer 3
      File Output Operator [FS_10]
        Map Join Operator [MAPJOIN_13] (rows=2 width=8)
          
Conds:RS_6.KEY.reducesinkkey0=RS_7.KEY.reducesinkkey0(Outer),DynamicPartitionHashJoin:true,Output:["_col0","_col1"]
        <-Map 1 [CUSTOM_SIMPLE_EDGE]
          PARTITION_ONLY_SHUFFLE [RS_6]
            PartitionCols:_col0
            Select Operator [SEL_2] (rows=1 width=4)
              Output:["_col0"]
              TableScan [TS_0] (rows=1 width=4)
                
airflow@rds_users_delta,rud,Tbl:COMPLETE,Col:COMPLETE,Output:["id"]
        <-Map 2 [CUSTOM_SIMPLE_EDGE]
          PARTITION_ONLY_SHUFFLE [RS_7]
            PartitionCols:_col0
            Select Operator [SEL_5] (rows=1 width=4)
              Output:["_col0"]
              Filter Operator [FIL_12] (rows=1 width=110)
                predicate:(device_token = 'blabla')
                TableScan [TS_3] (rows=215192362 width=109)
                  
default@users,users,Tbl:COMPLETE,Col:COMPLETE,Output:["id","device_token"]  
{code}
I can't generate a small enough dataset to reproduce the issue: I have 
minimized tableA to only 1 row, and tableB has ~200m rows, but if I further 
reduce the size of tableB the issue can't be reproduced.

Any suggestion would be highly appreciated regarding the root cause of the 
issue, how to work around it, or how to reproduce it with a small enough 
dataset.

 

Below is the log found in hive.log:
{code:java}
220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17 : STAGE DEPENDENCIES:
  Stage-1 is a root stage [MAPRED]
  Stage-0 depends on stages: Stage-1 [FETCH]STAGE PLANS:
  Stage: Stage-1
    Tez
      DagId: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
      Edges:
        Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)
      DagName: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: rud
                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE 
Column stats: COMPLETE
                  GatherStats: false
                  Select Operator
                    expressions: id (type: int)
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE 
Column stats: COMPLETE
                    Reduce Output Operator
                      key expressions: _col0 (type: int)
                      null sort order: a
                      sort order: +
                      Map-reduce partition columns: _col0 (type: int)
                      Statistics: Num rows: 1 Data size: 4 Basic stats: 
COMPLETE Column stats: COMPLETE
                      tag: 0
                      auto parallelism: true
            Path -> Alias:
              s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
            Path -> Partition:
              s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00
                Partition
                  base file name: hh=00
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  partition values:
                    dt 2022-04-02-1row
                    hh 00
                  properties:
                    COLUMN_STATS_ACCURATE 
{"BASIC_STATS":"true","COLUMN_STATS":{"code":"true","creation_timestamp":"true","device_token":"true","general_configuration":"true","id":"true","last_delivery_timestamp":"true","last_survey_timestamp":"true","profile":"true","push_token":"true","server_configuration":"true","update_timestamp":"true"}}
                    bucket_count -1
                    column.name.delimiter ,
                    columns 
id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
                    columns.comments
                    columns.types 
int:string:string:string:int:int:string:string:string:int:int
                    field.delim
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    line.delim
                    location s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00
                    name airflow.rds_users_delta
                    numFiles 1
                    numRows 1
                    partition_columns dt/hh
                    partition_columns.types string:string
                    rawDataSize 2507
                    serialization.ddl struct rds_users_delta { i32 id, string 
device_token, string code, string push_token, i32 creation_timestamp, i32 
update_timestamp, string general_configuration, string server_configuration, 
string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
                    serialization.format
                    serialization.lib 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 888
                    transient_lastDdlTime 1649039842
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      EXTERNAL TRUE
                      bucket_count -1
                      bucketing_version 2
                      column.name.delimiter ,
                      columns 
id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
                      columns.comments
                      columns.types 
int:string:string:string:int:int:string:string:string:int:int
                      field.delim
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      line.delim
                      location s3a://.../tmp/rds_users_delta
                      name airflow.rds_users_delta
                      partition_columns dt/hh
                      partition_columns.types string:string
                      serialization.ddl struct rds_users_delta { i32 id, string 
device_token, string code, string push_token, i32 creation_timestamp, i32 
update_timestamp, string general_configuration, string server_configuration, 
string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
                      serialization.format
                      serialization.lib 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      transient_lastDdlTime 1648974877
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: airflow.rds_users_delta
                  name: airflow.rds_users_delta
            Truncated Path -> Alias:
              s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
        Map 2
            Map Operator Tree:
                TableScan
                  alias: users
                  Statistics: Num rows: 215192362 Data size: 23671159812 Basic 
stats: COMPLETE Column stats: COMPLETE
                  GatherStats: false
                  Filter Operator
                    isSamplingPred: false
                    predicate: (device_token = 'blabla') (type: boolean)
                    Statistics: Num rows: 1 Data size: 110 Basic stats: 
COMPLETE Column stats: COMPLETE
                    Select Operator
                      expressions: id (type: int)
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 4 Basic stats: 
COMPLETE Column stats: COMPLETE
                      Reduce Output Operator
                        key expressions: _col0 (type: int)
                        null sort order: a
                        sort order: +
                        Map-reduce partition columns: _col0 (type: int)
                        Statistics: Num rows: 1 Data size: 4 Basic stats: 
COMPLETE Column stats: COMPLETE
                        tag: 1
                        auto parallelism: true
            Path -> Alias:
              s3://.../default/users/dt=2022-04-01 [users]
            Path -> Partition:
              s3://.../default/users/dt=2022-04-01
                Partition
                  base file name: dt=2022-04-01
                  input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                  partition values:
                    dt 2022-04-01
                  properties:
                    COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
                    bucket_count -1
                    column.name.delimiter ,
                    columns 
id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
                    columns.comments
                    columns.types 
int:string:string:string:int:int:string:string:string:int:int:string:string
                    file.inputformat 
org.apache.hadoop.hive.ql.io.RCFileInputFormat
                    file.outputformat 
org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                    location s3://.../default/users/dt=2022-04-01
                    name default.users
                    numFiles 256
                    numRows 215192362
                    partition_columns dt
                    partition_columns.types string
                    rawDataSize 389210158991
                    serialization.ddl struct users { i32 id, string 
device_token, string code, string push_token, i32 creation_timestamp, i32 
update_timestamp, string general_configuration, string server_configuration, 
string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string 
virtual_id, string account_id}
                    serialization.format 1
                    serialization.lib 
org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
                    totalSize 50190916884
                    transient_lastDdlTime 1648905779
                  serde: org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
                    input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
                    output format: 
org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                    properties:
                      EXTERNAL TRUE
                      bucket_count -1
                      column.name.delimiter ,
                      columns 
id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
                      columns.comments
                      columns.types 
int:string:string:string:int:int:string:string:string:int:int:string:string
                      file.inputformat 
org.apache.hadoop.hive.ql.io.RCFileInputFormat
                      file.outputformat 
org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                      last_modified_by hadoop
                      last_modified_time 1629880363
                      location s3://.../default/users
                      name default.users
                      partition_columns dt
                      partition_columns.types string
                      serialization.ddl struct users { i32 id, string 
device_token, string code, string push_token, i32 creation_timestamp, i32 
update_timestamp, string general_configuration, string server_configuration, 
string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, string 
virtual_id, string account_id}
                      serialization.format 1
                      serialization.lib 
org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
                      transient_lastDdlTime 1629880363
                    serde: 
org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
                    name: default.users
                  name: default.users
            Truncated Path -> Alias:
              /users/dt=2022-04-01 [users]
        Reducer 3
            Needs Tagging: false
            Reduce Operator Tree:
              Map Join Operator
                condition map:
                     Full Outer Join 0 to 1
                Estimated key counts: Map 1 => 1
                keys:
                  0 KEY.reducesinkkey0 (type: int)
                  1 KEY.reducesinkkey0 (type: int)
                outputColumnNames: _col0, _col1
                input vertices:
                  0 Map 1
                Position of Big Table: 1
                Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE 
Column stats: COMPLETE
                DynamicPartitionHashJoin: true
                File Output Operator
                  compressed: true
                  GlobalTableId: 0
                  directory: hdfs://.../-ext-10002
                  NumFilesPerFileSink: 1
                  Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE 
Column stats: COMPLETE
                  Stats Publishing Key Prefix: hdfs://.../-ext-10002/
                  table:
                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      properties:
                        columns _col0,_col1
                        columns.types int:int
                        escape.delim \
                        hive.serialization.extend.additional.nesting.levels true
                        serialization.escape.crlf true
                        serialization.format 1
                        serialization.lib 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                  TotalFiles: 1
                  GatherStats: false
                  MultiFileSpray: false {code}
 

> FULL JOIN returns incorrect result
> ----------------------------------
>
>                 Key: HIVE-26111
>                 URL: https://issues.apache.org/jira/browse/HIVE-26111
>             Project: Hive
>          Issue Type: Bug
>         Environment: aws EMR (hive 3.1.2 + Tez 0.10.1)
>            Reporter: Youjun Yuan
>            Priority: Blocker
>
> We hit a query that FULL JOINs two tables, and Hive produces incorrect results: 
> for a single join-key value it produces two records, each holding a valid 
> value from one table and NULL for the other.
> The query is:
> {code:java}
> SET mapreduce.job.reduces=2;
> SELECT d.id, u.id
> FROM (
>        SELECT id
>        FROM   airflow.tableA rud
>        WHERE  rud.dt = '2022-04-02-1row'
> ) d
> FULL JOIN (
>        SELECT id
>        FROM   default.tableB
>        WHERE  dt = '2022-04-01' and device_token='blabla'
>  ) u
> ON u.id = d.id
> ; {code}
> According to the job log, each of the two reducers receives one input record 
> and outputs one record,
> producing two records for id=350570497:
> {code:java}
> 350570497    NULL
> NULL    350570497
> Time taken: 62.692 seconds, Fetched: 2 row(s) {code}
> I am sure tableB has only one row where device_token='blabla'.
> We tried the following:
> 1, SET mapreduce.job.reduces=1; then it produces the right result;
> -2, SET hive.execution.engine=mr; then it produces the right result;- MR also 
> has the issue;
> 3, using JOIN (instead of FULL JOIN) worked as expected;
> 4, in subquery u, changing the filter device_token='blabla' to id=350570497 
> worked ok;
> 5, flattening the subqueries works ok (note that the WHERE predicates reference 
> columns from both sides, which rejects NULL-extended rows, so this form 
> effectively behaves like an INNER JOIN), like below:
> {code:java}
> SELECT  d.id, u.id 
> from airflow.rds_users_delta d full join default.users u
> on (u.id = d.id)
> where d.dt = '2022-04-02-1row' and u.dt = '2022-04-01' and 
> u.device_token='blabla' {code}
> Below is the explain output of the query:
> {code:java}
> Plan optimized by CBO.Vertex dependency in root stage
> Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)Stage-0
>   Fetch Operator
>     limit:-1
>     Stage-1
>       Reducer 3
>       File Output Operator [FS_10]
>         Map Join Operator [MAPJOIN_13] (rows=2 width=8)
>           
> Conds:RS_6.KEY.reducesinkkey0=RS_7.KEY.reducesinkkey0(Outer),DynamicPartitionHashJoin:true,Output:["_col0","_col1"]
>         <-Map 1 [CUSTOM_SIMPLE_EDGE]
>           PARTITION_ONLY_SHUFFLE [RS_6]
>             PartitionCols:_col0
>             Select Operator [SEL_2] (rows=1 width=4)
>               Output:["_col0"]
>               TableScan [TS_0] (rows=1 width=4)
>                 
> airflow@rds_users_delta,rud,Tbl:COMPLETE,Col:COMPLETE,Output:["id"]
>         <-Map 2 [CUSTOM_SIMPLE_EDGE]
>           PARTITION_ONLY_SHUFFLE [RS_7]
>             PartitionCols:_col0
>             Select Operator [SEL_5] (rows=1 width=4)
>               Output:["_col0"]
>               Filter Operator [FIL_12] (rows=1 width=110)
>                 predicate:(device_token = 'blabla')
>                 TableScan [TS_3] (rows=215192362 width=109)
>                   
> default@users,users,Tbl:COMPLETE,Col:COMPLETE,Output:["id","device_token"]  
> {code}
> I can't build a small enough dataset to reproduce the issue: I have minimized 
> tableA to only 1 row, while tableB has ~200m rows, but if I further reduce the 
> size of tableB, the issue can't be reproduced.
> Any suggestion would be highly appreciated regarding the root cause of the 
> issue, how to work around it, or how to reproduce it with a small enough 
> dataset. 
>  
> Below is the log found in hive.log:
> {code:java}
> 220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17 : STAGE DEPENDENCIES:
>   Stage-1 is a root stage [MAPRED]
>   Stage-0 depends on stages: Stage-1 [FETCH]STAGE PLANS:
>   Stage: Stage-1
>     Tez
>       DagId: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
>       Edges:
>         Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 2 (CUSTOM_SIMPLE_EDGE)
>       DagName: ec2-user_20220405004014_2c3b3486-9bc7-4d1d-9639-693dad39da17:1
>       Vertices:
>         Map 1
>             Map Operator Tree:
>                 TableScan
>                   alias: rud
>                   Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE 
> Column stats: COMPLETE
>                   GatherStats: false
>                   Select Operator
>                     expressions: id (type: int)
>                     outputColumnNames: _col0
>                     Statistics: Num rows: 1 Data size: 4 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                     Reduce Output Operator
>                       key expressions: _col0 (type: int)
>                       null sort order: a
>                       sort order: +
>                       Map-reduce partition columns: _col0 (type: int)
>                       Statistics: Num rows: 1 Data size: 4 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                       tag: 0
>                       auto parallelism: true
>             Path -> Alias:
>               s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
>             Path -> Partition:
>               s3a://.../rds_users_delta/dt=2022-04-02-1row/hh=00
>                 Partition
>                   base file name: hh=00
>                   input format: org.apache.hadoop.mapred.TextInputFormat
>                   output format: 
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                   partition values:
>                     dt 2022-04-02-1row
>                     hh 00
>                   properties:
>                     COLUMN_STATS_ACCURATE 
> {"BASIC_STATS":"true","COLUMN_STATS":{"code":"true","creation_timestamp":"true","device_token":"true","general_configuration":"true","id":"true","last_delivery_timestamp":"true","last_survey_timestamp":"true","profile":"true","push_token":"true","server_configuration":"true","update_timestamp":"true"}}
>                     bucket_count -1
>                     column.name.delimiter ,
>                     columns 
> id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
>                     columns.comments
>                     columns.types 
> int:string:string:string:int:int:string:string:string:int:int
>                     field.delim
>                     file.inputformat org.apache.hadoop.mapred.TextInputFormat
>                     file.outputformat 
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                     line.delim                    location 
> s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00
>                     name airflow.rds_users_delta
>                     numFiles 1
>                     numRows 1
>                     partition_columns dt/hh
>                     partition_columns.types string:string
>                     rawDataSize 2507
>                     serialization.ddl struct rds_users_delta { i32 id, string 
> device_token, string code, string push_token, i32 creation_timestamp, i32 
> update_timestamp, string general_configuration, string server_configuration, 
> string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp}
>                     serialization.format
>                     serialization.lib 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     totalSize 888
>                     transient_lastDdlTime 1649039842
>                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe   
>                  input format: org.apache.hadoop.mapred.TextInputFormat
>                     output format: 
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                     properties:
>                       EXTERNAL TRUE
>                       bucket_count -1
>                       bucketing_version 2
>                       column.name.delimiter ,
>                       columns 
> id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp
>                       columns.comments
>                       columns.types 
> int:string:string:string:int:int:string:string:string:int:int
>                       field.delim
>                       file.inputformat 
> org.apache.hadoop.mapred.TextInputFormat
>                       file.outputformat 
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                       line.delim                      location 
> s3a://.../tmp/rds_users_delta
>                       name airflow.rds_users_delta
>                       partition_columns dt/hh
>                       partition_columns.types string:string
>                       serialization.ddl struct rds_users_delta { i32 id, 
> string device_token, string code, string push_token, i32 creation_timestamp, 
> i32 update_timestamp, string general_configuration, string 
> server_configuration, string profile, i32 last_delivery_timestamp, i32 
> last_survey_timestamp}
>                       serialization.format
>                       serialization.lib 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                       transient_lastDdlTime 1648974877
>                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                     name: airflow.rds_users_delta
>                   name: airflow.rds_users_delta
>             Truncated Path -> Alias:
>               s3a://.../tmp/rds_users_delta/dt=2022-04-02-1row/hh=00 [rud]
>         Map 2
>             Map Operator Tree:
>                 TableScan
>                   alias: users
>                   Statistics: Num rows: 215192362 Data size: 23671159812 
> Basic stats: COMPLETE Column stats: COMPLETE
>                   GatherStats: false
>                   Filter Operator
>                     isSamplingPred: false
>                     predicate: (device_token = 'blabla') (type: boolean)
>                     Statistics: Num rows: 1 Data size: 110 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                     Select Operator
>                       expressions: id (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 1 Data size: 4 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                       Reduce Output Operator
>                         key expressions: _col0 (type: int)
>                         null sort order: a
>                         sort order: +
>                         Map-reduce partition columns: _col0 (type: int)
>                         Statistics: Num rows: 1 Data size: 4 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                         tag: 1
>                         auto parallelism: true
>             Path -> Alias:
>               s3://.../default/users/dt=2022-04-01 [users]
>             Path -> Partition:
>               s3://.../default/users/dt=2022-04-01
>                 Partition
>                   base file name: dt=2022-04-01
>                   input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
>                   output format: 
> org.apache.hadoop.hive.ql.io.RCFileOutputFormat
>                   partition values:
>                     dt 2022-04-01
>                   properties:
>                     COLUMN_STATS_ACCURATE {"BASIC_STATS":"true"}
>                     bucket_count -1
>                     column.name.delimiter ,
>                     columns 
> id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
>                     columns.comments
>                     columns.types 
> int:string:string:string:int:int:string:string:string:int:int:string:string
>                     file.inputformat 
> org.apache.hadoop.hive.ql.io.RCFileInputFormat
>                     file.outputformat 
> org.apache.hadoop.hive.ql.io.RCFileOutputFormat
>                     location s3://.../default/users/dt=2022-04-01
>                     name default.users
>                     numFiles 256
>                     numRows 215192362
>                     partition_columns dt
>                     partition_columns.types string
>                     rawDataSize 389210158991
>                     serialization.ddl struct users { i32 id, string 
> device_token, string code, string push_token, i32 creation_timestamp, i32 
> update_timestamp, string general_configuration, string server_configuration, 
> string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, 
> string virtual_id, string account_id}
>                     serialization.format 1
>                     serialization.lib 
> org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
>                     totalSize 50190916884
>                     transient_lastDdlTime 1648905779
>                   serde: 
> org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe                
>     input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
>                     output format: 
> org.apache.hadoop.hive.ql.io.RCFileOutputFormat
>                     properties:
>                       EXTERNAL TRUE
>                       bucket_count -1
>                       column.name.delimiter ,
>                       columns 
> id,device_token,code,push_token,creation_timestamp,update_timestamp,general_configuration,server_configuration,profile,last_delivery_timestamp,last_survey_timestamp,virtual_id,account_id
>                       columns.comments
>                       columns.types 
> int:string:string:string:int:int:string:string:string:int:int:string:string
>                       file.inputformat 
> org.apache.hadoop.hive.ql.io.RCFileInputFormat
>                       file.outputformat 
> org.apache.hadoop.hive.ql.io.RCFileOutputFormat
>                       last_modified_by hadoop
>                       last_modified_time 1629880363
>                       location s3://.../default/users
>                       name default.users
>                       partition_columns dt
>                       partition_columns.types string
>                       serialization.ddl struct users { i32 id, string 
> device_token, string code, string push_token, i32 creation_timestamp, i32 
> update_timestamp, string general_configuration, string server_configuration, 
> string profile, i32 last_delivery_timestamp, i32 last_survey_timestamp, 
> string virtual_id, string account_id}
>                       serialization.format 1
>                       serialization.lib 
> org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
>                       transient_lastDdlTime 1629880363
>                     serde: 
> org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
>                     name: default.users
>                   name: default.users
>             Truncated Path -> Alias:
>               /users/dt=2022-04-01 [users]
>         Reducer 3
>             Needs Tagging: false
>             Reduce Operator Tree:
>               Map Join Operator
>                 condition map:
>                      Full Outer Join 0 to 1
>                 Estimated key counts: Map 1 => 1
>                 keys:
>                   0 KEY.reducesinkkey0 (type: int)
>                   1 KEY.reducesinkkey0 (type: int)
>                 outputColumnNames: _col0, _col1
>                 input vertices:
>                   0 Map 1
>                 Position of Big Table: 1
>                 Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE 
> Column stats: COMPLETE
>                 DynamicPartitionHashJoin: true
>                 File Output Operator
>                   compressed: true
>                   GlobalTableId: 0
>                   directory: hdfs://.../-ext-10002
>                   NumFilesPerFileSink: 1
>                   Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE 
> Column stats: COMPLETE
>                   Stats Publishing Key Prefix: hdfs://.../-ext-10002/
>                   table:
>                       input format: 
> org.apache.hadoop.mapred.SequenceFileInputFormat
>                       output format: 
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                       properties:
>                         columns _col0,_col1
>                         columns.types int:int
>                         escape.delim \
>                         hive.serialization.extend.additional.nesting.levels 
> true
>                         serialization.escape.crlf true
>                         serialization.format 1
>                         serialization.lib 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                       serde: 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                   TotalFiles: 1
>                   GatherStats: false
>                   MultiFileSpray: false {code}
>  
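For readers trying to reason about the failure mode: below is a minimal Python sketch (illustrative only, not Hive's implementation) of a hash-partitioned full outer join. Assuming each side is shuffled by hash(key) % numReducers, both rows carrying the same join key should land on the same reducer and join into a single row; the two half-NULL rows reported above are what you would get if the two sides of the same key were processed by different reducers. The function name and row representation are hypothetical.

```python
from typing import Optional

def full_outer_join(num_reducers: int, left_keys, right_keys):
    """Simulate shuffle + per-reducer full outer join on integer join keys."""
    # Shuffle phase: route each row to a reducer by hashing its join key.
    buckets = [([], []) for _ in range(num_reducers)]
    for k in left_keys:
        buckets[hash(k) % num_reducers][0].append(k)
    for k in right_keys:
        buckets[hash(k) % num_reducers][1].append(k)

    # Reduce phase: each reducer full-outer-joins only the rows it received.
    out: list[tuple[Optional[int], Optional[int]]] = []
    for left, right in buckets:
        right_set = set(right)
        matched = set()
        for k in left:
            if k in right_set:
                out.append((k, k))       # key present on both sides
                matched.add(k)
            else:
                out.append((k, None))    # left side only
        for k in right:
            if k not in matched:
                out.append((None, k))    # right side only
    return out

# With a consistent partitioning, both rows for id=350570497 reach the
# same reducer and join into a single row, regardless of num_reducers:
print(full_outer_join(2, [350570497], [350570497]))
```

If the partition function disagreed between the two inputs, each reducer would see the key on only one side and emit a half-NULL row, matching the output observed in the report.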



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
