[
https://issues.apache.org/jira/browse/HUDI-6758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sagar Sumit updated HUDI-6758:
------------------------------
Fix Version/s: 0.14.0
> Avoid duplicated log blocks on the LogRecordReader
> --------------------------------------------------
>
> Key: HUDI-6758
> URL: https://issues.apache.org/jira/browse/HUDI-6758
> Project: Apache Hudi
> Issue Type: Bug
> Components: reader-core
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.14.0
>
>
> Due to Spark retries, we could have duplicated log blocks added during a
> write. And since we don't delete anything during marker-based reconciliation
> on the writer side, the reader could see duplicated log blocks. For most
> payload implementations this should not be an issue, but for expression
> payload it could result in data consistency issues, since an expression could
> be evaluated twice (e.g., colA*2).
>
> Here is the proposal for the fix:
>
> With Spark retries, there could be duplicate log blocks created. So we are
> going to need to add some kind of block identifier to every block added. We
> already have a header with the key "INSTANT_TIME". In addition, we can add
> "BLOCK_SEQUENCE_NO". This is a simple counter that starts at 0 and keeps
> incrementing across rollovers for a given file slice (append handle). On a
> Spark retry, we again start with a sequence number of 0 for the given file
> slice. Because, for a given commit and a given file slice, the entire file
> slice will be attempted again on a Spark task or stage retry, it is safe to
> go with a simple sequence number; we don't need to rely on the taskId, etc.,
> to come up with block identifiers. For example, if a file slice is supposed
> to get 3 log files due to log file size rollovers, and during the first
> attempt only 2 log files were created, then in the 2nd attempt all 3 log
> files will have to be re-created.
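The writer-side bookkeeping above can be sketched as a tiny per-file-slice counter. This is a minimal illustration, not Hudi's actual append handle API; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch (hypothetical names, not Hudi's actual API): a per-file-slice
// counter that stamps each appended log block with a block sequence number.
// A fresh counter is created for every task attempt, so a retried attempt
// re-produces BSNs 0, 1, 2, ... for the file slice it rewrites.
public class BlockSequenceCounter {
    private final AtomicInteger next = new AtomicInteger(0);

    // Called once per log block appended, including rollovers to a new
    // log file within the same file slice.
    public int nextBlockSequenceNo() {
        return next.getAndIncrement();
    }
}
```

Because the counter is scoped to one attempt of one file slice, a retry naturally restarts the sequence at 0, which is exactly what the reader-side reconciliation below relies on.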
>
> {*}On the reader side{*}, if duplicate blocks are detected with the same
> commit time and the same block sequence numbers, and more than one sequence
> is found, we should find the longest sequence and pick it. Corrupt blocks
> should be ignored during this reconciliation as well. We should account for
> backwards compatibility, where the block sequence number may not be present.
> For rollback blocks, we can statically set the sequence number to 1, since
> there should be only one log block per file slice.
>
> Different scenarios:
> As called out above, we will go with a simple BLOCK_SEQUENCE_NO (BSN) which
> starts at 0 and increments for a given file slice. The crux here is how the
> reader will reconcile if there are duplicate log files created due to Spark
> task failures. The high-level logic for the reader is as follows: the reader
> finds all sequences of BSNs and picks the longest one. So we do one pass
> over every log block (which we already do as of today) to parse the header
> info, without deserializing the actual content, and determine the right
> blocks (ignoring the spurious log blocks).
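The reader-side reconciliation can be sketched as follows. This is an illustrative implementation of the "pick the longest BSN run, prefer the latest attempt on ties" rule, not Hudi's actual LogRecordReader code; names are hypothetical.

```java
import java.util.List;

// Sketch (not Hudi's actual API): reconcile duplicate log blocks by block
// sequence number (BSN). Each attempt writes blocks with BSNs 0, 1, 2, ...;
// the reader keeps the attempt whose contiguous BSN run is longest,
// preferring the later attempt when lengths tie.
public class BlockSequenceReconciler {

    // Each inner list is the ordered BSNs observed for one attempt, with
    // corrupt blocks already filtered out. Returns the index of the attempt
    // whose blocks should be treated as valid.
    public static int pickValidAttempt(List<List<Integer>> attempts) {
        int best = -1;
        int bestLen = -1;
        for (int i = 0; i < attempts.size(); i++) {
            // A well-formed attempt is a contiguous run 0, 1, 2, ...;
            // count how far the run extends before it breaks.
            int len = 0;
            for (int bsn : attempts.get(i)) {
                if (bsn == len) {
                    len++;
                } else {
                    break;
                }
            }
            // ">=" prefers the later attempt when run lengths tie.
            if (len >= bestLen) {
                bestLen = len;
                best = i;
            }
        }
        return best;
    }
}
```

Run against the scenarios below, this rule picks attempt2 in Scenarios 2 and 3 and attempt1 in Scenario 4.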
>
> h4. Scenario 1:
> Happy path, no retries.
> log_file1 (BSN: 0), log_file2 (BSN: 1) added.
> Reader:
> Finds only one BSN sequence, (0, 1), and marks both log_file1 and log_file2
> as valid.
>
> h4. Scenario 2:
> Task failed and retried, where the 1st attempt did not create the entire
> list of log files.
> attempt1:
> log_file1 (BSN: 0), log_file2 (BSN: 1) added.
> attempt2:
> log_file3 (BSN: 0), log_file4 (BSN: 1), log_file5 (BSN: 2) added. // BSN
> starts at 0 every time for a given file slice.
>
> Reader:
> Finds two sequences of block sequence numbers, (0, 1) and (0, 1, 2), and so
> chooses lf3, lf4 and lf5 as valid and ignores lf1 and lf2 as spurious.
>
> h4. Scenario 3:
> Task failed and retried, where both attempts created the full list (Spark
> speculation, maybe).
> attempt1:
> log_file1 (BSN: 0), log_file2 (BSN: 1) added.
> attempt2:
> log_file3 (BSN: 0), log_file4 (BSN: 1) added.
>
> Reader:
> Finds two sequences of block sequence numbers, (0, 1) and (0, 1). Since both
> sets have the same sequence, we pick the latest set, choosing lf3 and lf4 as
> valid and ignoring lf1 and lf2 as spurious.
>
> h4. Scenario 4:
> Task failed and retried, where the 1st attempt has the full list of log
> files.
> attempt1:
> log_file1 (BSN: 0), log_file2 (BSN: 1) added.
> attempt2:
> log_file3 (BSN: 0) added.
>
> Reader:
> Finds two sequences of block sequence numbers, (0, 1) and (0), and so
> chooses lf1 and lf2 as valid and ignores lf3 as spurious.
>
> The same logic should work for HDFS as well. Since log blocks are ordered
> per log file name with versions, the ordering should stay intact, i.e. log
> files/log blocks created by the 2nd attempt should not interleave with log
> blocks from the 1st attempt. If the log blocks are within the same log file,
> they will definitely be ordered. If they are across log files, the log
> version number ordering should suffice (which already happens).
>
>
> h4. Scenario 3 with HDFS:
> Task failed and retried, where both attempts created the full list (Spark
> speculation, maybe), but one of the attempts has partial writes (corrupt
> blocks).
> attempt1:
> log_file1, block1 (BSN: 0); log_file1, block2 (BSN: 1), but block2 is a
> corrupt block due to a partial write.
> attempt2:
> log_file1, block3 (BSN: 0); log_file2, block4 (BSN: 1) added.
>
> Reader:
> Since lf1, block2 is a corrupt block, the valid sequences deduced are (0)
> and (0, 1). And so we choose lf1, block3 and lf2, block4 as valid and ignore
> lf1, block1 and lf1, block2 as invalid.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)