[ 
https://issues.apache.org/jira/browse/HUDI-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishen Bhan updated HUDI-8139:
-------------------------------
    Description: 
*Issue*
When using HUDI multiwriter with optimistic concurrency mode enabled, a 
writer's operation may need to acquire a table lock before performing some 
steps. If the writer has to wait longer to acquire a lock, the runtime of 
its operation will increase, or the operation may fail outright if 
`hoodie.write.lock.wait_time_ms` is breached. Although the lock wait time is 
typically expected to be short, there are cases where a writer may need to 
wait 10-20+ minutes before finally getting the lock:
 # The writer that currently holds the lock may have encountered a 
service/infra degradation that's causing it to get "stuck" in the middle of a 
transaction. For example, a user running Spark on YARN may encounter a delay 
while awaiting executor resources.
 # The writer that currently holds the lock may have to process many instant 
files in the base table and metadata table, due to delayed 
clean/archival/compaction.
 # If there are many concurrent writers writing to the dataset, each writer may 
have to wait a long time before finally getting its "chance" to acquire the 
lock.

Although this issue can be partially mitigated by increasing 
`hoodie.write.lock.wait_time_ms` for a writer, the runtime of the operation 
will still not be reduced. Some users would want to prioritize the 
delay/runtime of a specific writer even at the risk of failing other writers. 
For example, a user may want to give an ingestion writer high priority over 
other writers in order to ensure that the dataset isn't stale. To satisfy this 
use case, HUDI can add an enhancement that reduces the amount of time 
"high priority" writers need to wait before acquiring a lock.
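For context, a sketch of the mitigation available today, which only raises the writer's tolerance for contention and does not shorten anyone's runtime (the chosen timeout value is illustrative; `hoodie.write.lock.wait_time_ms` is the config named above):

```properties
# Illustrative writer config: tolerate contention by waiting longer.
hoodie.write.concurrency.mode=optimistic_concurrency_control
# Wait up to 20 minutes for the table lock before failing the operation.
# This does nothing to reduce how long the current holder keeps the lock.
hoodie.write.lock.wait_time_ms=1200000
```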

*Resolutions*

[Approach A]

One approach could be to add a new timeout config that determines the maximum 
amount of time a writer can hold a lock before being forced to abort its 
operation and release it. Special care needs to be taken so that all DFS 
operations are aborted before the lock is released.

This approach does not depend on the type of lock provider used. This in turn, 
though, means that the changes proposed here will not be isolated to the 
lock provider class, and changes to these other parts of HUDI will need to be 
made. Another drawback with this approach is that it might not scale well with 
scenario 3: the user will have to keep track of the max # of writers on 
the dataset at any given time, and the time window each low-priority writer is 
allowed to hold the lock for (before aborting) will shrink as the number of 
total low-priority writers on the dataset increases.
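A minimal sketch of the bounded-hold idea in Approach A, using only JDK classes (the `BoundedLockHolder` name and the abort-via-interrupt strategy are assumptions for illustration, not existing HUDI code; a real implementation would also have to cancel in-flight DFS writes before unlocking):

```java
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: wrap a lock with a watchdog that aborts the holder's work once a
// maximum hold time elapses, so the lock is always released in bounded time.
public class BoundedLockHolder {
    private final ReentrantLock lock = new ReentrantLock();
    private final ScheduledExecutorService watchdog =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);          // don't keep the JVM alive
                return t;
            });

    /** Runs work under the lock; interrupts the worker if it overruns maxHoldMs. */
    public boolean runWithBoundedHold(Runnable work, long maxHoldMs) {
        lock.lock();
        Thread worker = Thread.currentThread();
        // The watchdog fires only if the critical section overruns its budget.
        ScheduledFuture<?> abort =
                watchdog.schedule(worker::interrupt, maxHoldMs, TimeUnit.MILLISECONDS);
        try {
            work.run();
            return !Thread.interrupted();   // false means we were force-aborted
        } finally {
            abort.cancel(false);
            lock.unlock();                  // released in bounded time either way
        }
    }

    public static void main(String[] args) {
        BoundedLockHolder holder = new BoundedLockHolder();
        boolean ok = holder.runWithBoundedHold(() -> { /* fast critical section */ }, 1000);
        System.out.println(ok);             // prints "true" (work finished in time)
    }
}
```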

[Approach B] 

Another approach would be to update ZookeeperLockProvider to allow giving a 
priority to a lock acquirer, the idea being that the high-priority lock 
acquirer will "skip the line" and be the next job to take the lock. If an 
existing Apache Curator recipe for a "priority lock" exists, it can be used to 
update ZookeeperLockProvider; if not, a new Zookeeper recipe may need to be 
modified/created, using [https://zookeeper.apache.org/doc/r3.1.2/recipes.html] 
as a starting point for research.

This approach, though, may not be sufficient on its own to solve this use case 
and may also require Approach A to be implemented, so that the current 
low-priority lock holder will abort within a bounded time window. Also, 
although this approach can isolate its changes to just the Zookeeper lock 
provider, it means that implementing this change will require delving into 
Zookeeper-specific semantics (and porting them to other distributed lock 
providers requested by users). In addition, this approach will only work with 
lock providers that maintain an internal logical "queue", like 
ZookeeperLockProvider.
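The "skip the line" ordering can be modeled in plain Java with a priority-ordered wait queue, a stand-in for how a ZK recipe could prefix its sequential lock nodes with a priority digit so high-priority acquirers sort first (class and method names here are illustrative, not a proposed API):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Model of priority lock ordering: waiters are ordered by (priority, arrival),
// so a high-priority acquirer is dispensed next even if it arrived last.
public class PriorityLockQueue {
    public record Waiter(String id, int priority, long seq) {}

    private final PriorityQueue<Waiter> waiters = new PriorityQueue<>(
            Comparator.comparingInt(Waiter::priority)   // 0 = high priority
                      .thenComparingLong(Waiter::seq)); // FIFO within a tier
    private long nextSeq = 0;

    public void enqueue(String id, int priority) {
        waiters.add(new Waiter(id, priority, nextSeq++));
    }

    /** The waiter that takes the lock next, once the current holder releases. */
    public String nextHolder() {
        Waiter w = waiters.poll();
        return w == null ? null : w.id();
    }

    public static void main(String[] args) {
        PriorityLockQueue q = new PriorityLockQueue();
        q.enqueue("low-1", 1);
        q.enqueue("low-2", 1);
        q.enqueue("ingestion", 0);          // high priority arrives last...
        System.out.println(q.nextHolder()); // prints "ingestion": it skipped the line
    }
}
```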

[Approach C]

Similar to approach B, except instead update the ZookeeperLockProvider to allow 
a high-priority writer to forcibly "steal" the lock by forcing the current lock 
holder to fail until the writer finally acquires the lock. At a high level, the 
high-priority writer could have another "grace period" timer (smaller than 
`hoodie.write.lock.wait_time_ms`) that dictates the amount of time the writer 
will wait before informing the current lock owner that it should self-abort. 
Based on the docs, it seems Apache Curator might already support this via an 
API on InterProcessMutex (the Apache Curator API that ZookeeperLockProvider 
uses) to make the lock revocable 
[https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/InterProcessMutex.html#makeRevocable(org.apache.curator.framework.recipes.locks.RevocationListener)]
 , see 
[https://stackoverflow.com/questions/33374596/apache-curator-lock-recipes-revoking]
 for an example. Implementing this needs to be researched further since, as 
mentioned in Approach A, HUDI should ensure that once the lock is released 
there are no ongoing DFS operations initiated by the writer still running. 
One potential implementation approach could be to have the (low-priority) lock 
owner, upon getting a revocation request, throw a runtime exception and fail 
without explicitly releasing the lock, the idea being that by the time the 
Zookeeper nodes clean up the ephemeral lock node (after identifying the 
writer's ZK client session as stale) any ongoing DFS operations would have 
completed. On the other end, the high-priority writer will, after waiting for 
the "grace period" to elapse, repeatedly attempt to revoke the lock.
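The revocation handshake described above can be sketched with JDK classes only (all names here are illustrative; in Curator this would presumably map to `InterProcessMutex.makeRevocable(...)` on the holder side and a revocation request on the requester side, which needs to be confirmed against the Curator version HUDI ships with):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Model of the Approach C handshake: the high-priority writer requests
// revocation after its grace period; the low-priority holder checks the flag
// between units of work and self-aborts by throwing, WITHOUT an explicit
// unlock, so the ephemeral ZK node expires with the session later, by which
// time in-flight DFS writes should have completed.
public class RevocableHold {
    private final AtomicBoolean revocationRequested = new AtomicBoolean(false);

    /** Requester side: called once the grace period has elapsed. */
    public void requestRevocation() {
        revocationRequested.set(true);
    }

    /** Holder side: invoked between steps inside the transaction. */
    public void checkRevoked() {
        if (revocationRequested.get()) {
            throw new IllegalStateException("lock revoked by high-priority writer");
        }
    }

    public static void main(String[] args) {
        RevocableHold hold = new RevocableHold();
        hold.checkRevoked();        // no revocation yet: holder proceeds
        hold.requestRevocation();   // grace period elapsed on the other side
        try {
            hold.checkRevoked();
        } catch (IllegalStateException e) {
            System.out.println("holder aborted: " + e.getMessage());
        }
    }
}
```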

Similar to approach B, this approach has the drawback of requiring time to be 
spent delving into Zookeeper semantics (though likely less, since the existing 
Curator recipe used by HUDI already seems to support lock revocation, even if 
only in a future version of Curator) and possibly having to do the work of 
porting it to other lock providers later (and not all distributed lock 
providers may support lock revocation).

[Suggested approach]

The suggested approach is C since, although it requires delving into Zookeeper 
semantics more, it resolves scenario 3, making it potentially easier to 
test/implement. Unlike approach B, there seem to be more existing Zookeeper 
references. Unfortunately, similar to A, more work would be needed to ensure 
that the lock owner doesn't have a pending DFS operation still ongoing after 
the lock is unlocked.



> Allow high priority writer to force-acquire lock within a bounded amount of 
> time
> --------------------------------------------------------------------------------
>
>                 Key: HUDI-8139
>                 URL: https://issues.apache.org/jira/browse/HUDI-8139
>             Project: Apache Hudi
>          Issue Type: Wish
>            Reporter: Krishen Bhan
>            Priority: Trivial
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
