[ 
https://issues.apache.org/jira/browse/HUDI-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Cheng updated HUDI-9672:
-----------------------------
    Description: 
Consider the following ingestion case for Spark:
 * MOR table + upsert/insert

1st insert: ("1", "a1", "10", "000") -> fg1-001.parquet, contains 1 row
2nd insert: ("2", "a1", "11", "001") -> fg1-002.parquet, contains 2 rows
3rd insert: ("3", "a1", "12", "002") -> fg1-003.parquet, contains 3 rows
4th insert: ("4", "a1", "13", "003") -> fg1-004.parquet, contains 4 rows
clustering -> fg2-005.parquet, contains 4 rows
5th insert: ("5", "a1", "14", "004") -> fg2-006.parquet, contains 5 rows

During an upsert/insert operation, we opportunistically expand existing small 
files on storage instead of writing new files, to keep the number of files at 
an optimum. So each file generated for the current commit will contain all the 
data from previous commits.
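
As a rough illustration of the small-file expansion described above, the toy 
Python model below rebuilds the row counts of the fg1 file versions. It is not 
Hudi code: `expand_small_file` is a hypothetical helper, and the mapping from 
commit number to file name simply follows the example.

```python
# Toy model of small-file expansion; not Hudi code.
def expand_small_file(previous_rows, new_rows):
    """Hypothetical helper: the new file version holds the old rows plus the new ones."""
    return list(previous_rows) + list(new_rows)

# The four inserts that land in file group fg1, in commit order.
inserts = [
    ("1", "a1", "10", "000"),
    ("2", "a1", "11", "001"),
    ("3", "a1", "12", "002"),
    ("4", "a1", "13", "003"),
]

versions = {}  # file name -> rows contained in that file version
current = []
for commit, row in enumerate(inserts, start=1):
    current = expand_small_file(current, [row])
    versions[f"fg1-{commit:03d}.parquet"] = current

row_counts = {name: len(rows) for name, rows in versions.items()}
print(row_counts)
```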

If we fire an incremental query with START_OFFSET = 002 and skip clustering 
enabled, there will be two file splits to read:
 * fg1 latest file slice: fg1-004.parquet
 * fg2 latest file slice: fg2-006.parquet

The final read results will have duplicates for rows with key: "2", "3", "4".
expected        actual
[2,a1,11,001]   [2,a1,11,001]
[3,a1,12,002]   [2,a1,11,001]
[4,a1,13,003]   [3,a1,12,002]
[5,a1,14,004]   [3,a1,12,002]
                [4,a1,13,003]
                [4,a1,13,003]
                [5,a1,14,004]
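
The duplication can be reproduced with a small Python sketch. This is a toy 
model, not Hudi code, and it rests on assumptions that the description does 
not state explicitly: each row carries the commit time of the commit that 
wrote it (taken here from the file-name suffix, 001 to 006), clustering 
preserves that per-row commit time, and START_OFFSET is inclusive. The names 
`incremental_read`, `FG1_LATEST` and `FG2_LATEST` are hypothetical.

```python
from collections import Counter

# Each row: (key, name, value, ts, commit_time).
# commit_time is an ASSUMED per-row field, taken from the file-name suffix of
# the commit that first wrote the row; clustering is assumed to preserve it.
ROWS = [
    ("1", "a1", "10", "000", "001"),
    ("2", "a1", "11", "001", "002"),
    ("3", "a1", "12", "002", "003"),
    ("4", "a1", "13", "003", "004"),
    ("5", "a1", "14", "004", "006"),
]

FG1_LATEST = ROWS[:4]  # fg1-004.parquet: rows 1..4
FG2_LATEST = ROWS[:5]  # fg2-006.parquet: rows 1..4 (clustered) plus row 5


def incremental_read(file_slices, start_offset):
    """Scan every slice and keep rows whose commit_time >= start_offset."""
    return [
        row[:4]
        for rows in file_slices
        for row in rows
        if row[4] >= start_offset
    ]


# Both file groups are scanned, so the clustered rows are read twice.
actual = incremental_read([FG1_LATEST, FG2_LATEST], "002")
# Reading only the file group that replaced fg1 gives the expected rows.
expected = incremental_read([FG2_LATEST], "002")

key_counts = Counter(row[0] for row in actual)
duplicated_keys = sorted(k for k, n in key_counts.items() if n > 1)
print(len(expected), len(actual), duplicated_keys)
```

Under these assumptions the two-slice scan returns seven rows against four 
expected, with keys "2", "3" and "4" each appearing twice.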

 

We currently *disable skipping clustering* for Spark incremental queries until 
a proper solution is proposed.

  was:
Consider the following ingestion case for Spark:
 * MOR table + upsert/insert

1st insert: ("1", "a1", "10", "000") -> fg1-001.parquet, contains 1 row
2nd insert: ("2", "a1", "11", "001") -> fg1-002.parquet, contains 2 rows
3rd insert: ("3", "a1", "12", "002") -> fg1-003.parquet, contains 3 rows
4th insert: ("4", "a1", "13", "003") -> fg1-004.parquet, contains 4 rows
clustering -> fg2-005.parquet, contains 4 rows
5th insert: ("5", "a1", "14", "004") -> fg2-006.parquet, contains 5 rows

During an upsert/insert operation, we opportunistically expand existing small 
files on storage instead of writing new files, to keep the number of files at 
an optimum. So each file generated for the current commit will contain all the 
data from previous commits.

If we fire an incremental query with START_OFFSET = 002 and skip clustering 
enabled, there will be two file splits to read:
 * fg1 latest file slice: fg1-004.parquet
 * fg2 latest file slice: fg2-006.parquet

The final read results will have duplicates for rows with key: "2", "3", "4".
expected        actual
[2,a1,11,001]   [2,a1,11,001]
[3,a1,12,002]   [2,a1,11,001]
[4,a1,13,003]   [3,a1,12,002]
[5,a1,14,004]   [3,a1,12,002]
                [4,a1,13,003]
                [4,a1,13,003]
                [5,a1,14,004]

 

We currently *disable skipping clustering* until a proper solution is proposed.


> Fix data loss for spark incremental query with skip clustering enabled
> ----------------------------------------------------------------------
>
>                 Key: HUDI-9672
>                 URL: https://issues.apache.org/jira/browse/HUDI-9672
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark-sql
>            Reporter: Shuo Cheng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.1.0
>
>
> Consider the following ingestion case for Spark:
>  * MOR table + upsert/insert
> 1st insert: ("1", "a1", "10", "000") -> fg1-001.parquet, contains 1 row
> 2nd insert: ("2", "a1", "11", "001") -> fg1-002.parquet, contains 2 rows
> 3rd insert: ("3", "a1", "12", "002") -> fg1-003.parquet, contains 3 rows
> 4th insert: ("4", "a1", "13", "003") -> fg1-004.parquet, contains 4 rows
> clustering -> fg2-005.parquet, contains 4 rows
> 5th insert: ("5", "a1", "14", "004") -> fg2-006.parquet, contains 5 rows
> During an upsert/insert operation, we opportunistically expand existing small 
> files on storage instead of writing new files, to keep the number of files at 
> an optimum. So each file generated for the current commit will contain all 
> the data from previous commits.
> If we fire an incremental query with START_OFFSET = 002 and skip clustering 
> enabled, there will be two file splits to read:
>  * fg1 latest file slice: fg1-004.parquet
>  * fg2 latest file slice: fg2-006.parquet
> The final read results will have duplicates for rows with key: "2", "3", "4".
> expected        actual
> [2,a1,11,001]   [2,a1,11,001]
> [3,a1,12,002]   [2,a1,11,001]
> [4,a1,13,003]   [3,a1,12,002]
> [5,a1,14,004]   [3,a1,12,002]
>                 [4,a1,13,003]
>                 [4,a1,13,003]
>                 [5,a1,14,004]
>  
> We currently *disable skipping clustering* for Spark incremental queries until 
> a proper solution is proposed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
