[ 
https://issues.apache.org/jira/browse/HUDI-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Cheng updated HUDI-9672:
-----------------------------
    Description: 
Considering the following ingestion case for spark:
* MOR table + upsert/insert

1st insert: ("1", "a1", "10", "000").    -> fg1-001.parquet, contains 1 rows
2st insert: ("2", "a2", "10", "000").  -> fg1-002.parquet, contains 2 rows
3st insert: ("3", "a3", "10", "000").  -> fg1-003.parquet, contains 3 rows
4st insert: ("4", "a4", "10", "000").  -> fg1-004.parquet, contains 4 rows
clustering                                           -> fg2-005.parquet, 
contains 4 rows
5st insert: ("5", "a5", "10", "000").  -> fg2-006.parquet, contains 5 rows

During upsert/insert operation, we opportunistically expand existing small 
files on storage, instead of writing new files to keep number of files to an 
optimum. So each file generated for current commit will contains all datas for 
previous commits. 

If we fire an incremental query with: START_OFFSET = 002 and skipping cluster 
enabled.
There will two file spilts to read: 
* fg1 latest file slice: fg1-004.parquet
* fg2 latest file slice: fg2-006.parquet

The final read results will have duplicates for rows with key: "2", "3", "4".
expected                   actual
[2,a1,11,001]              [2,a1,11,001]
[3,a1,12,002]             [2,a1,11,001]
[4,a1,13,003]             [3,a1,12,002]
[5,a1,14,004]             [3,a1,12,002]
                                   [4,a1,13,003]
                                   [4,a1,13,003]
                                   [5,a1,14,004]

  was:
Considering the following ingestion case for spark:
* MOR table + upsert/insert

1st insert: ("1", "a1", "10", "000").    -> fg1-001.parquet, contains 1 rows
2st insert: ("2", "a2", "10", "000").  -> fg1-002.parquet, contains 2 rows
3st insert: ("3", "a3", "10", "000").  -> fg1-003.parquet, contains 3 rows
4st insert: ("4", "a4", "10", "000").  -> fg1-004.parquet, contains 4 rows
clustering                                           -> fg2-005.parquet, 
contains 4 rows
5st insert: ("5", "a5", "10", "000").  -> fg2-006.parquet, contains 5 rows

During upsert/insert operation, we opportunistically expand existing small 
files on storage, instead of writing new files to keep number of files to an 
optimum. So each file generated for current commit will contains all datas for 
previous commits. 

If we fire an incremental query with: START_OFFSET = 002 and skipping cluster 
enabled.
There will two file spilts to read: 
* fg1 latest file slice: fg1-004.parquet
* fg2 latest file slice: fg2-006.parquet

The final read results will have duplicates for rows with key: "2", "3", "4".


> Fix data loss for spark incremental query with skip clustering enabled
> ----------------------------------------------------------------------
>
>                 Key: HUDI-9672
>                 URL: https://issues.apache.org/jira/browse/HUDI-9672
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark-sql
>            Reporter: Shuo Cheng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.1.0
>
>
> Considering the following ingestion case for spark:
> * MOR table + upsert/insert
> 1st insert: ("1", "a1", "10", "000").    -> fg1-001.parquet, contains 1 rows
> 2st insert: ("2", "a2", "10", "000").  -> fg1-002.parquet, contains 2 rows
> 3st insert: ("3", "a3", "10", "000").  -> fg1-003.parquet, contains 3 rows
> 4st insert: ("4", "a4", "10", "000").  -> fg1-004.parquet, contains 4 rows
> clustering                                           -> fg2-005.parquet, 
> contains 4 rows
> 5st insert: ("5", "a5", "10", "000").  -> fg2-006.parquet, contains 5 rows
> During upsert/insert operation, we opportunistically expand existing small 
> files on storage, instead of writing new files to keep number of files to an 
> optimum. So each file generated for current commit will contains all datas 
> for previous commits. 
> If we fire an incremental query with: START_OFFSET = 002 and skipping cluster 
> enabled.
> There will two file spilts to read: 
> * fg1 latest file slice: fg1-004.parquet
> * fg2 latest file slice: fg2-006.parquet
> The final read results will have duplicates for rows with key: "2", "3", "4".
> expected                   actual
> [2,a1,11,001]              [2,a1,11,001]
> [3,a1,12,002]             [2,a1,11,001]
> [4,a1,13,003]             [3,a1,12,002]
> [5,a1,14,004]             [3,a1,12,002]
>                                    [4,a1,13,003]
>                                    [4,a1,13,003]
>                                    [5,a1,14,004]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to