[ 
https://issues.apache.org/jira/browse/HUDI-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Pifer updated HUDI-1196:
-----------------------------
    Description: 
When upserting a record into a table with a global index (global and hbase), 
where a single batch contains multiple versions of the record in different 
partitions, the record is deduplicated correctly but placed in the incorrect 
partition. This also occurs with "hoodie.bloom.update.partition.path=true" set.

 

Batch with multiple versions of a record in different partitions:

```

scala> val inputDF = spark.read.format("parquet").load(inputDataPath).show()

+--------+---------+----------------+-------------+-------------+
|     wbn|    cs_ss|     action_date|           ad|   ad_updated|
+--------+---------+----------------+-------------+-------------+
|12345678|InTransit|1596716921000601|2020-08-06-12|2020-08-06-12|
|12345678|  Pending|1596716921000602|2020-08-06-12|2020-08-06-12|
|12345678|  Pending|1596716921000603|2020-08-06-13|2020-08-06-13|
+--------+---------+----------------+-------------+-------------+

```

 

Values when querying _rt and _ro tables:

```

scala> spark.sql("select * from gb_update_partition_1_ro").show()

+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|     20200817220935|  20200817220935_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+

  

scala> spark.sql("select * from gb_update_partition_1_rt").show()

+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|     20200817221924|  20200817221924_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+

 ```

 

We can see that the record shows the most recent version of the data, except 
that the partition values come from the older versions.
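
To make the expected outcome concrete, here is a minimal Scala sketch (plain 
collections, no Hudi or Spark) of what deduplication within the batch would 
be expected to yield. It assumes `wbn` is the record key, `action_date` is the 
ordering field, and the partition path is derived from `ad`. The `Shipment` 
class and the `latestPerKey` helper are hypothetical names used only for 
illustration; they are not Hudi APIs.

```scala
// Hypothetical model of one input row; fields mirror the columns of the batch.
final case class Shipment(
    wbn: String,
    csSs: String,
    actionDate: Long,
    ad: String,
    adUpdated: String)

object ExpectedDedup {
  // The three versions of record 12345678 from the input batch.
  val batch: Seq[Shipment] = Seq(
    Shipment("12345678", "InTransit", 1596716921000601L, "2020-08-06-12", "2020-08-06-12"),
    Shipment("12345678", "Pending", 1596716921000602L, "2020-08-06-12", "2020-08-06-12"),
    Shipment("12345678", "Pending", 1596716921000603L, "2020-08-06-13", "2020-08-06-13")
  )

  // Assumption: keep, per record key, the version with the largest actionDate.
  def latestPerKey(rows: Seq[Shipment]): Map[String, Shipment] =
    rows.groupBy(_.wbn).map { case (key, versions) =>
      key -> versions.maxBy(_.actionDate)
    }

  def main(args: Array[String]): Unit = {
    val winner = latestPerKey(batch)("12345678")
    // Assumption: the partition path should follow the winning version's `ad`.
    println(s"expected partition: ${winner.ad}")
  }
}
```

Under these assumptions the surviving version carries `ad` = 2020-08-06-13, 
whereas the query results report `_hoodie_partition_path` and `ad` as 
2020-08-06-12.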

 

  was:
When upserting a record into a table with a global index (global and hbase), 
where the batch contains multiple versions of the record in different 
partitions, the record is deduplicated correctly but placed in the incorrect 
partition. This also occurs with "hoodie.bloom.update.partition.path=true" set.

 

Batch with multiple versions of a record in different partitions:

```

scala> val inputDF = spark.read.format("parquet").load(inputDataPath).show()

+--------+---------+----------------+-------------+-------------+
|     wbn|    cs_ss|     action_date|           ad|   ad_updated|
+--------+---------+----------------+-------------+-------------+
|12345678|InTransit|1596716921000601|2020-08-06-12|2020-08-06-12|
|12345678|  Pending|1596716921000602|2020-08-06-12|2020-08-06-12|
|12345678|  Pending|1596716921000603|2020-08-06-13|2020-08-06-13|
+--------+---------+----------------+-------------+-------------+

```

 

Values when querying _rt and _ro tables:

```

scala> spark.sql("select * from gb_update_partition_1_ro").show()

+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|     20200817220935|  20200817220935_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+

  

scala> spark.sql("select * from gb_update_partition_1_rt").show()

+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|     20200817221924|  20200817221924_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+

 ```

 

We can see that the record shows the most recent version of the data, except 
that the partition values come from the older versions.

 


> Record being placed in incorrect partition during upsert on COW/MOR global 
> indexed tables
> -----------------------------------------------------------------------------------------
>
>                 Key: HUDI-1196
>                 URL: https://issues.apache.org/jira/browse/HUDI-1196
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ryan Pifer
>            Priority: Major
>
> When upserting a record into a table with a global index (global and hbase), 
> where a single batch contains multiple versions of the record in different 
> partitions, the record is deduplicated correctly but placed in the incorrect 
> partition. This also occurs with "hoodie.bloom.update.partition.path=true" set.
>  
> Batch with multiple versions of a record in different partitions:
> ```
> scala> val inputDF = spark.read.format("parquet").load(inputDataPath).show()
> +--------+---------+----------------+-------------+-------------+
> |     wbn|    cs_ss|     action_date|           ad|   ad_updated|
> +--------+---------+----------------+-------------+-------------+
> |12345678|InTransit|1596716921000601|2020-08-06-12|2020-08-06-12|
> |12345678|  Pending|1596716921000602|2020-08-06-12|2020-08-06-12|
> |12345678|  Pending|1596716921000603|2020-08-06-13|2020-08-06-13|
> +--------+---------+----------------+-------------+-------------+
> ```
>  
> Values when querying _rt and _ro tables:
> ```
> scala> spark.sql("select * from gb_update_partition_1_ro").show()
> +-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
> |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
> +-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
> |     20200817220935|  20200817220935_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
> +-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
>   
> scala> spark.sql("select * from gb_update_partition_1_rt").show()
> +-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
> |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
> +-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
> |     20200817221924|  20200817221924_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
> +-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
>  ```
>  
> We can see that the record shows the most recent version of the data, except 
> that the partition values come from the older versions.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
