[ https://issues.apache.org/jira/browse/HUDI-7507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Krishen Bhan updated HUDI-7507:
-------------------------------
Description:
*Scenarios:*
Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction. As a result, the following scenario is possible:
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x - 1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x - 1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then Job 1 can run before Job 2 and create a compaction plan for all instant times up to (x) that doesn't include instant time (x - 1). Later, Job 2 will create instant time (x - 1), but the timeline will be in a corrupted state, since the compaction plan was supposed to include (x - 1).
* There is a similar issue with clean. If Job 2 is a long-running commit (one that was stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x - 1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean:
!Flowchart (1).png!

*Proposed approach:*
One way this can be resolved is by combining the operations of generating the instant time and creating the requested file in the same HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled:

Approach A
# Acquire the table lock
# Look at the latest instant C on the active timeline (completed or not). Generate a timestamp after C
# Create the plan and requested file using this new timestamp (which is greater than C)
# Release the table lock

Unfortunately, (A) has the following drawbacks:
* Every operation must now hold the table lock while computing its plan, even if that computation is expensive and takes a while.
* Users of HUDI cannot easily set their own instant time for an operation; this restriction would break any public APIs that allow it and would require deprecating those APIs.

An alternate approach is to have every operation abort creating a .requested file unless it has the latest timestamp. Specifically, for any instant type, whenever an operation is about to create a .requested plan on the timeline, it should take the table lock and assert that there are no other instants on the timeline that are greater than it and could cause a conflict. If that assertion fails, it should throw a retryable conflict resolution exception. The following steps should be followed whenever any instant (commit, table service, etc.) is scheduled:

Approach B
# Acquire the table lock. Assume that the desired instant time C and the requested file plan metadata have already been created, regardless of whether that happened before this step or right after acquiring the table lock.
# Get the set of all instants on the timeline that are greater than C (regardless of their action or state).
## If the current operation is an ingestion type (commit/deltacommit/insert_overwrite replace), then assert that the set is empty.
## If the current operation is a table service, then assert that the set doesn't contain any table service instant types.
# Create the requested plan on the timeline (as usual)
# Release the table lock

Unlike (A), this approach (B) allows users to continue using HUDI APIs where the caller can specify the instant time (avoiding the need to deprecate any public API). It also makes it possible for table service operations to compute their plan without holding a lock.
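The two scheduling protocols can be compared in a small Python model. This is a sketch under stated assumptions, not Hudi code: the Timeline class, schedule_approach_a, schedule_approach_b, SchedulingConflictError and the operation labels are all hypothetical, instant times are plain integers rather than Hudi's timestamp strings, and adding a dict entry stands in for writing a .requested file.

```python
import threading

# Hypothetical operation labels taken from this issue's wording; they are not
# Hudi's action constants (an insert_overwrite, for example, is a replacecommit).
INGESTION_OPS = {"commit", "deltacommit", "insert_overwrite"}
TABLE_SERVICE_OPS = {"compaction", "log_compaction", "clean"}


class SchedulingConflictError(Exception):
    """Hypothetical retryable conflict-resolution exception raised by Approach B."""


class Timeline:
    """Toy timeline mapping integer instant time -> operation, in any state."""

    def __init__(self):
        self.instants = {}
        self._table_lock = threading.Lock()

    def _create_requested(self, instant_time, op):
        # Stands in for writing the <instant>.requested plan to the timeline.
        self.instants[instant_time] = op

    def schedule_approach_a(self, op):
        """Approach A: generate the timestamp and create .requested under one lock."""
        with self._table_lock:
            latest = max(self.instants, default=0)  # latest instant C, completed or not
            instant_time = latest + 1               # a timestamp strictly after C
            # The (possibly expensive) plan would also be computed here, lock held.
            self._create_requested(instant_time, op)
            return instant_time

    def schedule_approach_b(self, instant_time, op):
        """Approach B: instant_time and plan were chosen earlier; validate under the lock."""
        with self._table_lock:
            later_ops = [o for t, o in self.instants.items() if t > instant_time]
            if op in INGESTION_OPS and later_ops:
                # Step 2.1: an ingestion instant must be the latest on the timeline.
                raise SchedulingConflictError(f"{op}@{instant_time}: later instants exist")
            if op in TABLE_SERVICE_OPS and any(o in TABLE_SERVICE_OPS for o in later_ops):
                # Step 2.2: no table service instant may be later than this one.
                raise SchedulingConflictError(
                    f"{op}@{instant_time}: later table service instant exists")
            self._create_requested(instant_time, op)
            return instant_time
```

Seeding the model with the first scenario (a compaction already scheduled at x = 10, then Job 2 trying to schedule its commit at x - 1 = 9) makes schedule_approach_b raise SchedulingConflictError, so Job 2 would retry with a new instant time. schedule_approach_a cannot reach that state because it always hands out a time after the latest instant (11 in this example), at the cost of holding the lock while planning.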
Despite this, (B) has the following drawbacks:
* It is not immediately clear how MDT (metadata table) vs. base table operations should be handled here. At first glance it seems that at step (2) both the base table and MDT timelines should be checked, but that might need more investigation to confirm.
* This error will still be thrown even for combinations of concurrent operations where it would be safe to continue. For example, assume two ingestion writers are executing on a dataset, each only performing an insert commit (with no table service being scheduled). If the writer that started scheduling later ends up with an earlier timestamp, it would still be safe for it to continue. Despite that, because of step (2.1) it would still have to abort and throw an error. This means that on datasets with many frequent concurrent ingestion commits and very infrequent table service operations, there would be a lot of transient failures/noise from failing writers. Step (2.1) could potentially be revised to avoid this scenario (by only checking for certain actions like table services), but that would add complexity, and it is not clear at first glance whether that would open up other edge cases.

Between these two approaches, it seems (B) might be preferable, since it allows users to keep using the existing APIs for the time being.
We were wondering if the Apache HUDI project team would be interested in investigating and implementing (B) to resolve this issue?
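The false-positive drawback of step (2.1), and the relaxation the issue floats, can be illustrated with a hypothetical may_schedule helper. It is only a sketch: the function, its relaxed flag and the operation labels are assumptions, instant times are integers, and the relaxed branch is one possible reading of "only checking for certain actions like table services", not a vetted rule.

```python
# Hypothetical operation labels, as in the issue text (not Hudi constants).
INGESTION_OPS = {"commit", "deltacommit", "insert_overwrite"}
TABLE_SERVICE_OPS = {"compaction", "log_compaction", "clean"}


def may_schedule(timeline, instant_time, op, relaxed=False):
    """Return True if `op` may create its .requested file at `instant_time`.

    `timeline` maps integer instant time -> operation for instants in any state.
    relaxed=False applies steps 2.1/2.2 as written. relaxed=True is the possible
    revision mentioned in the issue (an ingestion op only conflicts with later
    table service instants); the issue notes it may open up other edge cases.
    """
    later_ops = [o for t, o in timeline.items() if t > instant_time]
    later_services = [o for o in later_ops if o in TABLE_SERVICE_OPS]
    if op in INGESTION_OPS:
        # Step 2.1 as written (any later instant) vs. the hypothetical relaxed rule.
        return not (later_services if relaxed else later_ops)
    if op in TABLE_SERVICE_OPS:
        # Step 2.2: table services only conflict with later table services.
        return not later_services
    raise ValueError(f"unknown operation: {op}")


# Two ingestion writers: W1 scheduled its insert commit at 101 first; W2 had
# picked 100 earlier and only now tries to create its .requested file.
timeline = {101: "commit"}
print(may_schedule(timeline, 100, "commit"))                # strict: False
print(may_schedule(timeline, 100, "commit", relaxed=True))  # relaxed: True
# The relaxed rule still rejects W2 if a compaction was scheduled at 101.
print(may_schedule({101: "compaction"}, 100, "commit", relaxed=True))  # False
```

The strict rule aborts W2 even though only another ingestion commit is later, which is exactly the transient-failure noise described above; the relaxed variant lets it through while still blocking the compaction scenario.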
was:
*Scenarios:*
Although HUDI operations hold a table lock when creating a .requested instant, HUDI writers do not generate a timestamp and create a .requested plan in the same transaction. As a result, the following scenario is possible:
# Job 1 starts and chooses timestamp (x); Job 2 starts and chooses timestamp (x - 1)
# Job 1 schedules and creates a requested file with instant timestamp (x)
# Job 2 schedules and creates a requested file with instant timestamp (x - 1)
# Both jobs continue running

If one job is writing a commit and the other is a table service, this can cause issues:
* If Job 2 is an ingestion commit and Job 1 is a compaction/log compaction, then Job 1 can run before Job 2 and create a compaction plan for all instant times up to (x) that doesn't include instant time (x - 1). Later, Job 2 will create instant time (x - 1), but the timeline will be in a corrupted state, since the compaction plan was supposed to include (x - 1).
* There is a similar issue with clean. If Job 2 is a long-running commit (one that was stuck/delayed for a while before creating its .requested plan) and Job 1 is a clean, then Job 1 can perform a clean that updates the earliest-commit-to-retain without waiting for Job 2's inflight instant at (x - 1) to complete. This causes Job 2 to be "skipped" by clean.

[Edit] I added a diagram to visualize the issue, specifically the second scenario with clean:
!Flowchart (1).png!

*Proposed approach:*
One way this can be resolved is by combining the operations of generating the instant time and creating the requested file in the same HUDI table transaction. Specifically, the following steps would be executed whenever any instant (commit, table service, etc.) is scheduled:

Approach A
# Acquire the table lock
# Look at the latest instant C on the active timeline (completed or not). Generate a timestamp after C
# Create the plan and requested file using this new timestamp (which is greater than C)
# Release the table lock

Unfortunately, (A) has the following drawbacks:
* Every operation must now hold the table lock while computing its plan, even if that computation is expensive and takes a while.
* Users of HUDI cannot easily set their own instant time for an operation; this restriction would break any public APIs that allow it and would require deprecating those APIs.

An alternate approach is to have every operation abort creating a .requested file unless it has the latest timestamp. Specifically, for any instant type, whenever an operation is about to create a .requested plan on the timeline, it should take the table lock and assert that there are no other instants on the timeline (inflight or otherwise) that are greater than it. If that assertion fails, it should throw a retryable conflict resolution exception. The following steps should be followed whenever any instant (commit, table service, etc.) is scheduled:

Approach B
# Acquire the table lock. Assume that the desired instant time C and the requested file plan metadata have already been created, regardless of whether that happened before this step or right after acquiring the table lock.
# Check whether there are any instant files on the timeline greater than C (regardless of their action or state). If so, raise a custom exception
# Create the requested plan on the timeline (as usual)
# Release the table lock

Unlike (A), this approach (B) allows users to continue using HUDI APIs where the caller can specify the instant time (avoiding the need to deprecate any public API). It also makes it possible for table service operations to compute their plan without holding a lock. Despite this, (B) has the following drawbacks:
* It is not immediately clear how MDT vs. base table operations should be handled here. At first glance it seems that at step (2) both the base table and MDT timelines should be checked, but that might need more investigation to confirm.
* This error will still be thrown even for combinations of concurrent operations where it would be safe to continue. For example, assume two ingestion writers are executing on a dataset, each only performing an insert commit (with no table service being scheduled). If the writer that started scheduling later ends up with an earlier timestamp, it would still be safe for it to continue. Despite that, because of step (2) it would still have to abort and throw an error. This means that on datasets with many frequent concurrent ingestion commits and very infrequent table service operations, there would be a lot of transient failures/noise from failing writers. Step (2) could potentially be revised to avoid this scenario (by only checking for certain actions like table services), but that would add complexity, and it is not clear at first glance whether that would open up other edge cases.

Between these two approaches, it seems (B) might be preferable, since it allows users to keep using the existing APIs for the time being.
We were wondering if the Apache HUDI project team would be interested in investigating and implementing (B) to resolve this issue?
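The earlier, stricter wording of step (2) reduces to a single "is anything later?" check. The sketch below is hypothetical (check_no_later_instants, SchedulingConflictError and the integer instant times are assumptions, not Hudi APIs); it accepts several timelines so the unconfirmed idea of checking both the base table and the MDT timeline can be tried out.

```python
class SchedulingConflictError(Exception):
    """Hypothetical stand-in for the 'custom exception' in the earlier step (2)."""


def check_no_later_instants(instant_time, *timelines):
    """Earlier wording of Approach B, step (2): raise if any instant on any of
    `timelines` (iterables of integer instant times, any action or state) is
    greater than `instant_time`.

    Passing both a base table and an MDT timeline reflects the issue's
    'at first glance' suggestion, which it says still needs investigation.
    """
    later = sorted(t for timeline in timelines for t in timeline if t > instant_time)
    if later:
        raise SchedulingConflictError(
            f"instant {instant_time} is not the latest; later instants: {later}")


base_timeline = {100, 104}  # hypothetical base table instant times
mdt_timeline = {100, 102}   # hypothetical metadata table instant times
check_no_later_instants(105, base_timeline, mdt_timeline)  # passes silently
try:
    check_no_later_instants(101, base_timeline, mdt_timeline)
except SchedulingConflictError as err:
    print(err)  # instant 101 is not the latest; later instants: [102, 104]
```

Because this version ignores the action of the later instants, it rejects every out-of-order scheduling attempt, including the safe case of two concurrent ingestion commits described in the drawbacks.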
> ongoing concurrent writers with smaller timestamp can cause issues with table services
> ---------------------------------------------------------------------------------------
>
> Key: HUDI-7507
> URL: https://issues.apache.org/jira/browse/HUDI-7507
> Project: Apache Hudi
> Issue Type: Improvement
> Components: table-service
> Reporter: Krishen Bhan
> Priority: Major
> Fix For: 0.15.0
>
> Attachments: Flowchart (1).png, Flowchart.png