[ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
-------------------------------
    Attachment: Flowchart (3)-1.png

>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-7507
>                 URL: https://issues.apache.org/jira/browse/HUDI-7507
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: table-service
>            Reporter: Krishen Bhan
>            Assignee: sivabalan narayanan
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 0.16.0, 1.0.0
>
>         Attachments: Flowchart (1).png, Flowchart (2)-2.png, Flowchart 
> (3)-1.png, Flowchart (3).png, Flowchart.png
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> *Scenarios:*
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requested plan in the same transaction, there can be a scenario where 
>  # Job 1 starts and chooses timestamp ( x ); Job 2 starts and chooses 
> timestamp (x-1)
>  # Job 1 schedules and creates requested file with instant timestamp ( x )
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  ** (A) If Job 2 is an ingestion commit and Job 1 is an ingestion commit that 
> also performs compaction/log compaction on the MDT, then when Job 1 runs 
> before Job 2 it can create a compaction plan for all instant times up to ( x ) 
> that doesn't include instant time (x-1). Later Job 2 will create instant time 
> (x-1), but the timeline will be in a corrupted state since the compaction plan 
> was supposed to include (x-1)
>  ** (C) There is a similar issue with clean. If Job 2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job 2 to be "skipped" by the clean.
>  ** If the completed commit files include some sort of "checkpointing" with 
> another "downstream job" performing incremental reads on this dataset (such 
> as Hoodie Streamer/DeltaSync) then there may be incorrect behavior, such as 
> the incremental reader skipping some completed commits (that have a smaller 
> instant timestamp than latest completed commit but were created after).
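> The race described in the steps above can be sketched as follows. This is an 
> illustrative toy model, not Hudi code: the timeline is just a list recording 
> .requested instants in creation order, and the instant names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class TimestampRace {
    // Toy timeline: records .requested instants in the order they are created.
    static final List<Long> timeline = new CopyOnWriteArrayList<>();

    static void createRequested(long instantTime) {
        timeline.add(instantTime);
    }

    public static void main(String[] args) {
        long x = 100L;
        long job1 = x;       // Job 1 chooses timestamp x
        long job2 = x - 1;   // Job 2 chose timestamp x-1, but is delayed
        createRequested(job1);  // Job 1 schedules first ...
        createRequested(job2);  // ... so the smaller timestamp appears later
        // Creation order no longer matches timestamp order:
        System.out.println(timeline);  // prints [100, 99]
    }
}
```

> Any table service that scanned the timeline between the two createRequested 
> calls would plan around instant 100 without ever seeing instant 99.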
> [Edit] I added a diagram to visualize the issue, specifically the second 
> scenario with metadata table compaction
>  
>  
> and another with incremental clean (in the diagram below, assume that the 
> earliest commit to retain computed by clean i5 is some instant time greater 
> than i4)
> !Flowchart (1).png!
> *Proposed approach:*
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, the following steps would be executed whenever any 
> instant (commit, table service, etc.) is scheduled
> Approach A
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
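> A minimal sketch of Approach A, assuming a single in-process lock stands in 
> for the table lock and a counter stands in for the latest instant on the 
> active timeline. Class and method names here are illustrative, not real Hudi 
> APIs.

```java
import java.util.concurrent.locks.ReentrantLock;

class ApproachA {
    private final ReentrantLock tableLock = new ReentrantLock();
    // Stand-in for "latest instant C on the active timeline (completed or not)".
    private long latestInstant = 0L;

    /** Schedules an operation; returns the instant time it was assigned. */
    long scheduleOperation() {
        tableLock.lock();                                   // 1. acquire table lock
        try {
            long c = latestInstant;                         // 2. read latest instant C
            long newInstant = Math.max(c + 1, System.currentTimeMillis());
            createRequestedFile(newInstant);                // 3. plan + .requested, timestamp > C
            latestInstant = newInstant;
            return newInstant;
        } finally {
            tableLock.unlock();                             // 4. release table lock
        }
    }

    private void createRequestedFile(long instantTime) {
        // Placeholder: serialize the plan as <instantTime>.requested on the timeline.
    }
}
```

> Because both the timestamp generation and the .requested creation happen 
> inside the same critical section, no second writer can observe C and pick a 
> smaller timestamp, which is exactly what eliminates the race above.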
> Unfortunately (A) has the following drawbacks
>  * Every operation must now hold the table lock while computing its plan, even 
> if it's an expensive operation that will take a while
>  * Users of HUDI cannot easily set their own instant time for an operation, 
> and this restriction would break any public APIs that allow this, requiring 
> those APIs to be deprecated.
>  
> An alternate approach is to have every operation abort creating a .requested 
> file unless it has the latest timestamp. Specifically, for any instant type, 
> whenever an operation is about to create a .requested plan on timeline, it 
> should take the table lock and assert that there are no other instants on 
> timeline that are greater than it that could cause a conflict. If that 
> assertion fails, then throw a retry-able conflict resolution exception.
> Specifically, the following steps should be followed whenever any instant 
> (commit, table service, etc) is scheduled
> Approach B
>  # Acquire table lock. Assume that the desired instant time C and requested 
> file plan metadata have already been computed (but not yet serialized to the 
> timeline as a .requested file), regardless of whether this happened before 
> this step or right after acquiring the table lock.
>  # If there are any ingestion/clustering/compaction operation type (.commit, 
> .deltacommit, .replacecommit) instants on the timeline that are greater than C 
> (regardless of their state) then release the table lock and throw an 
> exception
>  # Create requested plan on timeline (As usual)
>  # Release table lock
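> A minimal sketch of Approach B, assuming the write instants on the timeline 
> are modeled as a sorted set of timestamps; the class and exception names are 
> illustrative, not real Hudi APIs.

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantLock;

class ApproachB {
    /** Retryable conflict thrown when a newer instant already exists on the timeline. */
    static class RetryableConflictException extends RuntimeException {
        RetryableConflictException(String msg) { super(msg); }
    }

    private final ReentrantLock tableLock = new ReentrantLock();
    // Stand-in for commit/deltacommit/replacecommit instants on the timeline, any state.
    private final ConcurrentSkipListSet<Long> writeInstants = new ConcurrentSkipListSet<>();

    /** Validates timestamp ordering under the lock before creating the .requested plan. */
    void scheduleOperation(long instantTimeC) {
        tableLock.lock();                                   // 1. acquire table lock
        try {
            Long greater = writeInstants.higher(instantTimeC);
            if (greater != null) {                          // 2. newer write instant exists: abort
                throw new RetryableConflictException(
                    "Instant " + greater + " > " + instantTimeC
                        + "; regenerate the timestamp and retry");
            }
            writeInstants.add(instantTimeC);                // 3. create .requested plan as usual
        } finally {
            tableLock.unlock();                             // 4. release table lock
        }
    }
}
```

> Unlike Approach A, the (possibly expensive) plan computation happens outside 
> the lock; only the cheap ordering check and the .requested creation are 
> serialized.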
> The reason why step (2) only needs to check for ingestion/clustering/compaction 
> type operations is because
>  * Clean and rollback executions don't attempt to schedule a compaction on the 
> MDT, so a clean/rollback that is scheduled before an 
> ingestion/clustering/compaction write but has a greater timestamp will not 
> create a compaction plan in the MDT that "skips" the corresponding 
> MDT deltacommit from the ingestion/clustering/compaction write. And if the 
> ingestion/clustering/compaction write in this scenario schedules an MDT 
> compaction plan, it will not include the corresponding MDT deltacommit created 
> by the clean/rollback, as expected.
>  * Since ingestion/clustering/compaction writes will have this timestamp 
> ordering enforced, any concurrent clean plan being scheduled can select a 
> candidate earliest commit to retain without needing to worry about another 
> ingestion/clustering/compaction instant with a timestamp earlier than that 
> appearing on the active timeline. 
>  
>  
> Unlike (A), this approach (B) allows users to continue to use HUDI APIs where 
> the caller can specify the instant time (avoiding the need to deprecate any 
> public API). Despite this though, (B) has the following drawbacks
>  * -It is not immediately clear how MDT vs base table operations should be 
> handled here. Do we need to update (2) to consider both base table and MDT 
> timelines (rather than just MDT)?- (update: as per current implementation in 
> 0.x we don't need to consider MDT timeline or even perform this timestamp 
> ordering validation when writing to MDT, since due to instants on base tables 
> now being ordered in the manner mentioned above, any writer attempting to 
> schedule a compaction on MDT won't have to worry about an earlier instant 
> time "appearing" on the timeline)
>  * This error will still be thrown even for scenarios of concurrent 
> operations where it would be safe to continue. For example, assume two 
> ingestion writers executing on a dataset, with each only performing an 
> insert commit on the dataset (with no compact/clean being scheduled on MDT). 
> Additionally, assume there is no "downstream" job performing incremental 
> reads on this dataset. If the writer that started scheduling later ends up 
> having an earlier timestamp, it would still be safe for it to continue. 
> Despite that, because of step (2) it would still have to abort and throw an 
> error. This also means that on datasets with many frequent concurrent 
> ingestion commits and very infrequent metadata compactions, there would be a 
> lot of transient failures/noise from failing writers if this timestamp delay 
> issue happens often. 
> Between these two approaches, it seems (B) might be preferable since it 
> allows users to still use existing APIs for the time being.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
