[ 
https://issues.apache.org/jira/browse/HUDI-6758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Sumit updated HUDI-6758:
------------------------------
    Fix Version/s: 0.14.0

> Avoid duplicated log blocks on the LogRecordReader
> --------------------------------------------------
>
>                 Key: HUDI-6758
>                 URL: https://issues.apache.org/jira/browse/HUDI-6758
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: reader-core
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.14.0
>
>
> Due to Spark retries, we could end up with duplicated log blocks added 
> during a write. And since we don't delete anything during marker-based 
> reconciliation on the writer side, the reader could see duplicated log 
> blocks. For most payload implementations this should not be an issue, but 
> for the expression payload it could result in data inconsistency, since an 
> expression could be evaluated twice (e.g., colA*2).
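>
> A toy illustration of the failure mode (this is not Hudi's payload API; 
> just a hypothetical apply step for the expression above):
>
> {code:java}
> // Hypothetical sketch: an update expression that doubles colA.
> // Applying it once is correct; replaying a duplicated log block
> // applies it a second time and corrupts the value.
> public class ExpressionPayloadDemo {
>     static long applyExpression(long colA) {
>         return colA * 2; // the "colA*2" expression from the description
>     }
>
>     public static void main(String[] args) {
>         long colA = 10;
>         long once = applyExpression(colA);  // 20: the intended result
>         long twice = applyExpression(once); // 40: duplicated block re-applied
>         System.out.println(once + " vs " + twice);
>     }
> }
> {code}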
>  
> Here is the proposal for the fix:
>  
> With Spark retries, there could be duplicate log blocks created, so we might 
> need to add some kind of block identifier to every block written. We already 
> have a header with key "INSTANT_TIME"; in addition, we can add 
> "BLOCK_SEQUENCE_NO". This is a simple counter which starts at 0 and keeps 
> increasing across rollovers for a given file slice (append handle). On a 
> Spark retry, we again start with sequence number 0 for the given file slice. 
> For a given commit and a given file slice, a Spark task or stage retry will 
> re-attempt the entire file slice, so it is safe to go with a simple sequence 
> number; we don't really need to rely on task IDs etc. to come up with block 
> identifiers. For example, if a file slice is supposed to get 3 log files due 
> to log file size rollovers, and during the first attempt only 2 log files 
> were created, then in the 2nd attempt all 3 log files will have to be 
> re-created. A minimal writer-side sketch follows.
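>
> Class and method names here are illustrative, not Hudi's actual 
> append-handle API:
>
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Hypothetical append handle: stamps a BLOCK_SEQUENCE_NO header on every
> // log block it writes, resetting to 0 on each (re)attempt of the file slice.
> public class AppendHandleSketch {
>     private int blockSequenceNo = 0; // restarts at 0 for every task attempt
>
>     // Headers to attach to the next log block; the counter survives
>     // log-file rollovers within the same attempt.
>     Map<String, String> nextBlockHeaders(String instantTime) {
>         Map<String, String> headers = new HashMap<>();
>         headers.put("INSTANT_TIME", instantTime);
>         headers.put("BLOCK_SEQUENCE_NO", String.valueOf(blockSequenceNo++));
>         return headers;
>     }
> }
> {code}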
>  
> {*}On the reader side{*}, if duplicate blocks are detected with the same 
> commit time and the same block sequence numbers, we should find the longest 
> sequence and pick it when more than one sequence is found. Corrupt blocks 
> should be ignored during this reconciliation as well. We should account for 
> backwards compatibility, where the block sequence number may not be present. 
> For rollback blocks, we can statically set the sequence number to 1, since 
> there should be only one log block per file slice.
>  
> Different scenarios: 
> As called out above, we will go with a simple BLOCK_SEQUENCE_NO (BSN) which 
> starts at 0 and increments for a given file slice. The crux here is how the 
> reader reconciles when duplicate log files were created due to Spark task 
> failures. The high-level reconciliation logic on the reader is as follows: 
> the reader finds all sequences of BSNs and picks the longest one. So we do 
> one pass over every log block (which we already do as of today) to parse the 
> header info, without deserializing the actual content, and determine the 
> right blocks (ignoring the spurious log blocks).
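>
> A sketch of that reconciliation pass, under these assumptions: blocks arrive 
> in log-file order, a BSN of 0 starts a new sequence (a new attempt), a 
> corrupt block ends the current sequence, blocks written before this header 
> existed are kept as-is for backwards compatibility, and ties go to the 
> latest sequence. All names are illustrative:
>
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> public class BlockSequenceReconciler {
>
>     // Minimal stand-in for a parsed log block header (hypothetical fields).
>     public static class Block {
>         final String id;       // e.g. "lf1" for log_file1
>         final Integer bsn;     // BLOCK_SEQUENCE_NO; null => old writer
>         final boolean corrupt; // true if the block failed its integrity check
>
>         public Block(String id, Integer bsn, boolean corrupt) {
>             this.id = id;
>             this.bsn = bsn;
>             this.corrupt = corrupt;
>         }
>
>         @Override
>         public String toString() {
>             return id;
>         }
>     }
>
>     // Given all log blocks of one file slice for one commit, in log-file
>     // order, return the blocks of the longest valid BSN sequence.
>     public static List<Block> pickValidBlocks(List<Block> blocks) {
>         List<List<Block>> sequences = new ArrayList<>();
>         List<Block> current = null;
>         for (Block b : blocks) {
>             if (b.bsn == null) {
>                 return blocks; // backwards compatibility: no header at all
>             }
>             if (b.corrupt) {
>                 current = null; // a corrupt block ends the current sequence
>                 continue;
>             }
>             if (b.bsn == 0) { // BSN reset to 0 => a new attempt starts here
>                 current = new ArrayList<>();
>                 sequences.add(current);
>             }
>             if (current != null) {
>                 current.add(b);
>             }
>         }
>         List<Block> best = new ArrayList<>();
>         for (List<Block> seq : sequences) {
>             if (seq.size() >= best.size()) { // ">=" prefers the latest on ties
>                 best = seq;
>             }
>         }
>         return best;
>     }
> }
> {code}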
>  
> h4. Scenario 1:
> Happy path, no retries.
> log_file1 (BSN: 0), log_file2 (BSN: 1) added.
> Reader:
> Finds only one BSN sequence, (0, 1), and marks both log_file1 and log_file2 
> as valid.
>  
> h4. Scenario 2:
> Task failed and was retried, where the 1st attempt did not create the entire 
> list of log files.
> attempt1:
> log_file1 (BSN: 0), log_file2 (BSN: 1) added.
> attempt2:
> log_file3 (BSN: 0), log_file4 (BSN: 1), log_file5 (BSN: 2) added. // BSN 
> starts at 0 every time for a given file slice.
>  
> Reader:
> Finds two sequences of block sequence numbers,
> (0, 1) and (0, 1, 2), so it chooses lf3, lf4 and lf5 as valid and ignores 
> lf1 and lf2 as spurious.
>  
> h4. Scenario 3:
> Task failed and was retried, where both attempts created the full list 
> (possibly due to Spark speculation).
> attempt1:
> log_file1 (BSN: 0), log_file2 (BSN: 1) added.
> attempt2:
> log_file3 (BSN: 0), log_file4 (BSN: 1) added.
>
> Reader:
> Finds two sequences of block sequence numbers,
> (0, 1) and (0, 1). Since both sets have the same sequence, we pick the 
> latest set, so lf3 and lf4 are chosen as valid and lf1 and lf2 are ignored 
> as spurious.
>  
> h4. Scenario 4:
> Task failed and was retried, where the 1st attempt has the full list of log 
> files.
> attempt1:
> log_file1 (BSN: 0), log_file2 (BSN: 1) added.
> attempt2:
> log_file3 (BSN: 0) added.
>
> Reader:
> Finds two sequences of block sequence numbers,
> (0, 1) and (0), so it chooses lf1 and lf2 as valid and ignores lf3 as 
> spurious.
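>
> Running the sketch above against Scenarios 2 and 4 (a hypothetical harness 
> reusing Block and pickValidBlocks from the reader-side sketch):
>
> {code:java}
> import java.util.Arrays;
> import java.util.List;
>
> // Scenario 2: (0, 1) then (0, 1, 2) -> the longer 2nd attempt wins.
> List<Block> s2 = Arrays.asList(
>     new Block("lf1", 0, false), new Block("lf2", 1, false),
>     new Block("lf3", 0, false), new Block("lf4", 1, false),
>     new Block("lf5", 2, false));
> System.out.println(BlockSequenceReconciler.pickValidBlocks(s2)); // [lf3, lf4, lf5]
>
> // Scenario 3: (0, 1) then (0, 1) -> equal length, the later pair wins the tie.
> // Scenario 4: (0, 1) then (0) -> the complete 1st attempt wins.
> List<Block> s4 = Arrays.asList(
>     new Block("lf1", 0, false), new Block("lf2", 1, false),
>     new Block("lf3", 0, false));
> System.out.println(BlockSequenceReconciler.pickValidBlocks(s4)); // [lf1, lf2]
> {code}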
>  
> The same logic should work out for HDFS as well. Since log blocks are 
> ordered by log file name with versions, the ordering should stay intact, 
> i.e., log files/log blocks created by the 2nd attempt should not interleave 
> with log blocks from the 1st attempt. If the log blocks are within the same 
> log file, they will be ordered for sure; if they are across log files, the 
> log version number ordering should suffice (which already happens).
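>
> For illustration, that ordering can be sketched as sorting by the version 
> embedded in the log file name (the name pattern below is a simplification 
> of Hudi's log file naming, which also carries a write token; treat it as an 
> assumption):
>
> {code:java}
> import java.util.ArrayList;
> import java.util.Comparator;
> import java.util.List;
>
> public class LogFileOrdering {
>     // Parses the trailing version out of a simplified log file name
>     // such as ".f1_0010.log.2".
>     static int logVersion(String name) {
>         return Integer.parseInt(name.substring(name.lastIndexOf('.') + 1));
>     }
>
>     public static void main(String[] args) {
>         List<String> logFiles = new ArrayList<>(
>             List.of(".f1_0010.log.3", ".f1_0010.log.1", ".f1_0010.log.2"));
>         logFiles.sort(Comparator.comparingInt(LogFileOrdering::logVersion));
>         // [.f1_0010.log.1, .f1_0010.log.2, .f1_0010.log.3]
>         System.out.println(logFiles);
>     }
> }
> {code}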
>  
>  
> h4. Scenario 3 with HDFS:
> Task failed and was retried, where both attempts created the full list 
> (possibly due to Spark speculation), but one of the attempts has partial 
> writes (corrupt blocks).
> attempt1:
> log_file1, block1 (BSN: 0); log_file1, block2 (BSN: 1), but the latter is a 
> corrupt block due to a partial write.
> attempt2:
> log_file1, block3 (BSN: 0); log_file2, block4 (BSN: 1) added.
>
> Reader:
> Finds two sequences of block sequence numbers.
> Since lf1, block2 is a corrupt block, the valid sequences deduced are (0) 
> and (0, 1), so we choose lf1, block3 and lf2, block4 as valid and ignore 
> lf1, block1 and lf1, block2 as invalid.
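>
> The same harness covers this corrupt-block case (the corrupt flag stands in 
> for a failed block-level integrity check):
>
> {code:java}
> // Attempt 1: block1 (BSN 0) is fine, block2 (BSN 1) is corrupt.
> // Attempt 2: block3 (BSN 0) and block4 (BSN 1) are fine.
> List<Block> s5 = Arrays.asList(
>     new Block("lf1-block1", 0, false), new Block("lf1-block2", 1, true),
>     new Block("lf1-block3", 0, false), new Block("lf2-block4", 1, false));
> // The corrupt block ends attempt 1's sequence at (0); attempt 2 yields (0, 1).
> System.out.println(BlockSequenceReconciler.pickValidBlocks(s5)); // [lf1-block3, lf2-block4]
> {code}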
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
