prashantwason commented on code in PR #18904:
URL: https://github.com/apache/hudi/pull/18904#discussion_r3354050436


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -197,20 +222,26 @@ public boolean isHeartbeatExpired(String instantTime) throws IOException {
   private void updateHeartbeat(String instantTime) throws HoodieHeartbeatException {
     try {
       Long newHeartbeatTime = System.currentTimeMillis();
-      OutputStream outputStream =
-          this.storage.create(
-              new StoragePath(heartbeatFolderPath, instantTime), true);
-      outputStream.close();
+      writeHeartbeatFile(instantTime);
       Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
       if (heartbeat.getLastHeartbeatTime() != null && isHeartbeatExpired(instantTime)) {
-        log.error("Aborting, missed generating heartbeat within allowable interval {} ms", this.maxAllowableHeartbeatIntervalInMs);
-        // Since TimerTask allows only java.lang.Runnable, cannot throw an exception and bubble to the caller thread, hence
-        // explicitly interrupting the timer thread.
-        Thread.currentThread().interrupt();
+        // A previous refresh was delayed past the tolerable interval (e.g. due to a slow storage write
+        // or driver pressure). Do NOT interrupt the timer thread here: that would permanently kill all
+        // future heartbeats for this instant, turning a transient delay into a permanent blackout.
+        // Enforcement is done at commit time in HeartbeatUtils.abortIfHeartbeatExpired(), which is the
+        // correct and sole enforcement point.
+        log.warn("Missed generating heartbeat for instant {} within allowable interval {} ms; continuing to refresh",

Review Comment:
   Thanks, this is an important catch. I reworked the expiry handling to 
preserve the original safety property:
   
   - On a detected lapse we now **stop refreshing** the heartbeat (cancel the timer) and do **not** advance the last-heartbeat time. A lapsed writer therefore still fails at commit via `abortIfHeartbeatExpired()`. It cannot resurrect a heartbeat that an async cleaner (under the LAZY failed-writes cleaning policy) may already have acted on, which closes the data-loss window you described. The only change from the original behavior is that the timer is cancelled cleanly instead of via `Thread.currentThread().interrupt()`, which permanently killed the timer thread on the first miss.
   - Separately, as you suggested, I raised the default `hoodie.client.heartbeat.tolerable.misses` from 2 to 10. Transient GC or storage pauses are now far less likely to trip expiry in the first place. The heartbeat write has also moved off the timer thread, with a bounded wait. Together these changes should stop transient delays from causing spurious aborts, while a genuine lapse still aborts safely.
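   The following is a minimal sketch of the lapse handling described above. It assumes a simplified per-instant heartbeat driven by a `ScheduledExecutorService`. `LapseAwareHeartbeat` and its methods are hypothetical illustrations, not Hudi's `HoodieHeartbeatClient` or `Heartbeat` API, and `abortIfExpired` only mimics the role of `abortIfHeartbeatExpired()`. When the sketch detects a lapse, it cancels its own scheduled task and leaves the last-heartbeat time untouched, so the commit-time check still fails.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of one instant's heartbeat (not Hudi's actual classes).
final class LapseAwareHeartbeat {
  private final long maxAllowableIntervalMs;
  private Long lastHeartbeatTimeMs;          // null until the first successful refresh
  private boolean stopped;                   // set once a lapse has been detected
  private volatile ScheduledFuture<?> task;  // handle used to cancel the periodic refresh

  LapseAwareHeartbeat(long maxAllowableIntervalMs) {
    this.maxAllowableIntervalMs = maxAllowableIntervalMs;
  }

  /** Called on each timer tick; returns false once refreshing has stopped. */
  synchronized boolean onTick(long nowMs) {
    if (stopped) {
      return false;
    }
    if (lastHeartbeatTimeMs != null && nowMs - lastHeartbeatTimeMs > maxAllowableIntervalMs) {
      // Lapse: stop refreshing and deliberately do NOT advance lastHeartbeatTimeMs,
      // so the commit-time check below still observes the lapse.
      stopped = true;
      ScheduledFuture<?> t = task;
      if (t != null) {
        t.cancel(false); // cancel this periodic task instead of interrupting the thread
      }
      return false;
    }
    // The heartbeat file write would happen here in a real client.
    lastHeartbeatTimeMs = nowMs;
    return true;
  }

  /** Commit-time enforcement; a hypothetical stand-in for abortIfHeartbeatExpired(). */
  synchronized void abortIfExpired(long nowMs) {
    if (lastHeartbeatTimeMs == null || nowMs - lastHeartbeatTimeMs > maxAllowableIntervalMs) {
      throw new IllegalStateException("Heartbeat expired; aborting commit");
    }
  }

  /** Wires the tick to a real scheduler. */
  void start(ScheduledExecutorService timer, long intervalMs) {
    task = timer.scheduleAtFixedRate(
        () -> onTick(System.currentTimeMillis()), 0, intervalMs, TimeUnit.MILLISECONDS);
  }
}
```

   Cancelling through the `ScheduledFuture` stops only this periodic task and leaves the scheduler's thread usable for other work.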



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -229,5 +292,11 @@ public Heartbeat getHeartbeat(String instantTime) {
   public void close() {
     this.stopHeartbeatTimers();
     this.instantToHeartbeatMap.clear();
+    synchronized (this) {

Review Comment:
   Added a `closed` flag — `close()` is now `synchronized` and idempotent (a 
second call is a no-op), and the executor getter throws if used after close so 
a late timer tick cannot resurrect the executor.
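   The sketch below shows one way to combine an idempotent `close()` with a getter that refuses to recreate the executor. It uses a hypothetical `ClosableHeartbeatWriter`, and its field and method names are assumptions, not the PR's actual code. Both methods synchronize on the same monitor, so a timer tick racing with `close()` either sees the client as open or gets an `IllegalStateException`. It can never lazily create a fresh executor after close.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; names are illustrative, not Hudi's actual fields.
final class ClosableHeartbeatWriter implements AutoCloseable {
  private boolean closed;            // guarded by "this"
  private ExecutorService executor;  // lazily created, guarded by "this"
  private int shutdownCount;         // counts real shutdowns, to make idempotency observable

  synchronized ExecutorService getExecutor() {
    if (closed) {
      // A late timer tick must not resurrect the executor after close().
      throw new IllegalStateException("Heartbeat client is closed");
    }
    if (executor == null) {
      executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "heartbeat-writer");
        t.setDaemon(true);
        return t;
      });
    }
    return executor;
  }

  @Override
  public synchronized void close() {
    if (closed) {
      return; // second and later calls are no-ops
    }
    closed = true;
    if (executor != null) {
      executor.shutdownNow();
      executor = null;
      shutdownCount++;
    }
  }

  synchronized boolean isClosed() { return closed; }

  synchronized int shutdownCount() { return shutdownCount; }
}
```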



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -197,20 +222,26 @@ public boolean isHeartbeatExpired(String instantTime) throws IOException {
   private void updateHeartbeat(String instantTime) throws HoodieHeartbeatException {
     try {
       Long newHeartbeatTime = System.currentTimeMillis();
-      OutputStream outputStream =
-          this.storage.create(
-              new StoragePath(heartbeatFolderPath, instantTime), true);
-      outputStream.close();
+      writeHeartbeatFile(instantTime);
       Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
       if (heartbeat.getLastHeartbeatTime() != null && isHeartbeatExpired(instantTime)) {
-        log.error("Aborting, missed generating heartbeat within allowable interval {} ms", this.maxAllowableHeartbeatIntervalInMs);
-        // Since TimerTask allows only java.lang.Runnable, cannot throw an exception and bubble to the caller thread, hence
-        // explicitly interrupting the timer thread.
-        Thread.currentThread().interrupt();
+        // A previous refresh was delayed past the tolerable interval (e.g. due to a slow storage write
+        // or driver pressure). Do NOT interrupt the timer thread here: that would permanently kill all
+        // future heartbeats for this instant, turning a transient delay into a permanent blackout.
+        // Enforcement is done at commit time in HeartbeatUtils.abortIfHeartbeatExpired(), which is the
+        // correct and sole enforcement point.
+        log.warn("Missed generating heartbeat for instant {} within allowable interval {} ms; continuing to refresh",
+            instantTime, this.maxAllowableHeartbeatIntervalInMs);
       }
       heartbeat.setInstantTime(instantTime);
       heartbeat.setLastHeartbeatTime(newHeartbeatTime);
       heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1);
+    } catch (TimeoutException te) {

Review Comment:
   Good catch — fixed. `isHeartbeatExpired()` now falls back to the DFS read 
when the in-memory last-heartbeat time is null (which can happen if the first 
synchronous write times out), so it no longer NPEs on the unboxing comparison. 
A missing heartbeat file reads as 0 and is correctly treated as expired.
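   Here is a sketch of the null-safe expiry check. It assumes a `Map` stands in for the heartbeat files on DFS, mapping each instant to its modification time. `ExpiryCheck` and `readHeartbeatFromStorage` are hypothetical names. The point is that the boxed in-memory value is checked for `null` before any arithmetic, and a missing file reads as 0, so it counts as expired.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map simulates heartbeat files on DFS (instant -> mtime in ms).
final class ExpiryCheck {
  private final long maxAllowableIntervalMs;
  private final Map<String, Long> inMemoryLastHeartbeat = new HashMap<>();
  private final Map<String, Long> heartbeatFileMtimes = new HashMap<>(); // stand-in for DFS

  ExpiryCheck(long maxAllowableIntervalMs) {
    this.maxAllowableIntervalMs = maxAllowableIntervalMs;
  }

  void recordInMemory(String instant, Long timeMs) { inMemoryLastHeartbeat.put(instant, timeMs); }

  void recordOnStorage(String instant, long mtimeMs) { heartbeatFileMtimes.put(instant, mtimeMs); }

  /** Simulated DFS read: a missing heartbeat file reads as 0. */
  long readHeartbeatFromStorage(String instant) {
    return heartbeatFileMtimes.getOrDefault(instant, 0L);
  }

  boolean isHeartbeatExpired(String instant, long nowMs) {
    Long last = inMemoryLastHeartbeat.get(instant);
    // Fall back to storage when the in-memory value is null (e.g. the first write timed out);
    // unboxing a null Long in the comparison would throw NullPointerException.
    long lastMs = (last != null) ? last : readHeartbeatFromStorage(instant);
    return nowMs - lastMs > maxAllowableIntervalMs;
  }
}
```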



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -58,7 +65,16 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
   // heartbeat interval in millis
   private final Long heartbeatIntervalInMs;
   private final Long maxAllowableHeartbeatIntervalInMs;
+  // Maximum time the timer thread will wait for a single heartbeat file write to complete before
+  // abandoning it and letting the next tick retry. Bounded to one interval so that a slow/hung
+  // storage write cannot block the timer thread (and thus freeze all subsequent heartbeats).
+  private final long heartbeatWriteTimeoutMs;

Review Comment:
   Done — changed `heartbeatWriteTimeoutMs` to boxed `Long` to match the 
sibling duration fields.
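   The bounded write that `heartbeatWriteTimeoutMs` controls can be sketched with a single-thread `ExecutorService` and `Future.get(timeout, unit)`. `BoundedHeartbeatWrite` and `tryWrite` are hypothetical names. The write is passed in as a `Runnable` rather than calling Hudi's storage API.

```java
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a time-bounded heartbeat write; not Hudi's actual implementation.
final class BoundedHeartbeatWrite {
  // Dedicated daemon thread so a hung write never blocks the caller (e.g. the timer thread).
  private final ExecutorService writer = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "heartbeat-write");
    t.setDaemon(true);
    return t;
  });
  private final Long heartbeatWriteTimeoutMs;

  BoundedHeartbeatWrite(Long heartbeatWriteTimeoutMs) {
    this.heartbeatWriteTimeoutMs = Objects.requireNonNull(heartbeatWriteTimeoutMs);
  }

  /** Returns true if the write finished in time; false if it failed or was abandoned. */
  boolean tryWrite(Runnable write) {
    Future<?> f = writer.submit(write);
    try {
      f.get(heartbeatWriteTimeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException te) {
      f.cancel(true); // best effort: interrupt the write; the caller returns immediately
      return false;
    } catch (ExecutionException ee) {
      return false;   // the write itself threw; the next tick retries
    } catch (InterruptedException ie) {
      f.cancel(true);
      Thread.currentThread().interrupt(); // restore the caller's interrupt status
      return false;
    }
  }

  void shutdown() {
    writer.shutdownNow();
  }
}
```

   With a single writer thread, a write that ignores interruption keeps occupying that thread, and later writes queue behind it. The caller still never waits longer than the timeout per tick, which is the property the field comment asks for.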



-- 
This is an automated message from the Apache Git Service.