[
https://issues.apache.org/jira/browse/HUDI-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884413#comment-17884413
]
sivabalan narayanan edited comment on HUDI-8248 at 9/24/24 6:43 PM:
--------------------------------------------------------------------
Let's discuss (B) in a bit more detail.
Current logic in LogRecordReader:
{code:java}
for every log block:
  curLogBlockInstantTime = logBlock.getInstantTime() // from the block header
  if (curLogBlockInstantTime > maxInstantTime configured)
    break; // stop scanning any further log blocks
  else
    proceed to process data blocks, delete blocks and rollback blocks {code}
Proposed fix:
{code:java}
for every log block:
  curLogBlockInstantTime = logBlock.getInstantTime() // from the block header
  if (curLogBlock is a data block or a delete block
      && curLogBlockInstantTime > maxInstantTime configured)
    continue; // skip processing the current data/delete block
  else
    proceed to process data blocks, delete blocks and rollback blocks {code}
In essence, we enforce maxInstantTime only for data and delete blocks; rollback
blocks are always accounted for, regardless of their instant time.
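The proposed loop can be sketched roughly as below. Note this is a hedged
illustration only: the block types, `LogBlock` record, and `processBlocks` method
are hypothetical stand-ins, not the actual Hudi LogRecordReader API. The key
change is that a data/delete block past maxInstantTime is skipped with
`continue`, while rollback blocks are always processed, so a rollback with a
higher instant time no longer aborts the whole scan.

```java
import java.util.List;

public class LogScanSketch {
    enum BlockType { DATA, DELETE, ROLLBACK }

    // Hypothetical stand-in for a log block with its header instant time.
    record LogBlock(BlockType type, String instantTime) {}

    // Returns the number of blocks actually processed under the proposed logic.
    static int processBlocks(List<LogBlock> blocks, String maxInstantTime) {
        int processed = 0;
        for (LogBlock block : blocks) {
            String blockInstant = block.instantTime(); // read from the header
            boolean isDataOrDelete =
                block.type() == BlockType.DATA || block.type() == BlockType.DELETE;
            if (isDataOrDelete && blockInstant.compareTo(maxInstantTime) > 0) {
                continue; // skip this data/delete block, but keep scanning
            }
            processed++; // process data, delete, and rollback blocks
        }
        return processed;
    }
}
```

With blocks at t2 (data), t6 (rollback), and t5 (data) and maxInstantTime = t5,
all three are processed; the old `break` would have stopped at the t6 rollback.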
Impact compared to existing code:
We might parse a few additional log block headers. Previously, we would bail out
as soon as we encountered a rollback block w/ a higher instant time, but with the
fix we may have to parse the headers of all log blocks.
In the case of the snapshot reader, we should not see much overhead, since we are
reading the latest state of the table. So there could be just 1 or 2 additional
log blocks (written by a concurrent writer) whose headers are parsed in addition
to what happened before this fix.
But with time-travel and incremental queries, we might parse a few additional log
block headers. Given the criticality of consistency, we feel it's a fair
trade-off.
> Fix LogRecord reader to account for rollback blocks with higher timestamps
> --------------------------------------------------------------------------
>
> Key: HUDI-8248
> URL: https://issues.apache.org/jira/browse/HUDI-8248
> Project: Apache Hudi
> Issue Type: Improvement
> Components: reader-core
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.16.0, 1.0.0
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> With LogRecordReader, we also configure a maxInstant time to read up to.
> Sometimes rollback blocks could have higher timestamps than the maxInstant
> set, which might lead to data inconsistencies.
>
> Let's go through an illustration:
> Say we have t1.dc and t2.dc, and t2.dc crashed midway.
> The current layout is:
> {{base file (t1), lf1 (partially committed data w/ t2 as instant time)}}
>
> Then, say, we start t5.dc. Just as we start t5.dc, Hudi detects the pending
> commit and triggers a rollback. This rollback gets an instant time of t6
> (t6.rb). Note that the rollback's commit time is greater than t5, the current
> ongoing delta commit.
> So, once the rollback completes, this is the layout:
> {{base file, lf1 (from t2.dc, partially failed), lf3 (rollback command block
> with t6)}}
>
> And once t5.dc completes, this is how the layout looks:
> {{base file, lf1 (from t2.dc, partially failed), lf3 (rollback command block
> with t6), lf4 (from t5)}}
>
> At this point in time, when we trigger a snapshot read or trigger tagLocation
> w/ a global index, maxInstant is set to the last entry in the commits
> timeline, which is t5. So, while the LogRecordReader is processing the log
> blocks and reaches lf3, it detects that its timestamp t6 > t5 (i.e. the max
> instant time) and bails out of the for loop. In essence, it may never even
> read lf4 in the above scenario.
>
> If lf1 and lf4 refer to a delete block, this could lead to data consistency
> issues w/ global indexes when a record moves from one partition to another.
>
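To make the failure in the quoted illustration concrete, here is a hypothetical
simulation (not actual Hudi code; the types and method are stand-ins) of the
pre-fix `break` behavior on the layout above: lf1 (data, t2), lf3 (rollback, t6),
lf4 (data, t5), scanned with maxInstant = t5. The scan stops at lf3 and lf4 is
never read.

```java
import java.util.ArrayList;
import java.util.List;

public class ScanScenario {
    // Returns the instant times of data blocks actually read when the scan
    // bails out on the first block whose instant exceeds maxInstantTime,
    // mirroring the pre-fix `break` logic.
    static List<String> readWithBreak(List<String[]> blocks, String maxInstantTime) {
        List<String> read = new ArrayList<>();
        for (String[] b : blocks) {               // b = {type, instantTime}
            if (b[1].compareTo(maxInstantTime) > 0) {
                break;                            // old logic: stop scanning entirely
            }
            if (b[0].equals("data")) {
                read.add(b[1]);                   // data block was read
            }
        }
        return read;
    }
}
```

For the layout above, only lf1's data (t2) is read: the t6 rollback block trips
the `break`, so the valid t5 data in lf4 is silently dropped.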
--
This message was sent by Atlassian Jira
(v8.20.10#820010)