[
https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Krishen Bhan updated HUDI-7507:
-------------------------------
Description:
*Scenarios:*
Although HUDI operations hold a table lock when creating a .requested instant,
HUDI writers do not generate a timestamp and create a .requested plan in the
same transaction, so the following scenario is possible:
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp
(x-1)
# Job 1 schedules and creates requested file with instant timestamp (x)
# Job 2 schedules and creates requested file with instant timestamp (x-1)
# Both jobs continue running
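The interleaving above can be reproduced with a toy in-memory model. This is only a sketch, not HUDI code: `ToyTimeline`, `create_requested`, and the integer timestamps are hypothetical stand-ins. It assumes only what the description states, namely that the timestamp is chosen outside the lock while the requested file is created inside it.

```python
import threading


class ToyTimeline:
    """Hypothetical stand-in for a table timeline; not a real HUDI class."""

    def __init__(self):
        self.lock = threading.Lock()  # models the table lock
        self.requested = []           # instant times, in creation order

    def create_requested(self, instant_time):
        # Only the file creation is protected by the lock. The timestamp
        # was chosen earlier, outside of this transaction.
        with self.lock:
            self.requested.append(instant_time)


timeline = ToyTimeline()
x = 100

# Step 1: both jobs choose their timestamps before taking the lock.
job1_time = x
job2_time = x - 1

# Steps 2 and 3: Job 1 reaches the lock first, Job 2 second.
timeline.create_requested(job1_time)
timeline.create_requested(job2_time)

# An instant with a smaller timestamp was created after a larger one.
out_of_order = timeline.requested != sorted(timeline.requested)
print(timeline.requested, out_of_order)
```

The lock serializes the two file creations but does nothing to order the timestamps, which is the gap the rest of this description tries to close.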
If one job is writing a commit and the other is a table service, this can cause
issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then
Job 1 can run before Job 2 and create a compaction plan for all instant times
(up to (x)) that doesn’t include instant time (x-1). Later Job 2 will create
instant time (x-1), but the timeline will be in a corrupted state since the
compaction plan was supposed to include (x-1).
* There is a similar issue with clean. If Job 2 is a long-running commit (that
was stuck/delayed for a while before creating its .requested plan) and Job 1 is
a clean, then Job 1 can perform a clean that updates the
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at
(x-1) to complete. This causes Job 2 to be "skipped" by clean.
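The compaction case can be illustrated with a deliberately simplified model. The sketch below assumes a plan is just the list of instant times visible when the table service is scheduled; real compaction plans are built differently, and `plan_compaction` is a hypothetical helper, not a HUDI API.

```python
def plan_compaction(timeline_instants, compaction_time):
    """Toy plan: cover every visible instant earlier than the compaction."""
    return sorted(t for t in timeline_instants if t < compaction_time)


x = 100
timeline_instants = {97, 98}  # commits already on the timeline

# Job 1 (compaction) schedules at x and plans over what it can see.
plan = plan_compaction(timeline_instants, x)
timeline_instants.add(x)

# Job 2 (ingestion) creates its requested file at x-1 afterwards.
timeline_instants.add(x - 1)

# Instants earlier than x that the plan never accounted for.
missed = sorted(t for t in timeline_instants if t < x and t not in plan)
print(plan, missed)
```

In this model the instant at (x-1) ends up on the timeline before the compaction instant but outside its plan, which matches the inconsistency described in the first bullet.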
[Edit] I added a diagram to visualize the issue, specifically the second
scenario with clean
!Flowchart (1).png!
*Proposed approach:*
One way this can be resolved is by combining the operations of generating
instant time and creating a requested file in the same HUDI table transaction.
Specifically, the following steps would be executed whenever any instant
(commit, table service, etc.) is scheduled:
Approach A
# Acquire table lock
# Look at the latest instant C on the active timeline (completed or not).
Generate a timestamp after C
# Create the plan and requested file using this new timestamp (that is
greater than C)
# Release table lock
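As a rough sketch of Approach A, assuming integer timestamps and an in-memory timeline (`ToyTable`, `schedule`, and `build_plan` are hypothetical names, not HUDI APIs), the four steps could look like this:

```python
import threading


class ToyTable:
    """Hypothetical in-memory table; not a real HUDI class."""

    def __init__(self):
        self.lock = threading.Lock()  # models the table lock
        self.instants = []            # every instant on the active timeline

    def schedule(self, build_plan):
        with self.lock:                              # 1. acquire table lock
            latest = max(self.instants, default=0)   # 2. latest instant C
            new_time = latest + 1                    #    timestamp after C
            plan = build_plan(new_time)              # 3. plan built under lock
            self.instants.append(new_time)           #    requested file created
            return new_time, plan                    # 4. lock released on exit


table = ToyTable()
workers = [
    threading.Thread(target=table.schedule, args=(lambda t: f"plan@{t}",))
    for _ in range(4)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(table.instants)
```

Because the timestamp is derived while the lock is held, creation order and timestamp order always agree. The cost is also visible: `build_plan` runs inside the critical section, and the caller has no way to supply its own instant time.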
Unfortunately (A) has the following drawbacks
* Every operation must now hold the table lock when computing its plan even if
it's an expensive operation and will take a while
* Users of HUDI cannot easily set their own instant time of an operation, and
this restriction would break any public APIs that allow this and would require
deprecating those APIs.
An alternate approach is to have every operation abort creating a .requested
file unless it has the latest timestamp. Specifically, for any instant type,
whenever an operation is about to create a .requested plan on timeline, it
should take the table lock and assert that there are no other instants on
timeline (inflight or otherwise) that are greater than it. If that assertion
fails, then throw a retry-able conflict resolution exception.
Specifically, the following steps should be followed whenever any instant
(commit, table service, etc) is scheduled
Approach B
# Acquire table lock. Assume that the desired instant time C and requested
file plan metadata have already been created, regardless of whether that
happened before this step or right after acquiring the table lock.
# Check if there are any instant files on timeline greater than C (regardless
of their action or state). If so, raise a custom exception
# Create requested plan on timeline (as usual)
# Release table lock
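The steps of Approach B can be sketched the same way. Everything here is illustrative: `ToyTable`, `create_requested`, and `InstantConflictError` are hypothetical names, and the timeline is an in-memory list rather than instant files.

```python
import threading


class InstantConflictError(Exception):
    """Hypothetical retry-able exception; not an existing HUDI class."""


class ToyTable:
    """Hypothetical in-memory table; not a real HUDI class."""

    def __init__(self):
        self.lock = threading.Lock()  # models the table lock
        self.instants = []            # every instant on the timeline

    def create_requested(self, instant_time, plan):
        # The caller chose instant_time and built the plan beforehand.
        with self.lock:                                        # 1. acquire
            newer = [t for t in self.instants if t > instant_time]
            if newer:                                          # 2. check
                raise InstantConflictError(
                    f"instant {instant_time} is older than {max(newer)}"
                )
            self.instants.append(instant_time)                 # 3. create
        # 4. lock released when the with-block exits


table = ToyTable()
table.create_requested(100, plan="compaction plan")  # Job 1 at x

try:
    table.create_requested(99, plan="commit")        # Job 2 at x-1
    rejected = False
except InstantConflictError:
    rejected = True
print(table.instants, rejected)
```

A caller that catches the exception would typically pick a fresh instant time and retry, which is where the transient failures discussed below come from.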
Unlike (A), this approach (B) allows users to continue to use HUDI APIs where
the caller can specify instant time (avoiding the need to deprecate any
public API). It also allows table service operations to compute their plan
without holding a lock. Despite this, (B) has the following drawbacks
* It is not immediately clear how MDT vs base table operations should be
handled here. At first glance it seems that at step (2) both the base table and
MDT timeline should be checked, but that might need more investigation to
confirm.
* This error will still be thrown even for combinations of concurrent
operations where it would be safe to continue. For example, assume two
ingestion writers are executing on a dataset, each performing only an
insert commit (with no table service being scheduled). If the writer that
started scheduling later ended up having an earlier timestamp, it would
still be safe for it to continue. Despite that, because of step (2) it
would still have to abort and throw an error. This means that on datasets with
many frequent concurrent ingestion commits and very infrequent table service
operations, failing writers would produce a lot of transient failures/noise.
Step (2) could potentially be revised to avoid this scenario (by only
checking for certain actions like table services), but that would add
complexity, and it is not clear at first glance whether that would open up
other edge cases.
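One possible reading of the revised step (2) is sketched below. It is an assumption rather than a settled design: the check only rejects when a greater instant on the timeline belongs to a table service, the action set is illustrative, `check_before_requested` is a hypothetical helper, and the function is assumed to be called while the table lock is held. It does not address the open edge cases mentioned above.

```python
# Illustrative set of table service actions; an assumption for this sketch.
TABLE_SERVICE_ACTIONS = {"clean", "compaction", "logcompaction"}


class InstantConflictError(Exception):
    """Hypothetical retry-able exception; not an existing HUDI class."""


def check_before_requested(timeline, instant_time):
    """timeline is a list of (instant_time, action) tuples."""
    blocking = [
        (t, action)
        for (t, action) in timeline
        if t > instant_time and action in TABLE_SERVICE_ACTIONS
    ]
    if blocking:
        raise InstantConflictError(f"blocked by newer instants: {blocking}")


# Two ingestion writers: the one with the earlier timestamp may proceed.
ingestion_only = [(100, "commit")]
check_before_requested(ingestion_only, 99)  # no exception raised

# A compaction at a greater instant time still forces an abort.
with_compaction = [(100, "compaction")]
try:
    check_before_requested(with_compaction, 99)
    rejected = False
except InstantConflictError:
    rejected = True
print(rejected)
```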
was:
Although HUDI operations hold a table lock when creating a .requested instant,
HUDI writers do not generate a timestamp and create a .requested plan in the
same transaction, so the following scenario is possible:
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp
(x-1)
# Job 1 schedules and creates requested file with instant timestamp (x)
# Job 2 schedules and creates requested file with instant timestamp (x-1)
# Both jobs continue running
If one job is writing a commit and the other is a table service, this can cause
issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then
Job 1 can run before Job 2 and create a compaction plan for all instant times
(up to (x)) that doesn’t include instant time (x-1). Later Job 2 will create
instant time (x-1), but the timeline will be in a corrupted state since the
compaction plan was supposed to include (x-1).
* There is a similar issue with clean. If Job 2 is a long-running commit (that
was stuck/delayed for a while before creating its .requested plan) and Job 1 is
a clean, then Job 1 can perform a clean that updates the
earliest-commit-to-retain without waiting for the inflight instant by Job 2 at
(x-1) to complete. This causes Job 2 to be "skipped" by clean.
[Edit] I added a diagram to visualize the issue, specifically the second
scenario with clean
!Flowchart (1).png!
One way this can be resolved is by combining the operations of generating
instant time and creating a requested file in the same HUDI table transaction.
Specifically, executing the following steps whenever any instant (commit, table
service, etc) is scheduled
# Acquire table lock
# Look at the latest instant C on the active timeline (completed or not).
Generate a timestamp after C
# Create the plan and requested file using this new timestamp (that is
greater than C)
# Release table lock
Unfortunately this has the following drawbacks
* Every operation must now hold the table lock when computing its plan, even
if it's an expensive operation and will take a while
* Users of HUDI cannot easily set their own instant time of an operation, and
this restriction would break any public APIs that allow this
-An alternate approach (suggested by- [~pwason] -) was to instead have all
operations including table services perform conflict resolution checks before
committing. For example, clean and compaction would generate their plan as
usual. But when creating a transaction to write a .requested file, right before
creating the file they should check if another lower timestamp instant has
appeared in the timeline. And if so, they should fail/abort without creating
the plan. Commit operations would also be updated/verified to have similar
check, before creating a .requested file (during a transaction) the commit
operation will check if a table service plan (clean/compact) with a greater
instant time has been created. And if so, would abort/fail. This avoids the
drawbacks of the first approach, but will lead to more transient failures that
users have to handle.-
An alternate approach is to have every operation abort creating a .requested
file unless it has the latest timestamp. Specifically, for any instant type,
whenever an operation is about to create a .requested plan on timeline, it
should take the table lock and assert that there are no other instants on
timeline (inflight or otherwise) that are greater than it. If that assertion
fails, then throw a retry-able conflict resolution exception.
> ongoing concurrent writers with smaller timestamp can cause issues with
> table services
> ---------------------------------------------------------------------------------------
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
> Issue Type: Improvement
> Components: table-service
> Reporter: Krishen Bhan
> Priority: Major
> Fix For: 0.15.0
>
> Attachments: Flowchart (1).png, Flowchart.png
>
>