yihua commented on code in PR #17870:
URL: https://github.com/apache/hudi/pull/17870#discussion_r2771579691
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java:
##########
@@ -213,6 +222,9 @@ private void heartbeatTaskRunner(Thread threadToMonitor) {
// Call synchronized method after releasing the semaphore
if (!heartbeatExecutionSuccessful) {
logger.error("Owner {}: Heartbeat function did not succeed.", ownerId);
+ // Interrupt the monitored thread to notify it of heartbeat failure.
+ threadToMonitor.interrupt();
+ logger.info("Owner {}: Interrupted monitored thread due to heartbeat failure.", ownerId);
Review Comment:
Could this cause a regular `unlock()` to fail the writer when it should not,
as in the following case?
```
┌────────────────────────────────────────┬────────────────────────────────────────┐
│ Writer Thread                          │ Heartbeat Thread                       │
├────────────────────────────────────────┼────────────────────────────────────────┤
│ T1: unlock()                           │                                        │
│ T2: stopHeartbeat(true)                │                                        │
│ T3: cancel(true) ──────────────────────┼──> T4: receives interrupt              │
│                                        │ T5: renewLock() throws exception       │
│ T6: syncWait... (waiting)              │ T7: executeHeartbeat returns false     │
│                                        │ T8: semaphore.release()                │
│ T9: semaphore.acquire()                │                                        │
│ T10: semaphore.release()               │                                        │
│ T11: return true                       │ T12: if(!success) {                    │
│ T13: tryExpireCurrentLock() ←──────────┼─ T14: interrupt(writerThread) !!       │
│     ↑ INTERRUPTED!                     │ T15: }                                 │
│ T16: FAILS or behaves unexpectedly     │                                        │
└────────────────────────────────────────┴────────────────────────────────────────┘
```
Timeline of events:
T1: Writer thread calls `StorageBasedLockProvider#unlock()` because it finished its work
T2: `unlock()` calls `LockProviderHeartbeatManager#stopHeartbeat(true)` with `mayInterruptIfRunning=true`
T3: `cancelRecurringHeartbeatTask(true)` calls `scheduledFuture.cancel(true)`, sending an interrupt to the heartbeat thread
T4: Heartbeat thread receives the interrupt while executing `renewLock()` → `tryUpsertLockFile()`
T5: The S3/GCS client throws an exception due to the interrupt; `renewLock()` catches it and returns false
T6: Writer thread enters `syncWaitInflightHeartbeatTaskToFinish()`, waiting to acquire the semaphore
T7: `executeHeartbeat()` returns false
T8: Heartbeat thread releases the semaphore (exits the try-finally block)
T9: Writer thread acquires the semaphore (heartbeat execution appears complete)
T10: Writer thread releases the semaphore
T11: `stopHeartbeat()` returns true (thinks the heartbeat is fully stopped)
T12: THE RACE! Heartbeat thread continues executing code after the semaphore release and sees `!heartbeatExecutionSuccessful`
T13: Writer thread has meanwhile proceeded to `tryExpireCurrentLock()`
T14: Heartbeat thread calls `threadToMonitor.interrupt()`, so the writer thread now has its interrupted flag set
T16: Lock expiration may fail or behave unexpectedly due to the interrupt
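
To make the interleaving concrete, here is a minimal standalone sketch that forces the ordering above. It is not the Hudi code: the class name, the method name, and the two scaffolding semaphores (`heartbeatStarted`, `stopReturned`) are hypothetical and exist only to pin the schedule, and `acquireUninterruptibly()` is used to keep the sketch free of checked exceptions.

```java
import java.util.concurrent.Semaphore;

class HeartbeatRaceSketch {

    /** Forces the T8..T14 ordering and returns the writer's interrupted flag at T13. */
    static boolean writerInterruptedAfterStop() {
        Semaphore heartbeatSemaphore = new Semaphore(1); // guards the heartbeat execution only
        Semaphore heartbeatStarted = new Semaphore(0);   // scaffolding: heartbeat is in flight
        Semaphore stopReturned = new Semaphore(0);       // scaffolding: stopHeartbeat() returned
        Thread writer = Thread.currentThread();

        Thread heartbeat = new Thread(() -> {
            // Stand-in for executeHeartbeat() failing after cancel(true) (T4-T7).
            boolean heartbeatExecutionSuccessful = false;
            heartbeatSemaphore.acquireUninterruptibly();
            try {
                heartbeatStarted.release();
            } finally {
                heartbeatSemaphore.release();            // T8
            }
            // Hold the heartbeat thread here until the writer has passed T11.
            stopReturned.acquireUninterruptibly();
            if (!heartbeatExecutionSuccessful) {         // T12
                writer.interrupt();                      // T14
            }
        });
        heartbeat.start();

        heartbeatStarted.acquireUninterruptibly();       // T6: a heartbeat is in flight
        heartbeatSemaphore.acquireUninterruptibly();     // T9
        heartbeatSemaphore.release();                    // T10
        stopReturned.release();                          // T11: stop "returns true"

        // Spin rather than join(): join() would throw once the interrupt lands.
        while (heartbeat.isAlive()) {
            Thread.yield();
        }
        // T13: the writer would now call tryExpireCurrentLock() with this flag set.
        return Thread.interrupted();                     // reads and clears the flag
    }

    public static void main(String[] args) {
        System.out.println("writer interrupted after stop: " + writerInterruptedAfterStop());
    }
}
```

In this forced schedule the writer always observes the interrupt; in the real code it would depend on timing, which is what makes it a race.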
The semaphore only protects `executeHeartbeat()`. The interrupt call added
by this PR runs after the semaphore is released, creating a race where
`stopHeartbeat()` returns before the heartbeat thread finishes its
post-execution logic.
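
One possible direction, sketched under assumptions rather than proposed as the actual fix: decide on the interrupt while the semaphore is still held, and skip it once a stop has been requested. The `stopRequested` flag and the class and method names below are hypothetical and not part of `LockProviderHeartbeatManager`; in the real code the flag would have to be set before `cancel(true)` so that a failure caused by the cancellation itself sees it.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

class GuardedInterruptSketch {
    private final Semaphore heartbeatSemaphore = new Semaphore(1);
    // Hypothetical flag: set by the stopping thread before it waits on the semaphore.
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    /** Stand-in for the heartbeat task; heartbeatSucceeded models executeHeartbeat()'s result. */
    void heartbeatTask(Thread threadToMonitor, boolean heartbeatSucceeded) {
        heartbeatSemaphore.acquireUninterruptibly();
        try {
            // The interrupt decision happens inside the protected region, so it cannot
            // run after a stopper has acquired and released the semaphore.
            if (!heartbeatSucceeded && !stopRequested.get()) {
                threadToMonitor.interrupt();
            }
        } finally {
            heartbeatSemaphore.release();
        }
    }

    /** Stand-in for stopHeartbeat(): after it returns, no further interrupt is sent. */
    void stopHeartbeat() {
        stopRequested.set(true);
        heartbeatSemaphore.acquireUninterruptibly(); // wait for any in-flight heartbeat
        heartbeatSemaphore.release();
    }

    public static void main(String[] args) {
        GuardedInterruptSketch manager = new GuardedInterruptSketch();
        manager.stopHeartbeat();
        manager.heartbeatTask(Thread.currentThread(), false);
        System.out.println("interrupted after stop: " + Thread.interrupted());
    }
}
```

A genuine heartbeat failure that is detected before the stop is requested can still interrupt the monitored thread, which seems to be the behavior this PR intends; the sketch only removes the window after `stopHeartbeat()` has returned.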