[ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
-------------------------------
    Description: 
*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requested plan in 
the same transaction, the following scenario can occur:
 # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp 
(x-1)
 # Job 1 schedules and creates a requested file with instant timestamp (x)
 # Job 2 schedules and creates a requested file with instant timestamp (x-1)
 # Both jobs continue running
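The interleaving above can be sketched in code. This is a simplified illustration (the class and helper names are hypothetical, not actual Hudi APIs): each step is individually serialized under a lock, but because timestamp generation and .requested creation are not one transaction, a smaller timestamp can land on the timeline after a larger one.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: models why two separately-locked steps are not atomic together.
public class InstantRaceSketch {
    // Instants in the order their .requested files were created.
    static final List<Long> timeline = new ArrayList<>();

    // Models "create .requested under the table lock" as one critical section.
    static synchronized void createRequested(long ts) {
        timeline.add(ts);
    }

    public static void main(String[] args) {
        long job1Ts = 100;  // Job 1 chooses timestamp (x)
        long job2Ts = 99;   // Job 2 chose (x-1) earlier, e.g. due to delay/skew

        createRequested(job1Ts); // Job 1 schedules first
        createRequested(job2Ts); // Job 2 schedules later with the SMALLER timestamp

        // Timeline creation order is [100, 99]: instant 99 "appeared" after 100.
        System.out.println(timeline);
    }
}
```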

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * (A) If Job 2 is an ingestion commit and Job 1 is an ingestion commit that 
also performs compaction/log compaction on the MDT, then Job 1 can run before 
Job 2 and create a compaction plan covering all instant times up to (x) that 
does not include instant time (x-1). Later, Job 2 will create instant time 
(x-1), but the timeline will be in a corrupted state since the compaction plan 
was supposed to include (x-1).
 * (B) There is a similar issue with clean. If Job 2 is a long-running commit 
(that was stuck/delayed for a while before creating its .requested plan) and 
Job 1 is a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.
 * If the completed commit files involve some sort of "checkpointing" with 
another "downstream" job performing incremental reads on this dataset (such as 
Hoodie Streamer/DeltaSync), then there may be incorrect behavior, such as the 
incremental reader skipping some completed commits (those that have a smaller 
instant timestamp than the latest completed commit but were created after it).

Added a diagram to visualize issue (A), specifically the scenario with metadata 
table compaction:

!Flowchart (3).png!

and another with incremental clean (B) (in the diagram below, assume that the 
earliest commit to retain computed by clean i5 is some instant time greater 
than i4):

!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the generation of the instant time 
and the creation of the requested file into the same HUDI table transaction. 
Specifically, execute the following steps whenever any instant (commit, table 
service, etc.) is scheduled:

Approach A
 # Acquire the table lock
 # Look at the latest instant C on the active timeline (completed or not), and 
generate a timestamp after C
 # Create the plan and requested file using this new timestamp (which is 
greater than C)
 # Release the table lock
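The steps of Approach A can be sketched as below. This is a minimal illustration with hypothetical names (not actual Hudi code), assuming instant timestamps can be compared as longs; the key point is that timestamp generation and .requested creation share one critical section, so the new timestamp is always greater than every instant already on the timeline.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of Approach A: timestamp generation + .requested creation in ONE
// table-lock critical section.
public class ApproachASketch {
    static final ReentrantLock tableLock = new ReentrantLock();

    // Return a timestamp strictly greater than every instant on the timeline,
    // preferring the wall clock when it is already ahead.
    static long generateInstantAfter(List<Long> activeTimeline, long wallClockTs) {
        long latest = activeTimeline.stream()
            .mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
        return Math.max(wallClockTs, latest + 1);
    }

    static long scheduleInstant(List<Long> timeline, long wallClockTs) {
        tableLock.lock();                                        // 1. acquire table lock
        try {
            long ts = generateInstantAfter(timeline, wallClockTs); // 2. timestamp after latest C
            timeline.add(ts);                                    // 3. create plan + .requested
            return ts;
        } finally {
            tableLock.unlock();                                  // 4. release table lock
        }
    }
}
```

With this, a writer whose clock lags the latest instant still gets a timestamp after it, so the race in the scenario section cannot occur.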

Unfortunately, (A) has the following drawbacks:
 * Every operation must now hold the table lock while computing its plan, even 
if it is an expensive operation that will take a while
 * Users of HUDI cannot easily set their own instant time for an operation, and 
this restriction would break any public APIs that allow this and would require 
deprecating those APIs.

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on the timeline, it 
should take the table lock and assert that there are no other instants on the 
timeline greater than it that could cause a conflict. If that assertion fails, 
throw a retryable conflict-resolution exception.

Specifically, the following steps should be followed whenever any instant 
(commit, table service, etc.) is scheduled:

Approach B
 # Acquire the table lock. Assume that the desired instant time C and the 
requested-file plan metadata have already been computed (but not yet serialized 
to the timeline as a .requested file), regardless of whether that happened 
before this step or right after acquiring the table lock.
 # If there are any ingestion/clustering/compaction instants (.commit, 
.deltacommit, .replacecommit) on the timeline that are greater than C 
(regardless of their state), release the table lock and throw an exception
 # Create the requested plan on the timeline (as usual)
 # Release the table lock
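The validation in step (2) can be sketched as follows. This is an illustrative example with hypothetical class and method names (not actual Hudi code), assuming timestamps compare as longs and actions are plain strings: if any commit-type instant greater than the candidate C already exists, the scheduler aborts with a retryable exception instead of writing the .requested file.

```java
import java.util.List;
import java.util.Set;

// Sketch of Approach B's step (2), run while holding the table lock.
public class ApproachBSketch {
    // Retryable conflict: the caller may regenerate a timestamp and try again.
    static class RetryableConflictException extends RuntimeException {
        RetryableConflictException(String msg) { super(msg); }
    }

    // Only commit-type actions need to be checked (see reasoning below).
    static final Set<String> COMMIT_ACTIONS =
        Set.of("commit", "deltacommit", "replacecommit");

    record Instant(long ts, String action) {}

    static void validateNoNewerCommitInstants(List<Instant> timeline, long candidateTs) {
        for (Instant i : timeline) {
            if (COMMIT_ACTIONS.contains(i.action()) && i.ts() > candidateTs) {
                throw new RetryableConflictException(
                    "Instant " + i.ts() + " > candidate " + candidateTs
                        + "; abort and retry scheduling");
            }
        }
    }
}
```

Note that a clean or rollback instant with a larger timestamp does not trigger the abort, matching the reasoning that follows.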

The reason why step (2) only needs to check for ingestion/clustering/compaction-
type operations is that:
 * Clean and rollback executions don't attempt to schedule a compaction on the 
MDT, so a clean/rollback that is scheduled before an 
ingestion/clustering/compaction write but has a greater timestamp will not 
create a compaction plan in the MDT that "skips" the corresponding MDT 
deltacommit from the ingestion/clustering/compaction write. And if the 
ingestion/clustering/compaction write in this scenario schedules an MDT 
compaction plan, it will not include the corresponding MDT deltacommit created 
by the clean/rollback, as expected.
 * Since ingestion/clustering/compaction writes will have this timestamp 
ordering enforced, any concurrent clean plan being scheduled can select a 
candidate earliest commit to retain without needing to worry about another 
ingestion/clustering/compaction instant with an earlier timestamp appearing on 
the active timeline.
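The second point can be made concrete with a small sketch (hypothetical names, not actual Hudi code): once the ordering guarantee holds, a clean planner can safely pick the earliest pending commit-type instant as its earliest-commit-to-retain bound, because no commit with a smaller timestamp can appear later.

```java
import java.util.List;

// Sketch: selecting earliest-commit-to-retain under Approach B's guarantee.
public class CleanPlannerSketch {
    record Instant(long ts, boolean completed) {}

    // Clean must not advance past any pending (inflight/requested) commit.
    // With timestamp ordering enforced, this bound is stable: no new commit
    // can later appear with a smaller timestamp than the chosen value.
    static long earliestCommitToRetain(List<Instant> commits, long policyDefault) {
        return commits.stream()
            .filter(i -> !i.completed())
            .mapToLong(Instant::ts)
            .min()
            .orElse(policyDefault); // no pending commits: fall back to the policy bound
    }
}
```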

Unlike (A), approach (B) allows users to continue using HUDI APIs where the 
caller can specify the instant time (avoiding the need to deprecate any public 
API). Despite this, (B) has the following drawbacks:
 * -It is not immediately clear how MDT vs base table operations should be 
handled here. Do we need to update (2) to consider both base table and MDT 
timelines (rather than just MDT)?- (update: as per the current implementation 
in 0.x we don't need to consider the MDT timeline, or even perform this 
timestamp-ordering validation when writing to the MDT, since instants on base 
tables are now ordered in the manner mentioned above, so any writer attempting 
to schedule a compaction on the MDT won't have to worry about an earlier 
instant time "appearing" on the timeline)
 * This error will still be thrown even in scenarios of concurrent operations 
where it would be safe to continue. For example, assume two ingestion writers 
executing on a dataset, each only performing an insert commit on the dataset 
(with no compact/clean being scheduled on the MDT). Additionally, assume there 
is no "downstream" job performing incremental reads on this dataset. If the 
writer that started scheduling later ends up having an earlier timestamp, it 
would still be safe for it to continue. Despite that, because of step (2) it 
would still have to abort and throw an error. This also means that on datasets 
with many frequent concurrent ingestion commits and very infrequent metadata 
compactions, there would be a lot of transient failures/noise from failing 
writers if this timestamp delay issue happens often.

Between these two approaches, it seems (B) might be preferable since it allows 
users to keep using existing APIs for the time being.

 


>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-7507
>                 URL: https://issues.apache.org/jira/browse/HUDI-7507
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: table-service
>            Reporter: Krishen Bhan
>            Assignee: sivabalan narayanan
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 0.16.0, 1.0.0
>
>         Attachments: Flowchart (1).png, Flowchart (2)-2.png, Flowchart 
> (3)-1.png, Flowchart (3).png, Flowchart.png
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
