[ 
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-7507:
-------------------------------
    Description: 
*Scenarios:*

Although HUDI operations hold a table lock when creating a .requested instant, 
HUDI writers do not generate a timestamp and create a .requested plan in the 
same transaction. This allows the following scenario:
 # Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp 
(x-1)
 # Job 1 schedules and creates a requested file with instant timestamp (x)
 # Job 2 schedules and creates a requested file with instant timestamp (x-1)
 # Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause 
issues:
 * If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, 
then if Job 1 runs before Job 2 it can create a compaction plan for all instant 
times up to (x) that doesn't include instant time (x-1). Later, Job 2 will 
create instant time (x-1), but the timeline will be in a corrupted state since 
the compaction plan was supposed to include (x-1).
 * There is a similar issue with clean. If Job 2 is a long-running commit (that 
was stuck/delayed for a while before creating its .requested plan) and Job 1 is 
a clean, then Job 1 can perform a clean that updates the 
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at 
(x-1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second 
scenario with clean

!Flowchart (1).png!
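The race above can be sketched in a few lines. This is a minimal illustration 
(plain Python, not Hudi code): timestamps and plan lists here are hypothetical 
stand-ins for instant times and compaction plans.

```python
# Sketch of the race: timestamp generation and .requested creation happen
# in separate steps, so a smaller timestamp can land on the timeline after
# a larger one has already been planned around.
timeline = []  # instant timestamps that have a .requested file, in arrival order

# Step 1: both jobs pick timestamps before either schedules anything.
job2_ts = 100  # ingestion commit, picks (x - 1)
job1_ts = 101  # compaction, picks (x)

# Step 2: Job 1 (compaction) schedules first. Its plan covers every instant
# visible on the timeline up to (x) -- which is none here, and crucially
# does NOT include the not-yet-scheduled (x - 1).
compaction_plan = [ts for ts in timeline if ts <= job1_ts]
timeline.append(job1_ts)

# Step 3: Job 2 only now creates its .requested at (x - 1), behind Job 1.
timeline.append(job2_ts)

# The timeline contains (x - 1), but the compaction plan was built without
# it: the state the scenarios above describe as corrupted.
assert job2_ts in timeline
assert job2_ts not in compaction_plan
```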

*Proposed approach:*

One way to resolve this is to combine generating the instant time and creating 
the requested file into the same HUDI table transaction. Specifically, execute 
the following steps whenever any instant (commit, table service, etc.) is 
scheduled:

Approach A
 # Acquire the table lock
 # Look at the latest instant C on the active timeline (completed or not). 
Generate a timestamp after C
 # Create the plan and requested file using this new timestamp (that is 
greater than C)
 # Release the table lock
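The steps of (A) can be sketched as follows. This is a simplified illustration 
(plain Python with a `threading.Lock` standing in for the table lock, and a 
list of integers standing in for the timeline), not Hudi's actual API.

```python
import threading

table_lock = threading.Lock()
timeline = [95, 98]  # existing instant timestamps on the active timeline

def schedule_with_lock(make_plan):
    """Approach A sketch: generate the timestamp, compute the plan, and
    create the .requested entry all inside one critical section."""
    with table_lock:
        latest = max(timeline, default=0)  # latest instant C, completed or not
        ts = latest + 1                    # timestamp strictly after C
        plan = make_plan(ts)               # plan computed under the lock
                                           # (this is the first drawback below)
        timeline.append(ts)                # create .requested at ts
    return ts, plan

ts, _ = schedule_with_lock(lambda t: f"plan@{t}")
assert ts > 98  # the new instant is guaranteed to be after every existing one
```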

Unfortunately (A) has the following drawbacks:
 * Every operation must now hold the table lock while computing its plan, even 
if the plan is expensive to compute and will take a while
 * Users of HUDI cannot easily set their own instant time for an operation; 
this restriction would break any public APIs that allow this and would require 
deprecating those APIs.

 

An alternate approach is to have every operation abort creating a .requested 
file unless it has the latest timestamp. Specifically, for any instant type, 
whenever an operation is about to create a .requested plan on the timeline, it 
should take the table lock and assert that there are no other instants on the 
timeline greater than it that could cause a conflict. If that assertion fails, 
it should throw a retryable conflict-resolution exception.

Specifically, the following steps should be followed whenever any instant 
(commit, table service, etc.) is scheduled:

Approach B
 # Acquire the table lock. Assume that the desired instant time C and the 
requested-file plan metadata have already been created, regardless of whether 
that happened before this step or right after acquiring the table lock.
 # Get the set of all instants on the timeline that are greater than C 
(regardless of their action or state).
 ## If the current operation is an ingestion type 
(commit/deltacommit/insert_overwrite/replace), assert the set is empty
 ## If the current operation is a table service, assert that the set doesn't 
contain any table service instant types
 # Create the requested plan on the timeline (as usual)
 # Release the table lock
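The conflict check in step (2) can be sketched as below. Again this is a plain 
Python illustration, not Hudi code; the action names and the `RetryableConflict` 
exception are hypothetical stand-ins for the retryable conflict-resolution 
exception described above.

```python
import threading

table_lock = threading.Lock()
timeline = []  # (timestamp, action) pairs that already have a .requested file

TABLE_SERVICES = {"compaction", "logcompaction", "clean", "clustering"}

class RetryableConflict(Exception):
    """Retryable conflict-resolution exception raised when step 2 fails."""

def schedule(ts, action):
    """Approach B sketch: ts and the plan are chosen beforehand; the lock
    only guards the conflict check and the .requested creation."""
    with table_lock:
        # Step 2: all instants greater than C, regardless of action/state.
        newer = [(t, a) for (t, a) in timeline if t > ts]
        if action not in TABLE_SERVICES:
            # Step 2.1: ingestion must see no newer instant at all.
            if newer:
                raise RetryableConflict(f"{action}@{ts} is behind {newer}")
        else:
            # Step 2.2: a table service only conflicts with newer table services.
            if any(a in TABLE_SERVICES for _, a in newer):
                raise RetryableConflict(f"{action}@{ts} is behind a table service")
        timeline.append((ts, action))  # step 3: create .requested as usual

schedule(101, "compaction")        # Job 1 schedules at (x)
aborted = False
try:
    schedule(100, "deltacommit")   # Job 2 at (x - 1) must abort and retry
except RetryableConflict:
    aborted = True
assert aborted
assert (100, "deltacommit") not in timeline
```

Note that step 2.2 lets a table service schedule behind a newer ingestion 
commit (only newer table services conflict), which is why (B) can avoid some of 
the aborts that a blanket "no newer instants" rule would cause.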

Unlike (A), approach (B) allows users to continue to use HUDI APIs where the 
caller can specify the instant time (avoiding the need to deprecate any public 
API). It also allows table service operations to compute their plan without 
holding a lock. Despite this, (B) has the following drawbacks:
 * It is not immediately clear how MDT vs base table operations should be 
handled here. At first glance it seems that at step (2) both the base table and 
MDT timelines should be checked, but that needs more investigation to confirm.
 * This error will still be thrown even for combinations of concurrent 
operations where it would be safe to continue. For example, assume two 
ingestion writers are executing on a dataset, each only performing an insert 
commit (with no table service being scheduled). If the writer that started 
scheduling later ends up having an earlier timestamp, it would still be safe 
for it to continue. Despite that, because of step (2.1) it would still have to 
abort and throw an error. This means that on datasets with many frequent 
concurrent ingestion commits and very infrequent table service operations, 
there would be a lot of transient failures/noise from failing writers. Step 
(2.1) could potentially be revised to avoid this scenario (by only checking for 
certain actions like table services), but that would add complexity, and it is 
not clear at first glance whether it would open up other edge cases.

Between these two approaches, it seems (B) might be preferable since it allows 
users to still use existing APIs for the time being.

We were wondering if the Apache HUDI project team would be interested in 
investigating and implementing (B) to resolve this issue?



>  ongoing concurrent writers with smaller timestamp can cause issues with 
> table services
> ---------------------------------------------------------------------------------------
>
>                 Key: HUDI-7507
>                 URL: https://issues.apache.org/jira/browse/HUDI-7507
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: table-service
>            Reporter: Krishen Bhan
>            Priority: Major
>             Fix For: 0.15.0
>
>         Attachments: Flowchart (1).png, Flowchart.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
