[ https://issues.apache.org/jira/browse/HUDI-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ryan Pifer updated HUDI-1196:
-----------------------------
Description:
When upserting a record into a table with a global index (global bloom or HBase), where a
single batch contains multiple versions of the record in different partitions, the
record is deduplicated correctly but written to the wrong partition. This also
happens with "hoodie.bloom.update.partition.path=true" set.
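A minimal sketch of the write options this scenario assumes, written as a plain Scala `Map` so it compiles on its own. The record key (`wbn`), partition path (`ad`) and precombine (`action_date`) fields are inferred from the tables in this issue; the table name and the MERGE_ON_READ table type are assumptions, not copied from the reporter's job. The sketch spells the partition-move flag as `hoodie.bloom.index.update.partition.path`, the key name it assumes from Hudi's index configuration; check it against the Hudi version in use.

```scala
// Hypothetical write options for reproducing this issue with a global bloom index.
// Field names are inferred from the tables in this issue; the table name and
// table type are illustrative assumptions.
object ReproOptions {
  val hudiOptions: Map[String, String] = Map(
    "hoodie.table.name"                           -> "gb_update_partition_1",
    "hoodie.datasource.write.table.type"          -> "MERGE_ON_READ",
    "hoodie.datasource.write.operation"           -> "upsert",
    "hoodie.datasource.write.recordkey.field"     -> "wbn",
    "hoodie.datasource.write.partitionpath.field" -> "ad",
    "hoodie.datasource.write.precombine.field"    -> "action_date",
    // Global index: a record key is unique across all partitions.
    "hoodie.index.type"                           -> "GLOBAL_BLOOM",
    // Ask the index to move a record when its partition value changes.
    "hoodie.bloom.index.update.partition.path"    -> "true"
  )
}
```

In a spark-shell with the Hudi bundle on the classpath, these would be passed as `inputDF.write.format("hudi").options(ReproOptions.hudiOptions).mode("append").save(basePath)`, where `basePath` is a placeholder. The HBase variant would use `HBASE` as the index type plus HBase connection settings, which are omitted here.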
Batch with multiple versions of a record in different partitions:
```
scala> val inputDF = spark.read.format("parquet").load(inputDataPath)
scala> inputDF.show()
+--------+---------+----------------+-------------+-------------+
|     wbn|    cs_ss|     action_date|           ad|   ad_updated|
+--------+---------+----------------+-------------+-------------+
|12345678|InTransit|1596716921000601|2020-08-06-12|2020-08-06-12|
|12345678|  Pending|1596716921000602|2020-08-06-12|2020-08-06-12|
|12345678|  Pending|1596716921000603|2020-08-06-13|2020-08-06-13|
+--------+---------+----------------+-------------+-------------+
```
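The expected outcome for this batch can be modeled without Spark. The sketch below assumes the default latest-wins merge with `action_date` as the precombine field: the three versions collapse to the one with the largest `action_date`, and that version's `ad` value (2020-08-06-13) is the partition the record should end up in. `Shipment` and `ExpectedDedup` are hypothetical names; this models the intended semantics, not Hudi's implementation.

```scala
// Hypothetical model of one input row (only the columns needed here).
final case class Shipment(wbn: String, csSs: String, actionDate: Long, ad: String)

object ExpectedDedup {
  // The batch from the input table above.
  val batch: Seq[Shipment] = Seq(
    Shipment("12345678", "InTransit", 1596716921000601L, "2020-08-06-12"),
    Shipment("12345678", "Pending",   1596716921000602L, "2020-08-06-12"),
    Shipment("12345678", "Pending",   1596716921000603L, "2020-08-06-13")
  )

  // Latest-wins dedup (assumed): keep the version with the highest actionDate per key.
  // The survivor's `ad` is the partition the record should be written to.
  def latestPerKey(records: Seq[Shipment]): Map[String, Shipment] =
    records.groupBy(_.wbn).map { case (key, versions) => key -> versions.maxBy(_.actionDate) }
}
```

Under these assumptions the single surviving record for key 12345678 is the 1596716921000603 version, whose partition is 2020-08-06-13.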
Values returned when querying the _ro and _rt tables:
```
scala> spark.sql("select * from gb_update_partition_1_ro").show()
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|     20200817220935|  20200817220935_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
scala> spark.sql("select * from gb_update_partition_1_rt").show()
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|     20200817221924|  20200817221924_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
```
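The record shows the most recent version of the data (cs_ss=Pending,
action_date=1596716921000603, ad_updated=2020-08-06-13), but its partition values
(_hoodie_partition_path and ad, both 2020-08-06-12) come from the older versions.
Based on the latest version, the record should be in partition 2020-08-06-13.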
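A hypothetical check that makes the mismatch explicit for a row read back from the _ro or _rt table. It assumes `ad_updated` carries a copy of the source partition value (as in the input batch, where `ad` and `ad_updated` are always equal), so a row is consistent only if `_hoodie_partition_path` matches the expected partition and `ad` matches `ad_updated`. `ReadRow` and `PartitionCheck` are illustrative names, not part of Hudi.

```scala
// Hypothetical projection of a row read back from the _ro/_rt table.
final case class ReadRow(hoodiePartitionPath: String, actionDate: Long, adUpdated: String, ad: String)

object PartitionCheck {
  /** Describes each inconsistency found; an empty result means the row looks correct. */
  def mismatches(row: ReadRow, expectedPartition: String): Seq[String] = {
    val problems = Seq.newBuilder[String]
    // The physical partition should match the latest version's partition value.
    if (row.hoodiePartitionPath != expectedPartition)
      problems += s"_hoodie_partition_path=${row.hoodiePartitionPath}, expected $expectedPartition"
    // ad (read from the partition path) should agree with the copy stored in the data.
    if (row.ad != row.adUpdated)
      problems += s"ad=${row.ad} disagrees with ad_updated=${row.adUpdated}"
    problems.result()
  }

  // The row returned by both the _ro and _rt queries above.
  val observed: ReadRow = ReadRow("2020-08-06-12", 1596716921000603L, "2020-08-06-13", "2020-08-06-12")
}
```

Run against the observed row with an expected partition of 2020-08-06-13, both conditions are reported, which matches the behavior described in this issue.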
was:
When upserting a record into a table with a global index (global bloom or HBase), where the
batch contains multiple versions of the record in different partitions, the record
is deduplicated correctly but written to the wrong partition. This also happens
with "hoodie.bloom.update.partition.path=true" set.
Batch with multiple versions of a record in different partitions:
```
scala> val inputDF = spark.read.format("parquet").load(inputDataPath)
scala> inputDF.show()
+--------+---------+----------------+-------------+-------------+
|     wbn|    cs_ss|     action_date|           ad|   ad_updated|
+--------+---------+----------------+-------------+-------------+
|12345678|InTransit|1596716921000601|2020-08-06-12|2020-08-06-12|
|12345678|  Pending|1596716921000602|2020-08-06-12|2020-08-06-12|
|12345678|  Pending|1596716921000603|2020-08-06-13|2020-08-06-13|
+--------+---------+----------------+-------------+-------------+
```
Values returned when querying the _ro and _rt tables:
```
scala> spark.sql("select * from gb_update_partition_1_ro").show()
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|     20200817220935|  20200817220935_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
scala> spark.sql("select * from gb_update_partition_1_rt").show()
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|     wbn|  cs_ss|     action_date|   ad_updated|           ad|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
|     20200817221924|  20200817221924_0_1|          12345678|         2020-08-06-12|4dddb6e8-87c4-4bd...|12345678|Pending|1596716921000603|2020-08-06-13|2020-08-06-12|
+-------------------+--------------------+------------------+----------------------+--------------------+--------+-------+----------------+-------------+-------------+
```
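The record shows the most recent version of the data (cs_ss=Pending,
action_date=1596716921000603, ad_updated=2020-08-06-13), but its partition values
(_hoodie_partition_path and ad, both 2020-08-06-12) come from the older versions.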
> Record being placed in incorrect partition during upsert on COW/MOR global indexed tables
> -----------------------------------------------------------------------------------------
>
> Key: HUDI-1196
> URL: https://issues.apache.org/jira/browse/HUDI-1196
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Ryan Pifer
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)