[
https://issues.apache.org/jira/browse/HUDI-6758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-6758:
--------------------------------------
Description:
Due to Spark retries, we could have duplicated log blocks added during a write.
And since we don't delete anything during marker-based reconciliation on the
writer side, the reader could see duplicated log blocks. For most payload
implementations, this should not be an issue. But for expression payload, it
could result in data consistency issues, since an expression could be evaluated
twice (e.g., colA*2).
Here is the proposal for the fix:
With Spark retries, there could be duplicate log blocks created. So we might
need to add some kind of block identifier to every block added. We already have
a header with the key "INSTANT_TIME". In addition, we can add
"BLOCK_SEQUENCE_NO". This is a simple counter which starts at 0 and keeps
increasing across log-file rollovers for a given file slice (append handle). On
Spark retries, we again start with a sequence number of 0 for the given file
slice. Because, for a given commit and a given file slice, a Spark task or
stage retry re-attempts the entire file slice, it is safe to go with a simple
sequence number; we don't really need to rely on taskId etc. to come up with
block identifiers. For example, if a file slice is supposed to get 3 log files
due to log-file size rollovers, and during the first attempt only 2 log files
were created, then in the 2nd attempt all 3 log files will have to be
re-created.
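As a rough illustration of the writer-side bookkeeping under the assumptions above, here is a minimal sketch; AppendHandleSketch and the plain string headers are simplified stand-ins, not Hudi's actual write-handle API:
{code:java}
// Minimal sketch of the writer-side counter; AppendHandleSketch and plain
// string headers are illustrative stand-ins, not Hudi's actual API.
import java.util.HashMap;
import java.util.Map;

class AppendHandleSketch {
  private final String instantTime;
  private int blockSequenceNo = 0; // starts at 0 per file slice, per attempt

  AppendHandleSketch(String instantTime) {
    this.instantTime = instantTime;
  }

  // Headers for the next block to append. The BSN keeps increasing across
  // log-file rollovers within this handle; a task retry re-creates the
  // handle, which resets the BSN to 0.
  Map<String, String> nextBlockHeaders() {
    Map<String, String> headers = new HashMap<>();
    headers.put("INSTANT_TIME", instantTime);
    headers.put("BLOCK_SEQUENCE_NO", String.valueOf(blockSequenceNo++));
    return headers;
  }
}
{code}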
*On the reader side*, if duplicate blocks are detected with the same commit
time and the same block sequence number, we should find the candidate
sequences and, when more than one is found, pick the longest. Corrupt blocks
should be ignored during this reconciliation as well. We should account for
backwards compatibility, where the block sequence number may not be present.
For rollback blocks, we can statically set the sequence number to 1, since
there should be only one log block per file slice.
Different scenarios:
As called out above, we will go with a simple BLOCK_SEQUENCE_NO which starts
at 0 and increments for a given file slice. The crux here is how the reader
will reconcile when there are duplicate log files created due to Spark task
failures. The high-level reconciliation logic on the reader is as follows:
the reader collects the candidate sequences of BSNs (block sequence numbers)
and picks the longest one. So we do one pass over every log block (which we
already do as of today) to parse the header info, without deserializing the
actual content, and determine the right blocks (ignoring the spurious ones).
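To make this concrete, here is a minimal, self-contained sketch of the longest-run selection; BlockRef and BsnReconciler are hypothetical names, and the real reader would work off the parsed headers of each non-corrupt block in log-file/version order:
{code:java}
// Hypothetical sketch of the reader-side reconciliation; not Hudi's API.
import java.util.ArrayList;
import java.util.List;

// One non-corrupt log block of a file slice for a given commit, in
// log-file/version order, with its parsed BLOCK_SEQUENCE_NO header.
record BlockRef(String logFile, int bsn) {}

class BsnReconciler {
  // Groups the ordered block stream into candidate runs of consecutive
  // BSNs (0, 1, 2, ...) and returns the longest run; on a tie the
  // latest-written run wins. Corrupt blocks are assumed to have been
  // dropped from the input already, so a skipped block simply breaks a run.
  static List<BlockRef> pickValidBlocks(List<BlockRef> blocks) {
    List<List<BlockRef>> candidates = new ArrayList<>();
    List<BlockRef> current = new ArrayList<>();
    for (BlockRef b : blocks) {
      if (b.bsn() != current.size()) {
        // BSN reset to 0 (a retry) or a gap (a skipped corrupt block):
        // close the current run and start a new one.
        if (!current.isEmpty()) {
          candidates.add(current);
        }
        current = new ArrayList<>();
      }
      if (b.bsn() == current.size()) {
        current.add(b); // extends the run only while BSNs stay consecutive
      }
    }
    if (!current.isEmpty()) {
      candidates.add(current);
    }
    List<BlockRef> best = List.of();
    for (List<BlockRef> run : candidates) {
      // Only runs starting at BSN 0 are complete; ">=" lets the latest
      // run win when two attempts produced equal-length runs.
      if (run.get(0).bsn() == 0 && run.size() >= best.size()) {
        best = run;
      }
    }
    return best;
  }
}
{code}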
h4. Scenario 1:
Happy path, no retries.
log_file1 (BSN: 0), log_file2 (BSN: 1) added.
Reader:
Finds only one BSN sequence, (0, 1), and marks both log_file1 and log_file2
as valid.
h4. Scenario 2:
Task failed and retried, where the 1st attempt did not create the entire list
of log files.
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added.
attempt2:
log_file3 (BSN: 0), log_file4 (BSN: 1), log_file5 (BSN: 2) added. // BSN
starts at 0 every time for a given file slice.
Reader:
Finds two sequences of block sequence numbers, (0, 1) and (0, 1, 2), and so
chooses lf3, lf4 and lf5 as valid and ignores lf1 and lf2 as spurious.
h4. Scenario 3:
Task failed and retried, where both attempts created the full list (possibly
due to Spark speculation).
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added.
attempt2:
log_file3 (BSN: 0), log_file4 (BSN: 1) added.
Reader:
Finds two sequences of block sequence numbers, (0, 1) and (0, 1), and so
chooses lf3 and lf4 as valid and ignores lf1 and lf2 as spurious. When both
sets have the same sequence, we pick the latest set.
h4. Scenario 4:
Task failed and retried, where the 1st attempt created the full list of log
files.
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added.
attempt2:
log_file3 (BSN: 0) added.
Reader:
Finds two sequences of block sequence numbers, (0, 1) and (0), and so chooses
lf1 and lf2 as valid and ignores lf3 as spurious.
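For illustration, feeding Scenarios 2 and 4 through the BsnReconciler sketch above picks the expected sets (ScenarioCheck is a hypothetical driver, assumed to live alongside the sketch):
{code:java}
// Hypothetical driver for the BsnReconciler sketch above.
import java.util.List;

class ScenarioCheck {
  public static void main(String[] args) {
    // Scenario 2: attempt 1 wrote (0, 1); attempt 2 wrote (0, 1, 2).
    List<BlockRef> scenario2 = List.of(
        new BlockRef("log_file1", 0), new BlockRef("log_file2", 1),
        new BlockRef("log_file3", 0), new BlockRef("log_file4", 1),
        new BlockRef("log_file5", 2));
    // Prints lf3, lf4, lf5 -- the longer run wins.
    System.out.println(BsnReconciler.pickValidBlocks(scenario2));

    // Scenario 4: attempt 1 wrote (0, 1); attempt 2 only wrote (0).
    List<BlockRef> scenario4 = List.of(
        new BlockRef("log_file1", 0), new BlockRef("log_file2", 1),
        new BlockRef("log_file3", 0));
    // Prints lf1, lf2 -- the earlier, longer run wins.
    System.out.println(BsnReconciler.pickValidBlocks(scenario4));
  }
}
{code}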
The same logic should work for HDFS as well. Since log blocks are ordered per
log file name with versions, the ordering should stay intact, i.e. log
files/log blocks created by the 2nd attempt should not interleave with log
blocks from the 1st attempt. If the log blocks are within the same log file,
they will be ordered for sure. If they are across log files, the log version
number ordering should suffice (which already happens).
Scenario 3 with HDFS:
Task failed and retried, where both attempts created the full list (possibly
due to Spark speculation), but one of the attempts has partial writes
(corrupted blocks).
attempt1:
log_file1, block1 (BSN: 0); log_file1, block2 (BSN: 1), but the latter is a
corrupt block due to a partial write.
attempt2:
log_file1, block3 (BSN: 0); log_file2, block4 (BSN: 1) added.
Reader:
Finds two sequences of block sequence numbers. Since lf1, block2 is a corrupt
block, the valid sequences deduced are (0) and (0, 1). And so we choose lf1,
block3 and lf2, block4 as valid and ignore lf1, block1 and lf1, block2 as
invalid.
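Continuing the hypothetical sketch, this corrupt-block case falls out of the same run-splitting: the corrupt block is dropped before reconciliation, so the resulting BSN gap ends attempt 1's run.
{code:java}
// Continuing the hypothetical sketch: the corrupt-block case on HDFS.
import java.util.List;

class HdfsScenarioCheck {
  public static void main(String[] args) {
    // lf1/block2 (BSN 1) was corrupt and has already been dropped by the
    // corrupt-block scan, so attempt 1's run ends at (0).
    List<BlockRef> hdfsCase = List.of(
        new BlockRef("log_file1#block1", 0),  // attempt 1
        new BlockRef("log_file1#block3", 0),  // attempt 2, after the corrupt block
        new BlockRef("log_file2#block4", 1)); // attempt 2, after rollover
    // Candidate runs are (0) and (0, 1); block3 and block4 are kept.
    System.out.println(BsnReconciler.pickValidBlocks(hdfsCase));
  }
}
{code}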
> Avoid duplicated log blocks on the LogRecordReader
> --------------------------------------------------
>
> Key: HUDI-6758
> URL: https://issues.apache.org/jira/browse/HUDI-6758
> Project: Apache Hudi
> Issue Type: Bug
> Components: reader-core
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
>