[
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Krishen Bhan updated HUDI-7507:
-------------------------------
Attachment: (was: Flowchart (3)-1.png)
> ongoing concurrent writers with smaller timestamp can cause issues with
> table services
> ---------------------------------------------------------------------------------------
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
> Issue Type: Improvement
> Components: table-service
> Reporter: Krishen Bhan
> Assignee: sivabalan narayanan
> Priority: Critical
> Labels: pull-request-available
> Fix For: 0.16.0, 1.0.0
>
> Attachments: Flowchart (1).png, Flowchart (2)-2.png, Flowchart
> (4).png, Flowchart.png
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> *Scenarios:*
> Although HUDI operations hold a table lock when creating a .requested
> instant, HUDI writers do not generate a timestamp and create a
> .requested plan in the same transaction, so there can be a scenario where:
> # Job 1 starts, chooses timestamp ( x ) , Job 2 starts and chooses timestamp
> (x - 1)
> # Job 1 schedules and creates requested file with instant timestamp ( x )
> # Job 2 schedules and creates requested file with instant timestamp (x-1)
> # Both jobs continue running
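The race above can be sketched with a minimal, hypothetical timeline model (the names below are illustrative and are not Hudi's actual classes):

```python
import threading

# Hypothetical in-memory "timeline": requested instant timestamps, in the
# order their .requested files were created.
timeline = []
lock = threading.Lock()  # stands in for the Hudi table lock

def create_requested(ts):
    # Timestamp generation and .requested creation are NOT one transaction:
    # the lock is only held for the file creation itself, so a job holding
    # an older timestamp can write after a job holding a newer one.
    with lock:
        timeline.append(ts)

# Job 1 picks timestamp x, Job 2 picks x - 1, but Job 1 writes first.
x = 100
create_requested(x)      # Job 1 schedules at x
create_requested(x - 1)  # Job 2 schedules at x - 1, after x is already there

# The timeline now contains a smaller timestamp created after a larger one.
print(timeline)  # [100, 99]
```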
> If one job is writing a commit and the other is running a table service, this
> can cause issues:
> ** (A) If Job 2 is an ingestion commit and Job 1 is an ingestion commit that
> also does compaction/log compaction on the MDT, then if Job 1 runs before Job
> 2, it can create a compaction plan for all instant times up to ( x ) that
> doesn’t include instant time (x-1). Later, Job 2 will create instant time
> (x-1), but the timeline will be in a corrupted state since the compaction
> plan was supposed to include (x-1)
> ** (B) There is a similar issue with clean. If Job 2 is a long-running commit
> (that was stuck/delayed for a while before creating its .requested plan) and
> Job 1 is a clean, then Job 1 can perform a clean that updates the
> earliest-commit-to-retain without waiting for the inflight instant by Job 2
> at (x-1) to complete. This causes Job 2 to be "skipped" by clean.
> ** If the completed commit files include some sort of "checkpointing", with
> another "downstream job" performing incremental reads on this dataset (such
> as Hoodie Streamer/DeltaSync), then there may be incorrect behavior, such as
> the incremental reader skipping some completed commits (that have a smaller
> instant timestamp than the latest completed commit but were created after it).
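A simplified sketch of issue (A): a compaction plan built from the instants visible at planning time will miss an instant that appears afterwards with a smaller timestamp. The planner function here is illustrative, not Hudi's real one:

```python
# Hypothetical planner: a compaction plan built from the instants visible on
# the timeline at planning time, covering everything up to the boundary.
def plan_compaction(visible_instants, plan_boundary):
    return sorted(t for t in visible_instants if t <= plan_boundary)

x = 100
# Job 1 plans compaction up to x while only x is visible on the timeline.
plan = plan_compaction({x}, plan_boundary=x)

# Job 2's instant (x - 1) appears afterwards; the plan was supposed to cover
# it (it is <= the boundary) but does not -- this is issue (A).
late_instant = x - 1
print(late_instant <= x and late_instant not in plan)  # True
```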
> Added a diagram to visualize issue (A), specifically the scenario with
> metadata table compaction:
> !Flowchart (4).png!
>
> and another with incremental clean (B) (in the diagram below, assume that the
> earliest commit to retain computed by clean i5 is some instant time that is
> greater than i4):
> !Flowchart (1).png!
> *Proposed approach:*
> One way this can be resolved is by combining the operations of generating an
> instant time and creating a requested file into the same HUDI table
> transaction. Specifically, execute the following steps whenever any instant
> (commit, table service, etc.) is scheduled:
> Approach A
> # Acquire table lock
> # Look at the latest instant C on the active timeline (completed or not).
> Generate a timestamp after C
> # Create the plan and requested file using this new timestamp ( that is
> greater than C)
> # Release table lock
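Approach A can be sketched as follows. This is a minimal model under the assumption that instant times are comparable integers; the function and variable names are hypothetical:

```python
import threading

lock = threading.Lock()          # stands in for the table lock
active_timeline = [97, 98, 99]   # instant times already on the timeline

def schedule_instant_approach_a(build_plan):
    # Approach A: timestamp generation, plan creation, and .requested creation
    # all happen inside one critical section, so no later writer can end up
    # with a smaller timestamp than an instant already on the timeline.
    with lock:
        latest = max(active_timeline, default=0)  # latest instant C, any state
        ts = latest + 1                           # timestamp strictly after C
        plan = build_plan(ts)                     # plan computed under the lock
        active_timeline.append(ts)                # create the .requested file
        return ts, plan

ts, _ = schedule_instant_approach_a(lambda t: {"instant": t})
print(ts)  # 100
```

Holding the lock around `build_plan` is exactly the first drawback noted below: an expensive plan computation blocks every other writer.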
> Unfortunately (A) has the following drawbacks:
> * Every operation must now hold the table lock while computing its plan, even
> if it's an expensive operation that will take a while
> * Users of HUDI cannot easily set their own instant time for an operation;
> this restriction would break any public APIs that allow it and would
> require deprecating those APIs.
>
> An alternate approach is to have every operation abort creating a .requested
> file unless it has the latest timestamp. Specifically, for any instant type,
> whenever an operation is about to create a .requested plan on the timeline,
> it should take the table lock and assert that there are no other instants on
> the timeline greater than it that could cause a conflict. If that assertion
> fails, it should throw a retryable conflict resolution exception.
> Specifically, the following steps should be followed whenever any instant
> (commit, table service, etc.) is scheduled:
> Approach B
> # Acquire the table lock. Assume that the desired instant time C and the
> requested file plan metadata have already been computed (but not yet
> serialized to the timeline as a .requested file), regardless of whether this
> happened before this step or right after acquiring the table lock.
> # If there are any ingestion/clustering/compaction operation type (.commit,
> .deltacommit, .replacecommit) instants on the timeline that are greater than
> C (regardless of their state), then release the table lock and throw an
> exception
> # Create requested plan on timeline (As usual)
> # Release table lock
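The steps above can be sketched as a small model. The exception name and timeline representation are hypothetical, chosen only to illustrate the validate-then-create pattern:

```python
import threading

lock = threading.Lock()  # stands in for the table lock
# (instant_time, action) pairs already on the timeline, in any state.
active_timeline = [(98, "deltacommit"), (99, "commit")]
WRITE_ACTIONS = {"commit", "deltacommit", "replacecommit"}

class RetryableConflictException(Exception):
    """Caller should regenerate its timestamp and retry scheduling."""

def schedule_instant_approach_b(ts, plan):
    # Approach B: the plan and timestamp may be computed outside the lock;
    # only the validation and .requested creation happen under it.
    with lock:
        conflict = any(t > ts and action in WRITE_ACTIONS
                       for t, action in active_timeline)
        if conflict:
            raise RetryableConflictException(
                f"instant {ts} is older than an existing write instant")
        active_timeline.append((ts, plan["action"]))  # create .requested

# A writer that picked timestamp 97 after 99 was already scheduled must abort.
try:
    schedule_instant_approach_b(97, {"action": "commit"})
    aborted = False
except RetryableConflictException:
    aborted = True
print(aborted)  # True
```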
> The reason why step (2) only needs to check for
> ingestion/clustering/compaction type operations is that:
> * Clean and rollback executions don't attempt to schedule a compaction on the
> MDT, so a clean/rollback that is scheduled before an
> ingestion/clustering/compaction write but has a greater timestamp will
> not create a compaction plan in the MDT that "skips" the corresponding
> MDT deltacommit from the ingestion/clustering/compaction write. And if the
> ingestion/clustering/compaction write in this scenario schedules an MDT
> compaction plan, it will not include the corresponding MDT deltacommit
> created by the clean/rollback, as expected.
> * Since ingestion/clustering/compaction writes will have this timestamp
> ordering enforced, any concurrent clean plan being scheduled can select a
> candidate earliest commit to retain without needing to worry about another
> ingestion/clustering/compaction instant with an earlier timestamp later
> appearing on the active timeline.
>
>
> Unlike (A), this approach (B) allows users to continue using HUDI APIs where
> the caller can specify an instant time (avoiding the need to deprecate any
> public API). Despite this, though, (B) has the following drawbacks:
> * -It is not immediately clear how MDT vs base table operations should be
> handled here. Do we need to update (2) to consider both base table and MDT
> timelines (rather than just MDT)?- (update: as per the current implementation
> in 0.x, we don't need to consider the MDT timeline or even perform this
> timestamp ordering validation when writing to the MDT; since instants on the
> base table are now ordered in the manner mentioned above, any writer
> attempting to schedule a compaction on the MDT won't have to worry about an
> earlier instant time "appearing" on the timeline)
> * This error will still be thrown even in scenarios of concurrent
> operations where it would be safe to continue. For example, assume two
> ingestion writers executing on a dataset, each performing only an
> insert commit (with no compact/clean being scheduled on the MDT).
> Additionally, assume there is no "downstream" job performing incremental
> reads on this dataset. If the writer that started scheduling later ends up
> having an earlier timestamp, it would still be safe for it to continue.
> Despite that, because of step (2) it would still have to abort and throw an
> error. This also means that on datasets with many frequent concurrent
> ingestion commits and very infrequent metadata compactions, there would be a
> lot of transient failures/noise from failing writers if this timestamp delay
> issue happens often.
> Between these two approaches, (B) seems preferable since it allows users to
> keep using existing APIs for the time being.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)