This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c6f38c6b6e84 fix: prevent heartbeat timer from being permanently
killed by slow or delayed heartbeats (#18904)
c6f38c6b6e84 is described below
commit c6f38c6b6e84212f6f9aec5c88ae271ff424e741
Author: Prashant Wason <[email protected]>
AuthorDate: Mon Jun 22 01:48:47 2026 -0700
fix: prevent heartbeat timer from being permanently killed by slow or
delayed heartbeats (#18904)
* fix: prevent heartbeat timer from being permanently killed by slow or
delayed heartbeats
HoodieHeartbeatClient could permanently stop generating heartbeats for an
instant, causing later commits to abort with "Heartbeat for instant ... has
expired" even though the writer was still alive:
- The heartbeat file is written synchronously on the Timer thread. Because
the timer uses scheduleAtFixedRate, a slow or hung storage write blocks the
thread and freezes all subsequent heartbeats for that instant.
- When a heartbeat refresh is delayed past the tolerable interval,
updateHeartbeat() called Thread.currentThread().interrupt(), permanently
killing the timer thread and turning a transient delay into a permanent
blackout.
Fix:
- Perform the heartbeat file write on a bounded daemon executor (Future.get
with a per-interval timeout) so a slow or hung storage call cannot block the
timer thread; a timed-out write is retried on the next tick.
- Remove the self-interrupt; log a warning and continue. The commit-time
check HeartbeatUtils.abortIfHeartbeatExpired() remains the sole enforcement
point.
Add TestHoodieHeartbeatClient.testTimerSurvivesHungHeartbeatWrite.
* fix: address review comments on heartbeat resilience
- isHeartbeatExpired: handle a null last-heartbeat-time (the very first
write can time out, leaving it unset) by falling back to the DFS read,
avoiding an NPE on the unboxing comparison.
- On a detected lapse, stop refreshing the heartbeat (cancel the timer) and
do not advance the last-heartbeat time, so a lapsed writer still aborts at
commit via HeartbeatUtils.abortIfHeartbeatExpired() and cannot resurrect a
heartbeat that a concurrent cleaner (LAZY failed-writes policy) may already
have acted on. The timer is cancelled cleanly rather than via
Thread.interrupt().
- close(): make idempotent via a closed flag and guard executor creation
after close.
- Use boxed Long for heartbeatWriteTimeoutMs to match the sibling duration
fields.
- Raise the default hoodie.client.heartbeat.tolerable.misses from 2 to 10 so
transient driver pauses (e.g. GC) or storage-latency spikes do not abort a
still-healthy writer.
* fix: keep heartbeat client reusable after close
A write client reuses its HoodieHeartbeatClient across operations: after
close(), startCommit() and acquireRollbackHeartbeatIfMultiWriter() call
heartbeatClient.start() again. The previously added "already closed" guard
in the executor accessor turned this valid reuse into a failure (observed in
TestJavaHoodieBackedMetadata and TestHoodieJavaClientOnCopyOnWriteStorage).
Remove the guard and the closed flag; close() remains idempotent via
synchronization plus the null-check on the executor, so repeated/concurrent
close() is still safe while reuse-after-close works (the executor is lazily
re-created on the next heartbeat).
* switch timer to an executor service
---------
Co-authored-by: danny0405 <[email protected]>
---
.../client/heartbeat/HoodieHeartbeatClient.java | 150 ++++++++++++++++-----
.../org/apache/hudi/config/HoodieWriteConfig.java | 6 +-
.../heartbeat/TestHoodieHeartbeatClient.java | 102 ++++++++++++++
3 files changed, 224 insertions(+), 34 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
index a043f73e632c..b8f2f15fdf0a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.heartbeat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieHeartbeatException;
@@ -35,9 +36,15 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static
org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.getLastHeartbeatTime;
@@ -58,7 +65,16 @@ public class HoodieHeartbeatClient implements AutoCloseable,
Serializable {
// heartbeat interval in millis
private final Long heartbeatIntervalInMs;
private final Long maxAllowableHeartbeatIntervalInMs;
+ // Maximum time the scheduler thread will wait for a single heartbeat file
write to complete before
+ // abandoning it and letting the next tick retry. Bounded to one interval so
that a slow/hung
+ // storage write cannot block the scheduler thread (and thus freeze all
subsequent heartbeats).
+ private final Long heartbeatWriteTimeoutMs;
private final Map<String, Heartbeat> instantToHeartbeatMap;
+ // Daemon executor used to perform the (potentially slow) storage write off
the scheduler thread so the
+ // write can be time-bounded. A cached pool is intentional: if one write
hangs, that thread is left
+ // parked while the next tick proceeds on a fresh thread. Lazily created and
marked transient since
+ // this client is Serializable with a transient storage handle.
+ private transient ExecutorService heartbeatWriteExecutor;
public HoodieHeartbeatClient(HoodieStorage storage, String basePath, Long
heartbeatIntervalInMs,
Integer numTolerableHeartbeatMisses) {
@@ -68,9 +84,18 @@ public class HoodieHeartbeatClient implements AutoCloseable,
Serializable {
this.heartbeatFolderPath =
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
this.heartbeatIntervalInMs = heartbeatIntervalInMs;
this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs *
numTolerableHeartbeatMisses;
+ this.heartbeatWriteTimeoutMs = this.heartbeatIntervalInMs;
this.instantToHeartbeatMap = new ConcurrentHashMap<>();
}
+ private synchronized ExecutorService getHeartbeatWriteExecutor() {
+ if (heartbeatWriteExecutor == null) {
+ heartbeatWriteExecutor =
+ Executors.newCachedThreadPool(new
CustomizedThreadFactory("heartbeat_write", true));
+ }
+ return heartbeatWriteExecutor;
+ }
+
@Data
static class Heartbeat {
@@ -79,10 +104,12 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
private boolean isHeartbeatStopped = false;
private Long lastHeartbeatTime;
private Integer numHeartbeats = 0;
- private Timer timer = new Timer(true);
+ private ScheduledExecutorService heartbeatScheduler =
+ Executors.newSingleThreadScheduledExecutor(new
CustomizedThreadFactory("heartbeat_scheduler", true));
+ private ScheduledFuture<?> scheduledFuture;
}
- class HeartbeatTask extends TimerTask {
+ class HeartbeatTask implements Runnable {
private final String instantTime;
@@ -92,7 +119,11 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
@Override
public void run() {
- updateHeartbeat(instantTime);
+ try {
+ updateHeartbeat(instantTime);
+ } catch (Exception e) {
+ log.error("Failed to update heartbeat for instant {}; will retry on
next tick", instantTime, e);
+ }
}
}
@@ -114,11 +145,11 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
newHeartbeat.setHeartbeatStarted(true);
instantToHeartbeatMap.put(instantTime, newHeartbeat);
// Ensure heartbeat is generated for the first time with this blocking
call.
- // Since timer submits the task to a thread, no guarantee when that thread
will get CPU
+ // Since scheduler submits the task to a thread, no guarantee when that
thread will get CPU
// cycles to generate the first heartbeat.
updateHeartbeat(instantTime);
- newHeartbeat.getTimer().scheduleAtFixedRate(new
HeartbeatTask(instantTime), this.heartbeatIntervalInMs,
- this.heartbeatIntervalInMs);
+
newHeartbeat.setScheduledFuture(newHeartbeat.getHeartbeatScheduler().scheduleAtFixedRate(
+ new HeartbeatTask(instantTime), this.heartbeatIntervalInMs,
this.heartbeatIntervalInMs, TimeUnit.MILLISECONDS));
}
/**
@@ -130,7 +161,7 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
public Heartbeat stop(String instantTime) throws HoodieException {
Heartbeat heartbeat = instantToHeartbeatMap.remove(instantTime);
if (isHeartbeatStarted(heartbeat)) {
- stopHeartbeatTimer(heartbeat);
+ stopHeartbeatScheduler(heartbeat);
HeartbeatUtils.deleteHeartbeatFile(storage, basePath, instantTime);
log.info("Deleted heartbeat file for instant {}", instantTime);
}
@@ -138,12 +169,12 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
}
/**
- * Stops all timers of heartbeats started via this instance of the client.
+ * Stops all heartbeat schedulers started via this instance of the client.
*
* @throws HoodieException
*/
public void stopHeartbeatTimers() throws HoodieException {
-
instantToHeartbeatMap.values().stream().filter(this::isHeartbeatStarted).forEach(this::stopHeartbeatTimer);
+
instantToHeartbeatMap.values().stream().filter(this::isHeartbeatStarted).forEach(this::stopHeartbeatScheduler);
}
/**
@@ -158,17 +189,24 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
}
/**
- * Stops the timer of the given heartbeat.
+ * Stops the scheduler of the given heartbeat.
*
* @param heartbeat The heartbeat to stop.
*/
- private void stopHeartbeatTimer(Heartbeat heartbeat) {
+ private void stopHeartbeatScheduler(Heartbeat heartbeat) {
log.info("Stopping heartbeat for instant {}", heartbeat.getInstantTime());
- heartbeat.getTimer().cancel();
+ shutdownHeartbeatScheduler(heartbeat);
heartbeat.setHeartbeatStopped(true);
log.info("Stopped heartbeat for instant {}", heartbeat.getInstantTime());
}
+ private void shutdownHeartbeatScheduler(Heartbeat heartbeat) {
+ if (heartbeat.getScheduledFuture() != null) {
+ heartbeat.getScheduledFuture().cancel(false);
+ }
+ heartbeat.getHeartbeatScheduler().shutdownNow();
+ }
+
public static Boolean heartbeatExists(HoodieStorage storage, String
basePath, String instantTime) throws IOException {
StoragePath heartbeatFilePath = new StoragePath(
HoodieTableMetaClient.getHeartbeatFolderPath(basePath), instantTime);
@@ -178,17 +216,18 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
public boolean isHeartbeatExpired(String instantTime) throws IOException {
Long currentTime = System.currentTimeMillis();
Heartbeat lastHeartbeatForWriter = instantToHeartbeatMap.get(instantTime);
- if (lastHeartbeatForWriter == null) {
- log.info("Heartbeat not found in internal map, falling back to reading
from DFS");
- long lastHeartbeatForWriterTime = getLastHeartbeatTime(this.storage,
basePath, instantTime);
- lastHeartbeatForWriter = new Heartbeat();
- lastHeartbeatForWriter.setLastHeartbeatTime(lastHeartbeatForWriterTime);
- lastHeartbeatForWriter.setInstantTime(instantTime);
- lastHeartbeatForWriter.getTimer().cancel();
+ Long lastHeartbeatTime = lastHeartbeatForWriter == null ? null :
lastHeartbeatForWriter.getLastHeartbeatTime();
+ // lastHeartbeatTime can be null when the heartbeat is not in the internal
map, or when it is in the
+ // map but no heartbeat has been generated yet (e.g. the first write timed
out). In both cases fall
+ // back to reading the last heartbeat time from DFS (returns 0 if no
heartbeat file exists, which is
+ // correctly treated as expired).
+ if (lastHeartbeatTime == null) {
+ log.info("Heartbeat time not available in internal map, falling back to
reading from DFS");
+ lastHeartbeatTime = getLastHeartbeatTime(this.storage, basePath,
instantTime);
}
- if (currentTime - lastHeartbeatForWriter.getLastHeartbeatTime() >
this.maxAllowableHeartbeatIntervalInMs) {
+ if (currentTime - lastHeartbeatTime >
this.maxAllowableHeartbeatIntervalInMs) {
log.warn("Heartbeat expired, currentTime = {}, last heartbeat = {},
heartbeat interval = {}", currentTime,
- lastHeartbeatForWriter, this.heartbeatIntervalInMs);
+ lastHeartbeatTime, this.heartbeatIntervalInMs);
return true;
}
return false;
@@ -197,20 +236,31 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
private void updateHeartbeat(String instantTime) throws
HoodieHeartbeatException {
try {
Long newHeartbeatTime = System.currentTimeMillis();
- OutputStream outputStream =
- this.storage.create(
- new StoragePath(heartbeatFolderPath, instantTime), true);
- outputStream.close();
+ writeHeartbeatFile(instantTime);
Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
if (heartbeat.getLastHeartbeatTime() != null &&
isHeartbeatExpired(instantTime)) {
- log.error("Aborting, missed generating heartbeat within allowable
interval {} ms", this.maxAllowableHeartbeatIntervalInMs);
- // Since TimerTask allows only java.lang.Runnable, cannot throw an
exception and bubble to the caller thread, hence
- // explicitly interrupting the timer thread.
- Thread.currentThread().interrupt();
+ // A previous refresh was delayed past the tolerable interval. Stop
refreshing this heartbeat
+ // (cancel the scheduler) and do NOT advance the last heartbeat time,
so the heartbeat stays expired
+ // and the writer aborts at commit time via
HeartbeatUtils.abortIfHeartbeatExpired(). We must not
+ // keep refreshing here: a concurrent process (e.g. an async cleaner
under LAZY failed-writes
+ // policy) may already have started rolling back this instant once it
observed the expiry, and
+ // resurrecting the heartbeat could let this writer commit on top of
rolled-back files.
+ // The scheduler is cancelled cleanly rather than via
Thread.interrupt(), which would permanently
+ // kill the scheduler thread (turning a transient delay into a
permanent blackout on the first miss).
+ log.error("Missed generating heartbeat for instant {} within allowable
interval {} ms; stopping heartbeat refresh",
+ instantTime, this.maxAllowableHeartbeatIntervalInMs);
+ shutdownHeartbeatScheduler(heartbeat);
+ return;
}
heartbeat.setInstantTime(instantTime);
heartbeat.setLastHeartbeatTime(newHeartbeatTime);
heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1);
+ } catch (TimeoutException te) {
+ // The storage write did not complete within the bounded window. Do not
advance the last heartbeat
+ // time (the write is unconfirmed); the next scheduled tick will retry
on a fresh executor thread.
+ // Crucially, the scheduler thread is freed instead of being blocked by
a hung storage call.
+ log.warn("Heartbeat file write for instant {} did not complete within {}
ms; will retry on next tick",
+ instantTime, this.heartbeatWriteTimeoutMs);
} catch (IOException io) {
boolean isHeartbeatStopped =
instantToHeartbeatMap.get(instantTime).isHeartbeatStopped();
if (isHeartbeatStopped) {
@@ -221,13 +271,49 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
}
}
+ /**
+ * Writes the heartbeat file for the given instant on a dedicated daemon
executor, bounded by
+ * {@link #heartbeatWriteTimeoutMs}. Performing the storage write off the
scheduler thread (and with a
+ * timeout) ensures that a slow or hung storage call cannot block the
scheduler thread and freeze all
+ * subsequent heartbeats for this instant.
+ */
+ private void writeHeartbeatFile(String instantTime) throws IOException,
TimeoutException {
+ Future<Void> future = getHeartbeatWriteExecutor().submit(() -> {
+ try (OutputStream outputStream =
+ this.storage.create(new StoragePath(heartbeatFolderPath,
instantTime), true)) {
+ // create + close confirms the heartbeat file write landed on storage.
+ }
+ return null;
+ });
+ try {
+ future.get(heartbeatWriteTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException te) {
+ future.cancel(true);
+ throw te;
+ } catch (InterruptedException ie) {
+ future.cancel(true);
+ Thread.currentThread().interrupt();
+ throw new HoodieHeartbeatException("Interrupted while writing heartbeat
for instant " + instantTime, ie);
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new HoodieHeartbeatException("Failed to write heartbeat for
instant " + instantTime, cause);
+ }
+ }
+
public Heartbeat getHeartbeat(String instantTime) {
return this.instantToHeartbeatMap.get(instantTime);
}
@Override
- public void close() {
+ public synchronized void close() {
this.stopHeartbeatTimers();
this.instantToHeartbeatMap.clear();
+ if (heartbeatWriteExecutor != null) {
+ heartbeatWriteExecutor.shutdownNow();
+ heartbeatWriteExecutor = null;
+ }
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9e4263a4fbdd..115c1216c450 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -679,9 +679,11 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<Integer>
CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES = ConfigProperty
.key("hoodie.client.heartbeat.tolerable.misses")
- .defaultValue(2)
+ .defaultValue(10)
.markAdvanced()
- .withDocumentation("Number of heartbeat misses, before a writer is
deemed not alive and all pending writes are aborted.");
+ .withDocumentation("Number of heartbeat misses, before a writer is
deemed not alive and all pending writes are aborted. "
+ + "A higher value tolerates transient driver pauses (e.g. GC) or
storage-latency spikes that would otherwise "
+ + "delay a heartbeat and cause a still-healthy writer's commit to be
aborted.");
public static final ConfigProperty<Boolean>
CLUSTERING_BLOCK_FOR_PENDING_INGESTION = ConfigProperty
.key("hoodie.clustering.fail.on.pending.ingestion.during.conflict.resolution")
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
index c7ea5fa87bbd..5feb79ee83bc 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
@@ -21,12 +21,18 @@ package org.apache.hudi.client.heartbeat;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
+import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
@@ -113,4 +119,100 @@ public class TestHoodieHeartbeatClient extends
HoodieCommonTestHarness {
assertFalse(hoodieHeartbeatClient.isHeartbeatExpired(instantTime1));
assertTrue(hoodieHeartbeatClient.getHeartbeat(instantTime1).isHeartbeatStopped());
}
+
+ /**
+ * Regression test for the heartbeat-expiry incident: a single slow/hung
storage write must not
+ * block (freeze) the heartbeat scheduler thread. The first heartbeat write
blocks (simulating a hung
+ * cloud-storage call); we assert the scheduler keeps producing heartbeats
on fresh threads once that
+ * write times out, proving the scheduler thread was not blocked by the
synchronous storage call (#1).
+ * A high tolerable-misses is used so that recovery after the blocked write
does not itself trip the
+ * expiry path (which intentionally stops refresh on a genuine lapse).
+ */
+ @Test
+ public void testSlowHeartbeatWriteDoesNotBlockScheduler() {
+ CountDownLatch releaseFirstWrite = new CountDownLatch(1);
+ SlowCreateStorage slowStorage =
+ new SlowCreateStorage((FileSystem)
metaClient.getStorage().getFileSystem(), releaseFirstWrite);
+ // interval 1s, write timeout = 1s; high tolerable-misses so the ~1s
recovery gap stays well within
+ // the allowable window and the scheduler keeps beating rather than
treating it as a lapse.
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(slowStorage,
metaClient.getBasePath().toString(),
+ heartBeatInterval, 10);
+ try {
+ hoodieHeartbeatClient.start(instantTime1);
+ // Despite the first write hanging, the scheduler must keep generating
heartbeats on fresh threads.
+ await().atMost(15, SECONDS)
+ .until(() ->
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats() >= 2);
+ } finally {
+ releaseFirstWrite.countDown();
+ hoodieHeartbeatClient.close();
+ }
+ }
+
+ @Test
+ public void testScheduledHeartbeatRetriesAfterWriteFailure() {
+ FailOnceAfterInitialCreateStorage storage =
+ new FailOnceAfterInitialCreateStorage((FileSystem)
metaClient.getStorage().getFileSystem());
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(storage,
metaClient.getBasePath().toString(), heartBeatInterval, 10);
+ try {
+ hoodieHeartbeatClient.start(instantTime1);
+ await().atMost(10, SECONDS).until(storage::hasInjectedFailure);
+ await().atMost(10, SECONDS)
+ .until(() ->
hoodieHeartbeatClient.getHeartbeat(instantTime1).getNumHeartbeats() >= 2);
+ } finally {
+ hoodieHeartbeatClient.close();
+ }
+ }
+
+ /**
+ * A storage wrapper whose first {@code create()} call blocks until
released, simulating a hung
+ * storage write. All subsequent calls delegate normally.
+ */
+ private static class SlowCreateStorage extends HoodieHadoopStorage {
+
+ private final AtomicBoolean firstCall = new AtomicBoolean(true);
+ private final CountDownLatch releaseFirstWrite;
+
+ SlowCreateStorage(FileSystem fs, CountDownLatch releaseFirstWrite) {
+ super(fs);
+ this.releaseFirstWrite = releaseFirstWrite;
+ }
+
+ @Override
+ public OutputStream create(StoragePath path, boolean overwrite) throws
IOException {
+ if (firstCall.getAndSet(false)) {
+ try {
+ releaseFirstWrite.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while simulating a hung heartbeat
write", e);
+ }
+ }
+ return super.create(path, overwrite);
+ }
+ }
+
+ private static class FailOnceAfterInitialCreateStorage extends
HoodieHadoopStorage {
+
+ private final AtomicInteger createCalls = new AtomicInteger(0);
+ private final AtomicBoolean injectedFailure = new AtomicBoolean(false);
+
+ FailOnceAfterInitialCreateStorage(FileSystem fs) {
+ super(fs);
+ }
+
+ @Override
+ public OutputStream create(StoragePath path, boolean overwrite) throws
IOException {
+ int currentCall = createCalls.incrementAndGet();
+ if (currentCall == 2 && injectedFailure.compareAndSet(false, true)) {
+ throw new IOException("Injected scheduled heartbeat write failure");
+ }
+ return super.create(path, overwrite);
+ }
+
+ private boolean hasInjectedFailure() {
+ return injectedFailure.get();
+ }
+ }
}