[ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
-------------------------------
    Description: 
*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requsted plan in 
the same transaction, there can be a scenario where 
 # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp (x 
- 1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * 
 ** If Job 2 is ingestion commit and Job 1 is compaction/log compaction, then 
when Job 1 runs before Job 2 and can create a compaction plan for all instant 
times (up to (x) ) that doesn’t include instant time (x-1) .  Later Job 2 will 
create instant time (x-1), but timeline will be in a corrupted state since 
compaction plan was supposed to include (x-1)
 ** There is a similar issue with clean. If Job2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job2 to be "skipped" by clean.
 ** If the completed commit files include som sort of "checkpointing" with 
another "downstream job" performing incremental reads on this dataset (such as 
Hoodie Streamer/DeltaSync) then there may be incorrect behavior, such as the 
incremental reader skipping some completed commits (that have a smaller instant 
timestamp than latest completed commit but were created after).

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the operations of generating 
instant time and creating a requested file in the same HUDI table transaction. 
Specifically, executing the following steps whenever any instant (commit, table 
service, etc) is scheduled

Approach A
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp ( that is 
greater than C)
 # Release table lock

Unfortunately (A) has the following drawbacks
 * Every operation must now hold the table lock when computing its plan even if 
it's an expensive operation and will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this and would require 
deprecating those APIs.

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on timeline, it 
should take the table lock and assert that there are no other instants on 
timeline that are greater than it that could cause a conflict. If that 
assertion fails, then throw a retry-able conflict resolution exception.

Specifically, the following steps should be followed whenever any instant 
(commit, table service, etc) is scheduled

Approach B
 # Acquire table lock. Assume that the desired instant time C and requested 
file plan metadata have already been created, regardless of wether it was 
before this step or right after acquiring the table lock.
 # If there are any instants on the timeline that are greater than C 
(regardless of their operation type or sate status) then release table lock and 
throw an exception
 # Create requested plan on timeline (As usual)
 # Release table lock

Unlike (A), this approach (B) allows users to continue to use HUDI APIs where 
caller can specify instant time (preventing the need from deprecating any 
public API). It also allows the possibility of table service operations 
computing their plan without holding a lock. Despite this though, (B) has 
following drawbacks
 * It is not immediately clear how MDT vs base table operations should be 
handled here. Do we need to update (2) to consider both base table and MDT 
timelines (rather than just MDT)?
 * This error will still be thrown even for scenarios of concurrent operations 
where it would be safe to continue. For example, assume two ingestion writers 
being executing on a dataset, with each only performing a insert commit on the 
dataset (with no compact/clean being scheduled on MDT). Additionally, assume 
there is no "downstream" job performing incremental reads on this dataset. If 
the writer that started scheduling later ending up having an earlier timestamp, 
it would still be safe for it to continue. Despite that, because of step (2)  
it would still have to abort an throw an error. This also means that on 
datasets with many frequent concurrent ingestion commits and very infrequent 
metadata compactions, there would be a lot of transient failures/noise by 
failing writers if this timestamp delay issue happens a lot. 

Between these two approaches, it seems (B) might be preferable since it allows 
user to still use existing APIs for the time being.

We were wondering if the Apache HUDI project team would be interested in 
investigating and implementing (B) to resolve this issue?

  was:
*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, 
because HUDI writers do not generate a timestamp and create a .requsted plan in 
the same transaction, there can be a scenario where 
 # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp (x 
- 1)
 # Job 1 schedules and creates requested file with instant timestamp (x)
 # Job 2 schedules and creates requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * 
 ** If Job 2 is ingestion commit and Job 1 is compaction/log compaction, then 
when Job 1 runs before Job 2 and can create a compaction plan for all instant 
times (up to (x) ) that doesn’t include instant time (x-1) .  Later Job 2 will 
create instant time (x-1), but timeline will be in a corrupted state since 
compaction plan was supposed to include (x-1)
 ** There is a similar issue with clean. If Job2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!

*Proposed approach:*

One way this can be resolved is by combining the operations of generating 
instant time and creating a requested file in the same HUDI table transaction. 
Specifically, executing the following steps whenever any instant (commit, table 
service, etc) is scheduled

Approach A
 # Acquire table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp ( that is 
greater than C)
 # Release table lock

Unfortunately (A) has the following drawbacks
 * Every operation must now hold the table lock when computing its plan even if 
it's an expensive operation and will take a while
 * Users of HUDI cannot easily set their own instant time of an operation, and 
this restriction would break any public APIs that allow this and would require 
deprecating those APIs.

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on timeline, it 
should take the table lock and assert that there are no other instants on 
timeline that are greater than it that could cause a conflict. If that 
assertion fails, then throw a retry-able conflict resolution exception.

Specifically, the following steps should be followed whenever any instant 
(commit, table service, etc) is scheduled

Approach B
 # Acquire table lock. Assume that the desired instant time C and requested 
file plan metadata have already been created, regardless of wether it was 
before this step or right after acquiring the table lock.
 # Get the set of all instants on the timeline that are greater than C 
(regardless of their operation type or sate status). 
 ## If the current operation is an "ingestion" type 
(commit/deltacommit/insert_overwrite replacecommit) then assert the set is 
empty. This is because another "ingestion" operation with a later instant time 
might schedule and execute a compaction at said instant time in MDT, leading 
the table in the aforementioned situation where a compact on MDT is scheduled 
after an inflight ingestion commit.
 ## If the current operation is a "table service" (clean/compaction/cluster) 
then assert that the set doesn't contain any table service instant types 
(clean/compaction/cluster).
 # Create requested plan on timeline (As usual)
 # Release table

Unlike (A), this approach (B) allows users to continue to use HUDI APIs where 
caller can specify instant time (preventing the need from deprecating any 
public API). It also allows the possibility of table service operations 
computing their plan without holding a lock. Despite this though, (B) has 
following drawbacks
 * It is not immediately clear how MDT vs base table operations should be 
handled here. Do we need to update (2) to build it's set from both base table 
and MDT timelines (rather than just MDT)?
 * This error will still be thrown even for scenarios of concurrent operations 
where it would be safe to continue. For example, assume two ingestion writers 
being executing on a dataset, with each only performing a insert commit on the 
dataset (with no table service being scheduled on MDT). If the writer that 
started scheduling later ending up having an earlier timestamp, it would still 
be safe for it to continue. Despite that, because of step (2.1)  it would still 
have to abort an throw an error. This also means that on datasets with many 
frequent concurrent ingestion commits and very infrequent metadata compactions, 
there would be a lot of transient failures/noise by failing writers if this 
timestamp delay issue happens a lot. 

Between these two approaches, it seems (B) might be preferable since it allows 
user to still use existing APIs for the time being.

We were wondering if the Apache HUDI project team would be interested in 
investigating and implementing (B) to resolve this issue?


>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-7507
>                 URL: https://issues.apache.org/jira/browse/HUDI-7507
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: table-service
>            Reporter: Krishen Bhan
>            Priority: Major
>             Fix For: 0.15.0
>
>         Attachments: Flowchart (1).png, Flowchart.png
>
>
> *Scenarios:*
> Although HUDI operations hold a table lock when creating a .requested 
> instant, because HUDI writers do not generate a timestamp and create a 
> .requsted plan in the same transaction, there can be a scenario where 
>  # Job 1 starts, chooses timestamp (x) , Job 2 starts and chooses timestamp 
> (x - 1)
>  # Job 1 schedules and creates requested file with instant timestamp (x)
>  # Job 2 schedules and creates requested file with instant timestamp (x-1)
>  # Both jobs continue running
> If one job is writing a commit and the other is a table service, this can 
> cause issues:
>  * 
>  ** If Job 2 is ingestion commit and Job 1 is compaction/log compaction, then 
> when Job 1 runs before Job 2 and can create a compaction plan for all instant 
> times (up to (x) ) that doesn’t include instant time (x-1) .  Later Job 2 
> will create instant time (x-1), but timeline will be in a corrupted state 
> since compaction plan was supposed to include (x-1)
>  ** There is a similar issue with clean. If Job2 is a long-running commit 
> (that was stuck/delayed for a while before creating its .requested plan) and 
> Job 1 is a clean, then Job 1 can perform a clean that updates the 
> earliest-commit-to-retain without waiting for the inflight instant by Job 2 
> at (x-1) to complete. This causes Job2 to be "skipped" by clean.
>  ** If the completed commit files include som sort of "checkpointing" with 
> another "downstream job" performing incremental reads on this dataset (such 
> as Hoodie Streamer/DeltaSync) then there may be incorrect behavior, such as 
> the incremental reader skipping some completed commits (that have a smaller 
> instant timestamp than latest completed commit but were created after).
> [Edit] I added a diagram to visualize the issue, specifically the second 
> scenario with clean
> !Flowchart (1).png!
> *Proposed approach:*
> One way this can be resolved is by combining the operations of generating 
> instant time and creating a requested file in the same HUDI table 
> transaction. Specifically, executing the following steps whenever any instant 
> (commit, table service, etc) is scheduled
> Approach A
>  # Acquire table lock
>  # Look at the latest instant C on the active timeline (completed or not). 
> Generate a timestamp after C
>  # Create the plan and requested file using this new timestamp ( that is 
> greater than C)
>  # Release table lock
> Unfortunately (A) has the following drawbacks
>  * Every operation must now hold the table lock when computing its plan even 
> if it's an expensive operation and will take a while
>  * Users of HUDI cannot easily set their own instant time of an operation, 
> and this restriction would break any public APIs that allow this and would 
> require deprecating those APIs.
>  
> An alternate approach is to have every operation abort creating a .requested 
> file unless it has the latest timestamp. Specifically, for any instant type, 
> whenever an operation is about to create a .requested plan on timeline, it 
> should take the table lock and assert that there are no other instants on 
> timeline that are greater than it that could cause a conflict. If that 
> assertion fails, then throw a retry-able conflict resolution exception.
> Specifically, the following steps should be followed whenever any instant 
> (commit, table service, etc) is scheduled
> Approach B
>  # Acquire table lock. Assume that the desired instant time C and requested 
> file plan metadata have already been created, regardless of wether it was 
> before this step or right after acquiring the table lock.
>  # If there are any instants on the timeline that are greater than C 
> (regardless of their operation type or sate status) then release table lock 
> and throw an exception
>  # Create requested plan on timeline (As usual)
>  # Release table lock
> Unlike (A), this approach (B) allows users to continue to use HUDI APIs where 
> caller can specify instant time (preventing the need from deprecating any 
> public API). It also allows the possibility of table service operations 
> computing their plan without holding a lock. Despite this though, (B) has 
> following drawbacks
>  * It is not immediately clear how MDT vs base table operations should be 
> handled here. Do we need to update (2) to consider both base table and MDT 
> timelines (rather than just MDT)?
>  * This error will still be thrown even for scenarios of concurrent 
> operations where it would be safe to continue. For example, assume two 
> ingestion writers being executing on a dataset, with each only performing a 
> insert commit on the dataset (with no compact/clean being scheduled on MDT). 
> Additionally, assume there is no "downstream" job performing incremental 
> reads on this dataset. If the writer that started scheduling later ending up 
> having an earlier timestamp, it would still be safe for it to continue. 
> Despite that, because of step (2)  it would still have to abort an throw an 
> error. This also means that on datasets with many frequent concurrent 
> ingestion commits and very infrequent metadata compactions, there would be a 
> lot of transient failures/noise by failing writers if this timestamp delay 
> issue happens a lot. 
> Between these two approaches, it seems (B) might be preferable since it 
> allows user to still use existing APIs for the time being.
> We were wondering if the Apache HUDI project team would be interested in 
> investigating and implementing (B) to resolve this issue?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to