nsivabalan opened a new pull request, #9545:
URL: https://github.com/apache/hudi/pull/9545
### Change Logs
Due to Spark retries, we could have duplicate log blocks added during a
write. And since we don't delete anything during marker-based reconciliation
on the writer side, the reader could see duplicated log blocks. For most
payload implementations this should not be an issue, but for expression
payload it could result in data inconsistency since an expression could be
evaluated twice (e.g., colA*2).
#### Fix
With Spark retries, there could be duplicate log blocks created. So we need
to add some kind of block identifier to every block. We already have a header
with the key "INSTANT_TIME"; in addition, we can add "BLOCK_SEQUENCE_NO".
This is a simple counter which starts at 0 and keeps incrementing on rollover
for a given file slice (append handle). On Spark retries, we again start with
a sequence number of 0 for the given file slice. For a given commit and a
given file slice, with Spark task or stage retries the entire file slice will
be attempted again, so it is safe to go with a simple sequence number; we
don't need to rely on task IDs etc. to come up with block identifiers. For
example, if a file slice is supposed to get 3 log files due to log file size
rollovers, and during the first attempt only 2 log files were created, then
in the 2nd attempt all 3 log files will have to be re-created.
On the reader side, if duplicate blocks are detected with the same commit
time and the same block sequence number, we should find the candidate
sequences and pick the longest one if more than one sequence is found.
Corrupt blocks should be ignored during this reconciliation as well. We
should also account for backward compatibility, where the block sequence
number may not be present. For rollback blocks, we can statically set the
sequence number to 0, since there should be only one log block per file
slice.
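A minimal sketch of the writer-side counter could look like the following. The class and method names (`AppendHandleState`, `nextBlockHeaders`) are hypothetical, not actual Hudi APIs; the real change would live in the append handle:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-file-slice block sequence counter.
// Names here are illustrative only, not actual Hudi classes.
public class AppendHandleState {

    // BSN of the next block for this file slice. A task retry re-creates the
    // append handle, so the counter naturally restarts at 0 on retries.
    private int nextBlockSequenceNo = 0;

    /** Builds the headers for the next log block appended by this handle. */
    public Map<String, String> nextBlockHeaders(String instantTime) {
        Map<String, String> headers = new HashMap<>();
        headers.put("INSTANT_TIME", instantTime);
        headers.put("BLOCK_SEQUENCE_NO", String.valueOf(nextBlockSequenceNo++));
        return headers;
    }
}
```

Since the counter lives in the handle, rollovers within one attempt keep incrementing it, while a fresh attempt starts over at 0.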
#### Diff scenarios:
As called out above, we will go with a simple BLOCK_SEQUENCE_NO (BSN) which
starts at 0 and increments for a given file slice. The major crux here is how
the reader will reconcile if there are duplicate log files created due to
Spark task failures. The high-level logic to reconcile on the reader side is
as follows: the reader finds the candidate sequences of BSNs and picks the
longest one. So we do one pass over every log block (which we already do as
of today) to parse the header info, without deserializing the actual content,
and determine the right blocks (ignoring the spurious log blocks).
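The reconciliation pass can be sketched as below. This is illustrative code, not the actual Hudi reader (which operates on HoodieLogBlock headers); it works on just the BSNs scanned in log file/version order:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of reader-side reconciliation: split the BSNs (scanned
// in log file/version order) into candidate sequences and keep the longest,
// preferring the latest attempt on a tie.
public class BlockSequenceReconciler {

    /** Returns the indices of the blocks considered valid. */
    public static List<Integer> reconcile(int[] bsns) {
        List<List<Integer>> sequences = new ArrayList<>();
        for (int i = 0; i < bsns.length; i++) {
            // A BSN reset to 0 marks the start of a new (retry) attempt.
            if (bsns[i] == 0 || sequences.isEmpty()) {
                sequences.add(new ArrayList<>());
            }
            sequences.get(sequences.size() - 1).add(i);
        }
        List<Integer> best = new ArrayList<>();
        for (List<Integer> seq : sequences) {
            if (seq.size() >= best.size()) { // ">=" prefers the latest on ties
                best = seq;
            }
        }
        return best;
    }
}
```

For example, BSNs (0, 1, 0, 1, 2) yield the last three blocks as the valid set, matching Scenario 2 below.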
##### Scenario 1:
Happy path, no retries.
log_file1 (BSN: 0), log_file2 (BSN: 1) added.
Reader:
finds only one sequence of BSNs, (0, 1), and marks both log_file1 and
log_file2 as valid.
##### Scenario 2:
Task failed and retried, where the 1st attempt did not create the entire list
of log files.
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added.
attempt2:
log_file3 (BSN: 0), log_file4 (BSN: 1), log_file5 (BSN: 2) added. // BSN
starts at 0 every time for a given file slice.
Reader:
Finds two sequences of block sequence numbers,
(0, 1) and (0, 1, 2), and so chooses lf3, lf4 and lf5 as valid and ignores
lf1 and lf2 as spurious.
##### Scenario 3:
Task failed and retried, where both attempts created the full list (Spark
speculation, maybe).
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added.
attempt2:
log_file3 (BSN: 0), log_file4 (BSN: 1) added.
Reader:
Finds two sequences of block sequence numbers,
(0, 1) and (0, 1). Since both sets have the same sequence, we pick the latest
set: lf3 and lf4 are chosen as valid and lf1 and lf2 are ignored as spurious.
##### Scenario 4:
Task failed and retried, where the 1st attempt has the full list of log files.
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added.
attempt2:
log_file3 (BSN: 0) added.
Reader:
Finds two sequences of block sequence numbers,
(0, 1) and (0), and so chooses lf1 and lf2 as valid and ignores lf3 as
spurious.
The same logic should work for HDFS as well. Since log blocks are ordered per
log file name with versions, the ordering should stay intact, i.e., log
files/log blocks created by the 2nd attempt should not interleave with log
blocks from the 1st attempt. If log blocks are within the same log file, the
log blocks are ordered for sure. If they are across log files, the log
version number ordering should suffice (which already happens).
##### Scenario 3 with HDFS:
Task failed and retried, where both attempts created the full list (Spark
speculation, maybe), but one of the attempts has partial writes (corrupt
blocks).
attempt1:
log_file1, block1 (BSN: 0); log_file1, block2 (BSN: 1), but block2 is
corrupt due to a partial write.
attempt2:
log_file1, block3 (BSN: 0); log_file2, block4 (BSN: 1) added.
Reader:
Finds two sequences of block sequence numbers.
Since lf1, block2 is a corrupt block, the valid sequences deduced are (0) and
(0, 1). So we choose lf1, block3 and lf2, block4 as valid and ignore lf1,
block1 and lf1, block2 as invalid.
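Corrupt-block handling can be folded into the same single pass: a corrupt block is simply dropped from whichever candidate sequence it falls in before lengths are compared. A self-contained sketch (hypothetical names, not actual Hudi code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: reconcile BSNs while ignoring corrupt blocks.
// corrupt[i] == true means block i was a partial write / failed its checksum.
public class CorruptAwareReconciler {

    /** Returns the indices of the blocks considered valid. */
    public static List<Integer> reconcile(int[] bsns, boolean[] corrupt) {
        List<List<Integer>> sequences = new ArrayList<>();
        for (int i = 0; i < bsns.length; i++) {
            // A BSN reset to 0 marks the start of a new (retry) attempt.
            if (bsns[i] == 0 || sequences.isEmpty()) {
                sequences.add(new ArrayList<>());
            }
            if (!corrupt[i]) { // corrupt blocks never count toward a sequence
                sequences.get(sequences.size() - 1).add(i);
            }
        }
        List<Integer> best = new ArrayList<>();
        for (List<Integer> seq : sequences) {
            if (seq.size() >= best.size()) { // prefer the latest attempt on ties
                best = seq;
            }
        }
        return best;
    }
}
```

For the scenario above, BSNs (0, 1, 0, 1) with corrupt flags (false, true, false, false) leave candidate sequences (0) and (0, 1), so the last two blocks (block3 and block4) are chosen as valid.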
### Impact
The MOR log record reader will never read any duplicated log blocks created
due to Spark task retries during a write.
### Risk level (write none, low medium or high below)
Medium.
### Documentation Update
N/A
### Contributor's checklist
- [ ] Read through [contributor's
guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Change Logs and Impact were stated clearly
- [ ] Adequate tests were added if applicable
- [ ] CI passed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]