nsivabalan opened a new pull request, #9545:
URL: https://github.com/apache/hudi/pull/9545

   ### Change Logs
   
Due to spark retries, we could have duplicated log blocks added during a write. And since we don't delete anything during marker based reconciliation on the writer side, the reader could see duplicated log blocks. For most payload implementations, this should not be an issue. But for expression payload, it could result in data consistency issues since an expression could be evaluated twice (e.g., `colA * 2`).
   
   #### Fix 
With spark retries, there could be duplicate log blocks created. So we need to add some kind of block identifier to every block added. We already have a header with key "INSTANT_TIME". In addition, we can add "BLOCK_SEQUENCE_NO". This is a simple counter which starts at 0 and keeps incrementing on rollover for a given file slice (append handle). On spark retries, we again start with a sequence no of 0 for the given file slice, because for a given commit and a given file slice, with spark task or stage retries, the entire file slice will be attempted again. So it's safe to go with a simple sequence number, and we don't really need to rely on taskId etc. to come up with block identifiers. For example, if a file slice is supposed to get 3 log files due to log file size rollovers, and during the first attempt only 2 log files were created, in the 2nd attempt all 3 log files will have to be re-created.
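   The writer-side change could look roughly like the sketch below. This is a minimal sketch only: the `HeaderKey` enum and `nextBlockHeaders` helper are illustrative stand-ins for this description, not the actual Hudi append handle API.

```java
// Minimal sketch only: header names and the helper below are assumptions for
// this description, not the actual Hudi append handle implementation.
import java.util.EnumMap;
import java.util.Map;

public class AppendHandleSketch {

  // Hypothetical header keys; log blocks already carry INSTANT_TIME today.
  enum HeaderKey { INSTANT_TIME, BLOCK_SEQUENCE_NO }

  private final String instantTime;
  // Starts at 0 for every (re)attempt of a file slice, so a retried task
  // always produces the sequence 0, 1, 2, ... again.
  private int blockSequenceNo = 0;

  public AppendHandleSketch(String instantTime) {
    this.instantTime = instantTime;
  }

  /** Builds the headers for the next data block and bumps the counter on rollover. */
  public Map<HeaderKey, String> nextBlockHeaders() {
    Map<HeaderKey, String> headers = new EnumMap<>(HeaderKey.class);
    headers.put(HeaderKey.INSTANT_TIME, instantTime);
    headers.put(HeaderKey.BLOCK_SEQUENCE_NO, String.valueOf(blockSequenceNo));
    blockSequenceNo++; // the next block (e.g. after a log file rollover) gets the next number
    return headers;
  }
}
```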
    
On the reader side, if duplicate blocks are detected with the same commit time and the same block sequence number, we should find all candidate sequences and pick the longest one when more than one sequence is found. Corrupt blocks should be ignored during this reconciliation as well. We should also account for backward compatibility, where the block sequence number may not be present. For rollback blocks, we can statically set the sequence no to 1 since there should be only one such log block per file slice.
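   For the backward compatibility part, the reader could treat a missing header as a legacy block and skip the sequence based pruning for it. A rough illustration (header and class names below are hypothetical, mirroring the writer-side sketch above):

```java
import java.util.Map;
import java.util.OptionalInt;

final class BlockSequenceUtil {

  // Mirrors the hypothetical header keys from the writer-side sketch above.
  enum HeaderKey { INSTANT_TIME, BLOCK_SEQUENCE_NO }

  /**
   * Returns the block sequence number if present, or empty for blocks written
   * by older writers that never set the header (backward compatibility: such
   * blocks are kept as-is and not subjected to duplicate pruning).
   */
  static OptionalInt parseBlockSequenceNo(Map<HeaderKey, String> headers) {
    String raw = headers.get(HeaderKey.BLOCK_SEQUENCE_NO);
    return raw == null ? OptionalInt.empty() : OptionalInt.of(Integer.parseInt(raw));
  }
}
```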
    
   
   #### Diff scenarios: 
As called out above, we will go with a simple BLOCK_SEQUENCE_NO (BSN) which starts at 0 and increments for a given file slice. The crux here is how the reader will reconcile if there are duplicate log files created due to spark task failures. The high level logic for the reader is as follows: the reader finds every sequence of BSNs and picks the longest one. So we do one pass over every log block (which we already do as of today) to parse the header info, w/o deserializing the actual content, and determine the right blocks (while ignoring the spurious log blocks). A sketch of this pass is shown below.
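   A sketch of that reconciliation pass, under the assumptions that corrupt blocks have already been filtered out, that the blocks of one commit are visited in log file/version order, and that a BSN which does not continue the previous value by +1 marks the start of a new attempt (all names below are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

final class BlockReconciliation {

  /** Minimal stand-in for a parsed log block header: commit time + block sequence no. */
  record BlockInfo(String instantTime, int blockSequenceNo) {}

  /**
   * Splits the blocks of a single commit into attempts (a BSN that does not
   * continue the previous one by +1 starts a new attempt) and returns the
   * longest attempt, preferring the latest one on ties.
   */
  static List<BlockInfo> pickValidBlocks(List<BlockInfo> blocksOfOneCommit) {
    List<List<BlockInfo>> attempts = new ArrayList<>();
    List<BlockInfo> current = new ArrayList<>();
    for (BlockInfo block : blocksOfOneCommit) {
      boolean continuesSequence = !current.isEmpty()
          && block.blockSequenceNo() == current.get(current.size() - 1).blockSequenceNo() + 1;
      if (!current.isEmpty() && !continuesSequence) {
        attempts.add(current); // previous attempt ended here
        current = new ArrayList<>();
      }
      current.add(block);
    }
    if (!current.isEmpty()) {
      attempts.add(current);
    }
    if (attempts.isEmpty()) {
      return List.of();
    }
    // Longest sequence wins; ">=" keeps the latest attempt when lengths tie.
    List<BlockInfo> best = attempts.get(0);
    for (List<BlockInfo> attempt : attempts) {
      if (attempt.size() >= best.size()) {
        best = attempt;
      }
    }
    return best;
  }
}
```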
    
   
   ##### Scenario 1:
   Happy path, no retries.
   log_file1(BSN : 0), log_file2(BSN:1) added.
   Reader:
   Finds only one BSN sequence, (0,1), and marks both log_file1 and log_file2 as valid.
    
   
   ##### Scenario 2:
   Task failed and retried, where the 1st attempt did not create the entire list of log files.
   attempt1:
   log_file1(BSN : 0), log_file2(BSN:1) added.
   attempt2:
   log_file3(BSN : 0), log_file4(BSN:1), log_file5(BSN:2) added. // BSN starts at 0 every time for a given file slice.
    
   Reader:
   Finds two sequences of block sequence numbers,
   (0,1) and (0,1,2), and so chooses lf3, lf4 and lf5 as valid and ignores lf1 and lf2 as spurious ones.
    
   
   ##### Scenario 3:
   Task failed and retried, where both attempts created the full list (spark speculation, maybe).
   attempt1:
   log_file1(BSN : 0), log_file2(BSN:1) added.
   attempt2:
   log_file3(BSN : 0), log_file4(BSN:1) added.
    
   Reader:
   Finds two sequences of block sequence numbers,
   (0,1) and (0,1), and so chooses lf3 and lf4 as valid and ignores lf1 and lf2 as spurious ones. We will probably pick the latest set if both sets have the same sequence.
    
   
   ##### Scenario 4:
   Task failed and retried, where the 1st attempt has the full list of log files.
   attempt1:
   log_file1(BSN : 0), log_file2(BSN:1) added.
   attempt2:
   log_file3(BSN : 0) added.
    
   Reader:
   Finds two sequences of block sequence numbers,
   (0,1) and (0), and so chooses lf1 and lf2 as valid and ignores lf3 as a spurious one.
    
   The same logic should work for HDFS as well. Since log blocks are ordered as per the log file name w/ versions, the ordering should be intact, i.e. log files/log blocks created by the 2nd attempt should not interleave w/ log blocks from the 1st attempt. If the log blocks are within the same log file, they will be ordered for sure. If it's across log files, the log version number ordering should suffice (which already happens).
    
    
   ##### Scenario 3 with hdfs:
   Task failed and retried, where both attempts created the full list (spark speculation, maybe), but in one of the attempts there are partial writes (corrupted blocks).
   attempt1:
   log_file1, block1(BSN : 0) and log_file1, block2(BSN:1) added, but block2 is a corrupt block due to a partial write.
   attempt2:
   log_file1, block3(BSN : 0), log_file2, block4 (BSN:1) added.
    
   Reader:
   Finds two sequences of block sequence numbers.
   Since lf1, block2 is a corrupt block, the valid sequences deduced are (0) and (0,1). And so we choose lf1, block3 and lf2, block4 as valid and ignore lf1, block1 and lf1, block2 as invalid.
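   Purely to make the walkthroughs concrete, here is how the illustrative `pickValidBlocks` sketch from above would decide Scenario 2 and Scenario 4 (corrupt blocks, as in the HDFS variant, are assumed to be dropped before this list is built; the commit time is a placeholder):

```java
import java.util.List;

class ScenarioCheck {
  public static void main(String[] args) {
    String commit = "commit1"; // placeholder commit time

    // Scenario 2: attempt 1 wrote (0,1), attempt 2 wrote (0,1,2) -> attempt 2 wins.
    List<BlockReconciliation.BlockInfo> scenario2 = List.of(
        new BlockReconciliation.BlockInfo(commit, 0),
        new BlockReconciliation.BlockInfo(commit, 1),
        new BlockReconciliation.BlockInfo(commit, 0),
        new BlockReconciliation.BlockInfo(commit, 1),
        new BlockReconciliation.BlockInfo(commit, 2));
    System.out.println(BlockReconciliation.pickValidBlocks(scenario2).size()); // prints 3

    // Scenario 4: attempt 1 wrote (0,1), attempt 2 only wrote (0) -> attempt 1 wins.
    List<BlockReconciliation.BlockInfo> scenario4 = List.of(
        new BlockReconciliation.BlockInfo(commit, 0),
        new BlockReconciliation.BlockInfo(commit, 1),
        new BlockReconciliation.BlockInfo(commit, 0));
    System.out.println(BlockReconciliation.pickValidBlocks(scenario4).size()); // prints 2
  }
}
```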
   
   ### Impact
   
   The MOR log record reader will never read any duplicated log blocks created due to spark task retries during write.
   
   ### Risk level (write none, low medium or high below)
   
   medium.
   
   ### Documentation Update
   
   N/A
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   

