[
https://issues.apache.org/jira/browse/HUDI-9672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shuo Cheng updated HUDI-9672:
-----------------------------
Description:
Consider the following ingestion scenario for Spark:
* MOR table + upsert/insert
1st insert: ("1", "a1", "10", "000") -> fg1-001.parquet, contains 1 row
2nd insert: ("2", "a2", "10", "000") -> fg1-002.parquet, contains 2 rows
3rd insert: ("3", "a3", "10", "000") -> fg1-003.parquet, contains 3 rows
4th insert: ("4", "a4", "10", "000") -> fg1-004.parquet, contains 4 rows
clustering -> fg2-005.parquet, contains 4 rows
5th insert: ("5", "a5", "10", "000") -> fg2-006.parquet, contains 5 rows
During upsert/insert operations, Hudi opportunistically expands existing small
files on storage instead of writing new files, to keep the number of files at
an optimum. As a result, each base file generated by the current commit also
contains all data from the previous commits.
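The small-file expansion above can be sketched in plain Python (an illustrative model of the bookkeeping only, not Hudi's actual writer API):

```python
# Minimal sketch of small-file expansion: each commit rewrites the file
# group's base file, so the newest base file also carries every record
# written by earlier commits. (Illustrative model, not Hudi code.)

def commit(base, new_records):
    """Return the rewritten base file: previous records merged with new ones."""
    merged = dict(base)
    merged.update(new_records)
    return merged

base = {}  # fg1 starts empty
for i, key in enumerate(["1", "2", "3", "4"], start=1):
    base = commit(base, {key: ("a" + key, "10", "000")})
    # fg1-00N.parquet contains N rows, not just the newly inserted one
    assert len(base) == i
```

This is why a commit-range filter on files alone is not enough for an incremental read: a base file written at commit 004 still holds rows that logically belong to commits 001-003.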
If we fire an incremental query with START_OFFSET = 002 and skip clustering
enabled, there will be two file splits to read:
* fg1 latest file slice: fg1-004.parquet
* fg2 latest file slice: fg2-006.parquet
The final read result then contains duplicates for the rows with keys "2", "3", and "4":
expected        actual
[2,a1,11,001]   [2,a1,11,001]
[3,a1,12,002]   [2,a1,11,001]
[4,a1,13,003]   [3,a1,12,002]
[5,a1,14,004]   [3,a1,12,002]
                [4,a1,13,003]
                [4,a1,13,003]
                [5,a1,14,004]
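To make the duplication concrete, here is a plain-Python sketch of the two splits and a key-based deduplication that recovers the expected rows (the rows are taken loosely from the table above; this is an illustration, not Hudi internals):

```python
# Hypothetical sketch: the two file splits the incremental query reads,
# and the duplicates their union produces. Rows are (key, name, value,
# commit_time), loosely following the table in the report.

fg1_004 = [("2", "a1", "11", "001"),
           ("3", "a1", "12", "002"),
           ("4", "a1", "13", "003")]   # latest slice of fg1 (replaced by clustering)
fg2_006 = [("2", "a1", "11", "001"),
           ("3", "a1", "12", "002"),
           ("4", "a1", "13", "003"),
           ("5", "a1", "14", "004")]   # latest slice of fg2 (written after clustering)

actual = fg1_004 + fg2_006             # what the buggy query returns: 7 rows
assert len(actual) == 7                # keys "2", "3", "4" appear twice

# Deduplicating by record key (last write wins) recovers the expected 4 rows;
# fg2_006 already contains all of them, since clustering rewrote fg1's data.
deduped = {row[0]: row for row in actual}
assert sorted(deduped.values()) == fg2_006
```

The sketch shows the underlying issue: with skip clustering enabled, the query reads the latest slice of both the replaced file group (fg1) and its replacement (fg2), so every row carried over by clustering is returned twice.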
> Fix data loss for spark incremental query with skip clustering enabled
> ----------------------------------------------------------------------
>
> Key: HUDI-9672
> URL: https://issues.apache.org/jira/browse/HUDI-9672
> Project: Apache Hudi
> Issue Type: Bug
> Components: spark-sql
> Reporter: Shuo Cheng
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.1.0
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)