This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c6f38c6b6e84 fix: prevent heartbeat timer from being permanently killed by slow or delayed heartbeats (#18904)
c6f38c6b6e84 is described below

commit c6f38c6b6e84212f6f9aec5c88ae271ff424e741
Author: Prashant Wason <[email protected]>
AuthorDate: Mon Jun 22 01:48:47 2026 -0700

    fix: prevent heartbeat timer from being permanently killed by slow or delayed heartbeats (#18904)
    
    * fix: prevent heartbeat timer from being permanently killed by slow or delayed heartbeats
    
    HoodieHeartbeatClient could permanently stop generating heartbeats for an
    instant, causing later commits to abort with "Heartbeat for instant ... has
    expired" even though the writer was still alive:
    
    - The heartbeat file is written synchronously on the Timer thread. Because the
      timer uses scheduleAtFixedRate, a slow or hung storage write blocks the thread
      and freezes all subsequent heartbeats for that instant.
    - When a heartbeat refresh is delayed past the tolerable interval,
      updateHeartbeat() called Thread.currentThread().interrupt(), permanently
      killing the timer thread and turning a transient delay into a permanent
      blackout.
    
    Fix:
    - Perform the heartbeat file write on a bounded daemon executor (Future.get with
      a per-interval timeout) so a slow or hung storage call cannot block the timer
      thread; a timed-out write is retried on the next tick.
    - Remove the self-interrupt; log a warning and continue. The commit-time check
      HeartbeatUtils.abortIfHeartbeatExpired() remains the sole enforcement point.
    
    Add TestHoodieHeartbeatClient.testTimerSurvivesHungHeartbeatWrite.
    
    * fix: address review comments on heartbeat resilience
    
    - isHeartbeatExpired: handle a null last-heartbeat-time (the very first write
      can time out, leaving it unset) by falling back to the DFS read, avoiding an
      NPE on the unboxing comparison.
    - On a detected lapse, stop refreshing the heartbeat (cancel the timer) and do
      not advance the last-heartbeat time, so a lapsed writer still aborts at commit
      via HeartbeatUtils.abortIfHeartbeatExpired() and cannot resurrect a heartbeat
      that a concurrent cleaner (LAZY failed-writes policy) may already have acted
      on. The timer is cancelled cleanly rather than via Thread.interrupt().
    - close(): make idempotent via a closed flag and guard executor creation after
      close.
    - Use boxed Long for heartbeatWriteTimeoutMs to match the sibling duration
      fields.
    - Raise the default hoodie.client.heartbeat.tolerable.misses from 2 to 10 so
      transient driver pauses (e.g. GC) or storage-latency spikes do not abort a
      still-healthy writer.
    
    * fix: keep heartbeat client reusable after close
    
    A write client reuses its HoodieHeartbeatClient across operations: after
    close(), startCommit() and acquireRollbackHeartbeatIfMultiWriter() call
    heartbeatClient.start() again. The previously added "already closed" guard in
    the executor accessor turned this valid reuse into a failure (observed in
    TestJavaHoodieBackedMetadata and TestHoodieJavaClientOnCopyOnWriteStorage).
    
    Remove the guard and the closed flag; close() remains idempotent via
    synchronization plus the null-check on the executor, so repeated/concurrent
    close() is still safe while reuse-after-close works (the executor is lazily
    re-created on the next heartbeat).
    
    * switch timer into an executor service
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../client/heartbeat/HoodieHeartbeatClient.java    | 150 ++++++++++++++++-----
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   6 +-
 .../heartbeat/TestHoodieHeartbeatClient.java       | 102 ++++++++++++++
 3 files changed, 224 insertions(+), 34 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
index a043f73e632c..b8f2f15fdf0a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.heartbeat;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieHeartbeatException;
@@ -35,9 +36,15 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.getLastHeartbeatTime;
 
@@ -58,7 +65,16 @@ public class HoodieHeartbeatClient implements AutoCloseable, 
Serializable {
   // heartbeat interval in millis
   private final Long heartbeatIntervalInMs;
   private final Long maxAllowableHeartbeatIntervalInMs;
+  // Maximum time the scheduler thread will wait for a single heartbeat file 
write to complete before
+  // abandoning it and letting the next tick retry. Bounded to one interval so 
that a slow/hung
+  // storage write cannot block the scheduler thread (and thus freeze all 
subsequent heartbeats).
+  private final Long heartbeatWriteTimeoutMs;
   private final Map<String, Heartbeat> instantToHeartbeatMap;
+  // Daemon executor used to perform the (potentially slow) storage write off 
the scheduler thread so the
+  // write can be time-bounded. A cached pool is intentional: if one write 
hangs, that thread is left
+  // parked while the next tick proceeds on a fresh thread. Lazily created and 
marked transient since
+  // this client is Serializable with a transient storage handle.
+  private transient ExecutorService heartbeatWriteExecutor;
 
   public HoodieHeartbeatClient(HoodieStorage storage, String basePath, Long 
heartbeatIntervalInMs,
                                Integer numTolerableHeartbeatMisses) {
@@ -68,9 +84,18 @@ public class HoodieHeartbeatClient implements AutoCloseable, 
Serializable {
     this.heartbeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
     this.heartbeatIntervalInMs = heartbeatIntervalInMs;
     this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * 
numTolerableHeartbeatMisses;
+    this.heartbeatWriteTimeoutMs = this.heartbeatIntervalInMs;
     this.instantToHeartbeatMap = new ConcurrentHashMap<>();
   }
 
+  private synchronized ExecutorService getHeartbeatWriteExecutor() {
+    if (heartbeatWriteExecutor == null) {
+      heartbeatWriteExecutor =
+          Executors.newCachedThreadPool(new 
CustomizedThreadFactory("heartbeat_write", true));
+    }
+    return heartbeatWriteExecutor;
+  }
+
   @Data
   static class Heartbeat {
 
@@ -79,10 +104,12 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
     private boolean isHeartbeatStopped = false;
     private Long lastHeartbeatTime;
     private Integer numHeartbeats = 0;
-    private Timer timer = new Timer(true);
+    private ScheduledExecutorService heartbeatScheduler =
+        Executors.newSingleThreadScheduledExecutor(new 
CustomizedThreadFactory("heartbeat_scheduler", true));
+    private ScheduledFuture<?> scheduledFuture;
   }
 
-  class HeartbeatTask extends TimerTask {
+  class HeartbeatTask implements Runnable {
 
     private final String instantTime;
 
@@ -92,7 +119,11 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
 
     @Override
     public void run() {
-      updateHeartbeat(instantTime);
+      try {
+        updateHeartbeat(instantTime);
+      } catch (Exception e) {
+        log.error("Failed to update heartbeat for instant {}; will retry on 
next tick", instantTime, e);
+      }
     }
   }
 
@@ -114,11 +145,11 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
     newHeartbeat.setHeartbeatStarted(true);
     instantToHeartbeatMap.put(instantTime, newHeartbeat);
     // Ensure heartbeat is generated for the first time with this blocking 
call.
-    // Since timer submits the task to a thread, no guarantee when that thread 
will get CPU
+    // Since scheduler submits the task to a thread, no guarantee when that 
thread will get CPU
     // cycles to generate the first heartbeat.
     updateHeartbeat(instantTime);
-    newHeartbeat.getTimer().scheduleAtFixedRate(new 
HeartbeatTask(instantTime), this.heartbeatIntervalInMs,
-        this.heartbeatIntervalInMs);
+    
newHeartbeat.setScheduledFuture(newHeartbeat.getHeartbeatScheduler().scheduleAtFixedRate(
+        new HeartbeatTask(instantTime), this.heartbeatIntervalInMs, 
this.heartbeatIntervalInMs, TimeUnit.MILLISECONDS));
   }
 
   /**
@@ -130,7 +161,7 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
   public Heartbeat stop(String instantTime) throws HoodieException {
     Heartbeat heartbeat = instantToHeartbeatMap.remove(instantTime);
     if (isHeartbeatStarted(heartbeat)) {
-      stopHeartbeatTimer(heartbeat);
+      stopHeartbeatScheduler(heartbeat);
       HeartbeatUtils.deleteHeartbeatFile(storage, basePath, instantTime);
       log.info("Deleted heartbeat file for instant {}", instantTime);
     }
@@ -138,12 +169,12 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
   }
 
   /**
-   * Stops all timers of heartbeats started via this instance of the client.
+   * Stops all heartbeat schedulers started via this instance of the client.
    *
    * @throws HoodieException
    */
   public void stopHeartbeatTimers() throws HoodieException {
-    
instantToHeartbeatMap.values().stream().filter(this::isHeartbeatStarted).forEach(this::stopHeartbeatTimer);
+    
instantToHeartbeatMap.values().stream().filter(this::isHeartbeatStarted).forEach(this::stopHeartbeatScheduler);
   }
 
   /**
@@ -158,17 +189,24 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
   }
 
   /**
-   * Stops the timer of the given heartbeat.
+   * Stops the scheduler of the given heartbeat.
    *
    * @param heartbeat The heartbeat to stop.
    */
-  private void stopHeartbeatTimer(Heartbeat heartbeat) {
+  private void stopHeartbeatScheduler(Heartbeat heartbeat) {
     log.info("Stopping heartbeat for instant {}", heartbeat.getInstantTime());
-    heartbeat.getTimer().cancel();
+    shutdownHeartbeatScheduler(heartbeat);
     heartbeat.setHeartbeatStopped(true);
     log.info("Stopped heartbeat for instant {}", heartbeat.getInstantTime());
   }
 
+  private void shutdownHeartbeatScheduler(Heartbeat heartbeat) {
+    if (heartbeat.getScheduledFuture() != null) {
+      heartbeat.getScheduledFuture().cancel(false);
+    }
+    heartbeat.getHeartbeatScheduler().shutdownNow();
+  }
+
   public static Boolean heartbeatExists(HoodieStorage storage, String 
basePath, String instantTime) throws IOException {
     StoragePath heartbeatFilePath = new StoragePath(
         HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime);
@@ -178,17 +216,18 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
   public boolean isHeartbeatExpired(String instantTime) throws IOException {
     Long currentTime = System.currentTimeMillis();
     Heartbeat lastHeartbeatForWriter = instantToHeartbeatMap.get(instantTime);
-    if (lastHeartbeatForWriter == null) {
-      log.info("Heartbeat not found in internal map, falling back to reading 
from DFS");
-      long lastHeartbeatForWriterTime = getLastHeartbeatTime(this.storage, 
basePath, instantTime);
-      lastHeartbeatForWriter = new Heartbeat();
-      lastHeartbeatForWriter.setLastHeartbeatTime(lastHeartbeatForWriterTime);
-      lastHeartbeatForWriter.setInstantTime(instantTime);
-      lastHeartbeatForWriter.getTimer().cancel();
+    Long lastHeartbeatTime = lastHeartbeatForWriter == null ? null : 
lastHeartbeatForWriter.getLastHeartbeatTime();
+    // lastHeartbeatTime can be null when the heartbeat is not in the internal 
map, or when it is in the
+    // map but no heartbeat has been generated yet (e.g. the first write timed 
out). In both cases fall
+    // back to reading the last heartbeat time from DFS (returns 0 if no 
heartbeat file exists, which is
+    // correctly treated as expired).
+    if (lastHeartbeatTime == null) {
+      log.info("Heartbeat time not available in internal map, falling back to 
reading from DFS");
+      lastHeartbeatTime = getLastHeartbeatTime(this.storage, basePath, 
instantTime);
     }
-    if (currentTime - lastHeartbeatForWriter.getLastHeartbeatTime() > 
this.maxAllowableHeartbeatIntervalInMs) {
+    if (currentTime - lastHeartbeatTime > 
this.maxAllowableHeartbeatIntervalInMs) {
       log.warn("Heartbeat expired, currentTime = {}, last heartbeat = {}, 
heartbeat interval = {}", currentTime,
-          lastHeartbeatForWriter, this.heartbeatIntervalInMs);
+          lastHeartbeatTime, this.heartbeatIntervalInMs);
       return true;
     }
     return false;
@@ -197,20 +236,31 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
   private void updateHeartbeat(String instantTime) throws 
HoodieHeartbeatException {
     try {
       Long newHeartbeatTime = System.currentTimeMillis();
-      OutputStream outputStream =
-          this.storage.create(
-              new StoragePath(heartbeatFolderPath, instantTime), true);
-      outputStream.close();
+      writeHeartbeatFile(instantTime);
       Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
       if (heartbeat.getLastHeartbeatTime() != null && 
isHeartbeatExpired(instantTime)) {
-        log.error("Aborting, missed generating heartbeat within allowable 
interval {} ms", this.maxAllowableHeartbeatIntervalInMs);
-        // Since TimerTask allows only java.lang.Runnable, cannot throw an 
exception and bubble to the caller thread, hence
-        // explicitly interrupting the timer thread.
-        Thread.currentThread().interrupt();
+        // A previous refresh was delayed past the tolerable interval. Stop 
refreshing this heartbeat
+        // (cancel the scheduler) and do NOT advance the last heartbeat time, 
so the heartbeat stays expired
+        // and the writer aborts at commit time via 
HeartbeatUtils.abortIfHeartbeatExpired(). We must not
+        // keep refreshing here: a concurrent process (e.g. an async cleaner 
under LAZY failed-writes
+        // policy) may already have started rolling back this instant once it 
observed the expiry, and
+        // resurrecting the heartbeat could let this writer commit on top of 
rolled-back files.
+        // The scheduler is cancelled cleanly rather than via 
Thread.interrupt(), which would permanently
+        // kill the scheduler thread (turning a transient delay into a 
permanent blackout on the first miss).
+        log.error("Missed generating heartbeat for instant {} within allowable 
interval {} ms; stopping heartbeat refresh",
+            instantTime, this.maxAllowableHeartbeatIntervalInMs);
+        shutdownHeartbeatScheduler(heartbeat);
+        return;
       }
       heartbeat.setInstantTime(instantTime);
       heartbeat.setLastHeartbeatTime(newHeartbeatTime);
       heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1);
+    } catch (TimeoutException te) {
+      // The storage write did not complete within the bounded window. Do not 
advance the last heartbeat
+      // time (the write is unconfirmed); the next scheduled tick will retry 
on a fresh executor thread.
+      // Crucially, the scheduler thread is freed instead of being blocked by 
a hung storage call.
+      log.warn("Heartbeat file write for instant {} did not complete within {} 
ms; will retry on next tick",
+          instantTime, this.heartbeatWriteTimeoutMs);
     } catch (IOException io) {
       boolean isHeartbeatStopped = 
instantToHeartbeatMap.get(instantTime).isHeartbeatStopped();
       if (isHeartbeatStopped) {
@@ -221,13 +271,49 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
     }
   }
 
+  /**
+   * Writes the heartbeat file for the given instant on a dedicated daemon 
executor, bounded by
+   * {@link #heartbeatWriteTimeoutMs}. Performing the storage write off the 
scheduler thread (and with a
+   * timeout) ensures that a slow or hung storage call cannot block the 
scheduler thread and freeze all
+   * subsequent heartbeats for this instant.
+   */
+  private void writeHeartbeatFile(String instantTime) throws IOException, 
TimeoutException {
+    Future<Void> future = getHeartbeatWriteExecutor().submit(() -> {
+      try (OutputStream outputStream =
+               this.storage.create(new StoragePath(heartbeatFolderPath, 
instantTime), true)) {
+        // create + close confirms the heartbeat file write landed on storage.
+      }
+      return null;
+    });
+    try {
+      future.get(heartbeatWriteTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException te) {
+      future.cancel(true);
+      throw te;
+    } catch (InterruptedException ie) {
+      future.cancel(true);
+      Thread.currentThread().interrupt();
+      throw new HoodieHeartbeatException("Interrupted while writing heartbeat 
for instant " + instantTime, ie);
+    } catch (ExecutionException ee) {
+      Throwable cause = ee.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new HoodieHeartbeatException("Failed to write heartbeat for 
instant " + instantTime, cause);
+    }
+  }
+
   public Heartbeat getHeartbeat(String instantTime) {
     return this.instantToHeartbeatMap.get(instantTime);
   }
 
   @Override
-  public void close() {
+  public synchronized void close() {
     this.stopHeartbeatTimers();
     this.instantToHeartbeatMap.clear();
+    if (heartbeatWriteExecutor != null) {
+      heartbeatWriteExecutor.shutdownNow();
+      heartbeatWriteExecutor = null;
+    }
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9e4263a4fbdd..115c1216c450 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -679,9 +679,11 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<Integer> 
CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES = ConfigProperty
       .key("hoodie.client.heartbeat.tolerable.misses")
-      .defaultValue(2)
+      .defaultValue(10)
       .markAdvanced()
-      .withDocumentation("Number of heartbeat misses, before a writer is 
deemed not alive and all pending writes are aborted.");
+      .withDocumentation("Number of heartbeat misses, before a writer is 
deemed not alive and all pending writes are aborted. "
+          + "A higher value tolerates transient driver pauses (e.g. GC) or 
storage-latency spikes that would otherwise "
+          + "delay a heartbeat and cause a still-healthy writer's commit to be 
aborted.");
 
   public static final ConfigProperty<Boolean> 
CLUSTERING_BLOCK_FOR_PENDING_INGESTION = ConfigProperty
       
.key("hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution")
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
index c7ea5fa87bbd..5feb79ee83bc 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
@@ -21,12 +21,18 @@ package org.apache.hudi.client.heartbeat;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
@@ -113,4 +119,100 @@ public class TestHoodieHeartbeatClient extends 
HoodieCommonTestHarness {
     assertFalse(hoodieHeartbeatClient.isHeartbeatExpired(instantTime1));
     
assertTrue(hoodieHeartbeatClient.getHeartbeat(instantTime1).isHeartbeatStopped());
   }
+
+  /**
+   * Regression test for the heartbeat-expiry incident: a single slow/hung 
storage write must not
+   * block (freeze) the heartbeat scheduler thread. The first heartbeat write 
blocks (simulating a hung
+   * cloud-storage call); we assert the scheduler keeps producing heartbeats 
on fresh threads once that
+   * write times out, proving the scheduler thread was not blocked by the 
synchronous storage call (#1).
+   * A high tolerable-misses is used so that recovery after the blocked write 
does not itself trip the
+   * expiry path (which intentionally stops refresh on a genuine lapse).
+   */
+  @Test
+  public void testSlowHeartbeatWriteDoesNotBlockScheduler() {
+    CountDownLatch releaseFirstWrite = new CountDownLatch(1);
+    SlowCreateStorage slowStorage =
+        new SlowCreateStorage((FileSystem) 
metaClient.getStorage().getFileSystem(), releaseFirstWrite);
+    // interval 1s, write timeout = 1s; high tolerable-misses so the ~1s 
recovery gap stays well within
+    // the allowable window and the scheduler keeps beating rather than 
treating it as a lapse.
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(slowStorage, 
metaClient.getBasePath().toString(),
+            heartBeatInterval, 10);
+    try {
+      hoodieHeartbeatClient.start(instantTime1);
+      // Despite the first write hanging, the scheduler must keep generating 
heartbeats on fresh threads.
+      await().atMost(15, SECONDS)
+          .until(() -> 
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats() >= 2);
+    } finally {
+      releaseFirstWrite.countDown();
+      hoodieHeartbeatClient.close();
+    }
+  }
+
+  @Test
+  public void testScheduledHeartbeatRetriesAfterWriteFailure() {
+    FailOnceAfterInitialCreateStorage storage =
+        new FailOnceAfterInitialCreateStorage((FileSystem) 
metaClient.getStorage().getFileSystem());
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(storage, 
metaClient.getBasePath().toString(), heartBeatInterval, 10);
+    try {
+      hoodieHeartbeatClient.start(instantTime1);
+      await().atMost(10, SECONDS).until(storage::hasInjectedFailure);
+      await().atMost(10, SECONDS)
+          .until(() -> 
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats() >= 2);
+    } finally {
+      hoodieHeartbeatClient.close();
+    }
+  }
+
+  /**
+   * A storage wrapper whose first {@code create()} call blocks until 
released, simulating a hung
+   * storage write. All subsequent calls delegate normally.
+   */
+  private static class SlowCreateStorage extends HoodieHadoopStorage {
+
+    private final AtomicBoolean firstCall = new AtomicBoolean(true);
+    private final CountDownLatch releaseFirstWrite;
+
+    SlowCreateStorage(FileSystem fs, CountDownLatch releaseFirstWrite) {
+      super(fs);
+      this.releaseFirstWrite = releaseFirstWrite;
+    }
+
+    @Override
+    public OutputStream create(StoragePath path, boolean overwrite) throws 
IOException {
+      if (firstCall.getAndSet(false)) {
+        try {
+          releaseFirstWrite.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while simulating a hung heartbeat 
write", e);
+        }
+      }
+      return super.create(path, overwrite);
+    }
+  }
+
+  private static class FailOnceAfterInitialCreateStorage extends 
HoodieHadoopStorage {
+
+    private final AtomicInteger createCalls = new AtomicInteger(0);
+    private final AtomicBoolean injectedFailure = new AtomicBoolean(false);
+
+    FailOnceAfterInitialCreateStorage(FileSystem fs) {
+      super(fs);
+    }
+
+    @Override
+    public OutputStream create(StoragePath path, boolean overwrite) throws 
IOException {
+      int currentCall = createCalls.incrementAndGet();
+      if (currentCall == 2 && injectedFailure.compareAndSet(false, true)) {
+        throw new IOException("Injected scheduled heartbeat write failure");
+      }
+      return super.create(path, overwrite);
+    }
+
+    private boolean hasInjectedFailure() {
+      return injectedFailure.get();
+    }
+  }
 }

Reply via email to