This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new afe7ea67fde7 [HUDI-9156] Add heartbeat manager for 
StorageBasedLockProvider to 0.x branch (#13710)
afe7ea67fde7 is described below

commit afe7ea67fde7e2430d5d5bd840b2c03b9d94679a
Author: Alex R <[email protected]>
AuthorDate: Mon Aug 11 23:54:21 2025 -0700

    [HUDI-9156] Add heartbeat manager for StorageBasedLockProvider to 0.x 
branch (#13710)
---
 .../transaction/lock/models/HeartbeatManager.java  |  49 +++
 .../lock/models/LockProviderHeartbeatManager.java  | 351 +++++++++++++++
 .../models/TestLockProviderHeartbeatManager.java   | 481 +++++++++++++++++++++
 3 files changed, 881 insertions(+)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/HeartbeatManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/HeartbeatManager.java
new file mode 100644
index 000000000000..97c9b864d498
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/HeartbeatManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+/**
+ * Manages the lifecycle of a recurring heartbeat task.
+ *
+ * <p>An implementation schedules a task that runs periodically on behalf of a monitored thread and
+ * offers a way to stop it again. Closing the manager relinquishes whatever resources it holds.
+ */
+public interface HeartbeatManager extends AutoCloseable {
+
+  /**
+   * Starts the heartbeat for the given thread. The heartbeat keeps running until
+   * {@link #stopHeartbeat(boolean)} is called or the monitored thread has died (implementations may
+   * also end it early, e.g. when a heartbeat execution fails).
+   *
+   * @param threadToMonitor The thread to pass to/monitor when running the heartbeat task.
+   * @return True when there is no previously active heartbeat and the heartbeat is successfully started.
+   *         False otherwise.
+   */
+  boolean startHeartbeatForThread(Thread threadToMonitor);
+
+  /**
+   * Stops the heartbeat, if one is active.
+   * This is a blocking call, which drains any in-flight heartbeat task execution before returning.
+   *
+   * @param mayInterruptIfRunning Whether we may interrupt the underlying heartbeat task if it is in-flight.
+   * @return true: an active heartbeat was stopped; no heartbeat task is in-flight or to be executed.
+   *         false: there was no active heartbeat to stop, or the heartbeat could not be stopped and there
+   *         can still be recurring execution of heartbeat tasks.
+   */
+  boolean stopHeartbeat(boolean mayInterruptIfRunning);
+
+  /**
+   * Whether the heartbeat manager has an active heartbeat task currently.
+   *
+   * @return True if a heartbeat task is currently scheduled, false otherwise.
+   */
+  boolean hasActiveHeartbeat();
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
new file mode 100644
index 000000000000..be73bd7cb2c0
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * LockProviderHeartbeatManager is a helper class which handles the scheduling and stopping of heartbeat
+ * tasks. This is intended for use with the storage based lock provider, which requires
+ * a separate thread to spawn and renew the lock repeatedly.
+ * It should be responsible for the entire lifecycle of the heartbeat task.
+ * Importantly, a new instance should be created for each lock provider.
+ */
+@ThreadSafe
+public class LockProviderHeartbeatManager implements HeartbeatManager {
+  /**
+   * Default upper bound, in milliseconds, on how long {@link #stopHeartbeat(boolean)} waits for an
+   * in-flight heartbeat execution to finish.
+   */
+  public static final long DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS = 15_000L;
+
+  // Number of leading characters of the owner id used to tag the heartbeat thread name.
+  private static final int THREAD_NAME_OWNER_ID_PREFIX_LENGTH = 6;
+
+  // How long close() waits for the scheduler to terminate before forcing a shutdown.
+  private static final long SCHEDULER_TERMINATION_TIMEOUT_SECONDS = 5L;
+
+  private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(LockProviderHeartbeatManager.class);
+
+  // The reference is final and the executor is expected to be thread-safe (as the JDK executors are),
+  // so it may be used without holding the monitor of this object.
+  private final ScheduledExecutorService scheduler;
+
+  // Constant does not need multi-threading protections.
+  private final String ownerId;
+  private final Logger logger;
+  private final long heartbeatTimeMs;
+
+  /**
+   * Contract for the heartbeat function execution.
+   *
+   * <p>Behavior of the heartbeat manager (consumer):
+   * <ul>
+   *   <li>Executes heartBeatFuncToExec every heartbeatTimeMs when:
+   *     <ul>
+   *       <li>heartBeatFuncToExec returns true</li>
+   *     </ul>
+   *   </li>
+   *   <li>Stops executing heartBeatFuncToExec when:
+   *     <ul>
+   *       <li>heartBeatFuncToExec returns false</li>
+   *       <li>heartBeatFuncToExec throws an exception</li>
+   *       <li>the monitored thread is no longer alive</li>
+   *       <li>stopHeartbeat is called, which prevents further recurring executions and, when
+   *           mayInterruptIfRunning is true, interrupts any inflight execution</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * <p>Requirements for heartBeatFuncToExec implementation:
+   * <ul>
+   *   <li>Should perform the logic of renewing lock lease</li>
+   *   <li>Should be super light-weight, typically runs within 1 second</li>
+   *   <li>Should handle thread interruptions so that the stopHeartbeat function will not wait long for any
+   *       inflight execution to complete</li>
+   *   <li>Should almost always return true in cases like:
+   *     <ul>
+   *       <li>Successfully extending the lock lease</li>
+   *       <li>Transient failures (network partition, remote service errors) to allow automatic retry</li>
+   *     </ul>
+   *   </li>
+   *   <li>Should return false only in specific cases:
+   *     <ul>
+   *       <li>When the lock is already expired (no point in extending an expired lock)</li>
+   *       <li>When the writer thread does not hold any lock</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   *
+   * <p>Warning: Returning false stops all future lock renewal attempts. If the writer thread
+   * is still running, it will execute with a lock that can expire at any time, potentially
+   * leading to corrupted data.
+   */
+  private final Supplier<Boolean> heartbeatFuncToExec;
+  private final long stopHeartbeatTimeoutMs;
+
+  // We ensure within the context of LockProviderHeartbeatManager, heartbeatFuncToExec only executes in a
+  // single thread periodically. Non-null means a heartbeat is currently scheduled.
+  @GuardedBy("this")
+  private ScheduledFuture<?> scheduledFuture;
+
+  /**
+   * Semaphore for managing heartbeat task execution synchronization.
+   *
+   * <p><strong>IMPORTANT: Thread Safety Warning</strong>
+   * This semaphore is mutually exclusive with {@code synchronized(this)}. Never synchronize
+   * on {@code this} while the thread is holding the heartbeatSemaphore.
+   *
+   * <p>Execution flow:
+   * <ul>
+   *   <li>Heartbeat task always attempts to acquire the semaphore before proceeding and
+   *       releases it before finishing the current round of execution</li>
+   *   <li>The heartbeat manager acquires the semaphore when it needs to:
+   *     <ul>
+   *       <li>Drain any inflight execution</li>
+   *       <li>Prevent further execution of the heartbeat task</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   */
+  private final Semaphore heartbeatSemaphore;
+
+  /**
+   * Initializes a heartbeat manager.
+   * @param ownerId The identifier for logging of who owns this heartbeat manager. Its leading characters
+   *                are also used to tag the heartbeat thread name.
+   * @param heartbeatTimeMs The time between heartbeat executions.
+   *                        The first heartbeat will execute after this amount of time elapses.
+   * @param heartbeatFuncToExec The function to execute on each heartbeat. This should handle interrupts.
+   */
+  public LockProviderHeartbeatManager(String ownerId,
+                                      long heartbeatTimeMs,
+                                      Supplier<Boolean> heartbeatFuncToExec) {
+    this(
+        ownerId,
+        createThreadScheduler(shortOwnerId(ownerId)),
+        heartbeatTimeMs,
+        DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
+        heartbeatFuncToExec,
+        new Semaphore(1),
+        DEFAULT_LOGGER);
+  }
+
+  @VisibleForTesting
+  LockProviderHeartbeatManager(String ownerId,
+                               ScheduledExecutorService scheduler,
+                               long heartbeatTimeMs,
+                               long stopHeartbeatTimeoutMs,
+                               Supplier<Boolean> heartbeatFuncToExec,
+                               Semaphore heartbeatSemaphore,
+                               Logger testLogger) {
+    this.ownerId = ownerId;
+    this.heartbeatTimeMs = heartbeatTimeMs;
+    this.heartbeatFuncToExec = heartbeatFuncToExec;
+    this.logger = testLogger;
+    this.scheduler = scheduler;
+    this.heartbeatSemaphore = heartbeatSemaphore;
+    this.stopHeartbeatTimeoutMs = stopHeartbeatTimeoutMs;
+  }
+
+  /**
+   * Returns the leading characters of the owner id (the whole id when it is shorter, "" when null)
+   * for use in thread names.
+   */
+  private static String shortOwnerId(String ownerId) {
+    if (ownerId == null) {
+      return "";
+    }
+    return ownerId.substring(0, Math.min(THREAD_NAME_OWNER_ID_PREFIX_LENGTH, ownerId.length()));
+  }
+
+  /**
+   * Creates a new thread scheduler for heartbeat execution.
+   */
+  private static ScheduledExecutorService createThreadScheduler(String shortUuid) {
+    return Executors.newSingleThreadScheduledExecutor(
+        r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" + shortUuid));
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IllegalArgumentException if threadToMonitor is null.
+   */
+  @Override
+  public synchronized boolean startHeartbeatForThread(Thread threadToMonitor) {
+    if (threadToMonitor == null) {
+      throw new IllegalArgumentException("threadToMonitor cannot be null.");
+    }
+
+    if (this.hasActiveHeartbeat()) {
+      logger.warn("Owner {}: Heartbeat is already running.", ownerId);
+      return false;
+    }
+    try {
+      scheduledFuture = scheduler.scheduleAtFixedRate(
+          () -> heartbeatTaskRunner(threadToMonitor), heartbeatTimeMs, heartbeatTimeMs, TimeUnit.MILLISECONDS);
+      logger.debug("Owner {}: Heartbeat started with interval: {} ms", ownerId, heartbeatTimeMs);
+      return true;
+    } catch (Exception e) {
+      // NOTE(review): the exception fills the second {} placeholder, so SLF4J likely renders it via
+      // toString() without a stack trace. The message is kept as-is because tests match it verbatim.
+      logger.error("Owner {}: Unable to schedule heartbeat task. {}", ownerId, e);
+      return false;
+    }
+  }
+
+  /**
+   * Responsible for managing the execution and result of the heartbeat task.
+   * Maintains a semaphore which ensures thread safety for determining the state
+   * of the heartbeat (is the heartbeat executing or not).
+   * @param threadToMonitor The thread to monitor. Required by heartbeat execution.
+   */
+  private void heartbeatTaskRunner(Thread threadToMonitor) {
+    if (!heartbeatSemaphore.tryAcquire()) {
+      logger.error("Owner {}: Heartbeat semaphore should be acquirable at the start of every heartbeat!", ownerId);
+      return;
+    }
+
+    boolean heartbeatExecutionSuccessful;
+    try {
+      heartbeatExecutionSuccessful = executeHeartbeat(threadToMonitor);
+    } finally {
+      heartbeatSemaphore.release();
+    }
+
+    // Call synchronized method only after releasing the semaphore (see heartbeatSemaphore docs).
+    if (!heartbeatExecutionSuccessful) {
+      logger.warn("Owner {}: Heartbeat function did not succeed.", ownerId);
+      // Unschedule self from further execution if heartbeat was unsuccessful.
+      heartbeatTaskUnscheduleItself();
+    }
+  }
+
+  /**
+   * Executes the heartbeat task.
+   * @param threadToMonitor The thread to monitor. If we detect that
+   *                        this thread has stopped we should end the heartbeat.
+   * @return Whether the heartbeat task successfully ran. False if the monitored thread is dead,
+   *         the heartbeat function returned false, or it threw an exception.
+   */
+  private boolean executeHeartbeat(Thread threadToMonitor) {
+    // Check if monitored thread is dead
+    if (!threadToMonitor.isAlive()) {
+      logger.warn("Owner {}: Monitored thread is no longer alive.", ownerId);
+      return false;
+    }
+
+    // Execute heartbeat function
+    try {
+      return heartbeatFuncToExec.get();
+    } catch (Exception e) {
+      logger.error("Owner {}: Heartbeat function threw exception {}", ownerId, e);
+    }
+    return false;
+  }
+
+  /**
+   * This prevents further scheduling of the heartbeat task. Intended to be used by heartbeat task itself.
+   */
+  private synchronized void heartbeatTaskUnscheduleItself() {
+    // Cancelling prevents all future invocations. This runs on the heartbeat thread itself, so
+    // cancel(true) sets the interrupt flag of the current (already finishing) heartbeat thread.
+    if (scheduledFuture != null) {
+      boolean cancellationSuccessful = scheduledFuture.cancel(true);
+      logger.info("Owner {}: Requested termination of heartbeat task. Cancellation returned {}.",
+          this.ownerId, cancellationSuccessful);
+      scheduledFuture = null;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean stopHeartbeat(boolean mayInterruptIfRunning) {
+    ScheduledFuture<?> cancelledFuture = cancelRecurringHeartbeatTask(mayInterruptIfRunning);
+    if (cancelledFuture == null) {
+      // There was no active heartbeat to stop.
+      return false;
+    }
+
+    // Cancelling only prevents future runs; here we ensure any in-flight run has exited before returning.
+    boolean heartbeatStillInflight = syncWaitInflightHeartbeatTaskToFinish();
+    if (heartbeatStillInflight) {
+      // If waiting for cancellation was interrupted, do not log an error.
+      if (Thread.currentThread().isInterrupted()) {
+        logger.warn("Owner {}: Heartbeat is still in flight due to interruption!", ownerId);
+      } else {
+        logger.error("Owner {}: Heartbeat is still in flight!", ownerId);
+      }
+      return false;
+    }
+
+    // We have stopped the heartbeat, now clean up any leftover states.
+    synchronized (this) {
+      logger.debug("Owner {}: Heartbeat task successfully terminated.", ownerId);
+      // Only clear the reference if it still points at the future we cancelled: the task may have
+      // unscheduled itself meanwhile, after which another caller may already have started a new heartbeat.
+      if (scheduledFuture == cancelledFuture) {
+        scheduledFuture = null;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Cancels the recurring heartbeat task.
+   * @param mayInterruptIfRunning Whether to interrupt the heartbeat task if it is currently running.
+   * @return The future that was cancelled, or null if there was no active heartbeat to stop.
+   */
+  private synchronized ScheduledFuture<?> cancelRecurringHeartbeatTask(boolean mayInterruptIfRunning) {
+    if (!this.hasActiveHeartbeat()) {
+      logger.warn("Owner {}: No active heartbeat task to stop.", ownerId);
+      return null;
+    }
+
+    // Attempt to cancel the scheduled future
+    boolean cancellationSuccessful = scheduledFuture.cancel(mayInterruptIfRunning);
+    logger.debug("Owner {}: Requested termination of heartbeat task. Cancellation returned {}",
+        ownerId, cancellationSuccessful);
+    return scheduledFuture;
+  }
+
+  /**
+   * Waits up to stopHeartbeatTimeoutMs for the currently executing heartbeat task (if any) to complete.
+   * Successfully acquiring the semaphore means no heartbeat execution currently holds it; the permit is
+   * handed back immediately so a later heartbeat can run again.
+   *
+   * <p>NOTE(review): a run that has already been started by the executor but has not yet reached its
+   * semaphore acquisition can still execute once after this returns; confirm whether that is acceptable.
+   *
+   * @return true if a heartbeat execution may still be in flight (timed out or interrupted while waiting).
+   */
+  private boolean syncWaitInflightHeartbeatTaskToFinish() {
+    boolean heartbeatStillInflight = true;
+    try {
+      heartbeatStillInflight = !heartbeatSemaphore.tryAcquire(stopHeartbeatTimeoutMs, TimeUnit.MILLISECONDS);
+      if (heartbeatStillInflight) {
+        logger.warn("Owner {}: Timed out while waiting for heartbeat termination.", ownerId);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.warn("Owner {}: Interrupted while waiting for heartbeat termination.", ownerId);
+    }
+    // Only return the permit if we actually acquired it above.
+    if (!heartbeatStillInflight) {
+      heartbeatSemaphore.release();
+    }
+    return heartbeatStillInflight;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized boolean hasActiveHeartbeat() {
+    return scheduledFuture != null;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Stops any active heartbeat (interrupting an in-flight run), then shuts down the scheduler, waiting
+   * up to SCHEDULER_TERMINATION_TIMEOUT_SECONDS before forcing shutdown.
+   *
+   * <p>Deliberately not synchronized: an interrupted heartbeat run that then fails calls the synchronized
+   * heartbeatTaskUnscheduleItself(). If this method held the monitor, that thread would block and
+   * awaitTermination would always wait out the full timeout.
+   */
+  @Override
+  public void close() throws Exception {
+    if (hasActiveHeartbeat()) {
+      stopHeartbeat(true);
+    }
+    scheduler.shutdown();
+
+    try {
+      if (!scheduler.awaitTermination(SCHEDULER_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        scheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
new file mode 100644
index 000000000000..cdeca536b3ae
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static 
org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager.DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestLockProviderHeartbeatManager {
+
+  private ScheduledExecutorService mockScheduler;
+  private Logger mockLogger;
+  private ScheduledFuture<?> mockFuture;
+  private HeartbeatManager manager;
+  private static final String LOGGER_ID = "test-owner";
+  private ScheduledExecutorService actualExecutorService;
+
+  @BeforeEach
+  void setUp() {
+    mockScheduler = mock(ScheduledExecutorService.class);
+    mockLogger = mock(Logger.class);
+    mockFuture = mock(ScheduledFuture.class);
+    actualExecutorService = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "Heartbeat-Test-Thread");
+      t.setDaemon(true);
+      return t;
+    });
+  }
+
+  @AfterEach
+  void tearDown() throws Exception {
+    if (manager != null) {
+      manager.close();
+      manager = null;
+    }
+    actualExecutorService.shutdownNow();
+  }
+
+  @Test
+  void testStartHeartbeatSuccess() {
+    when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), eq(100L), 
eq(100L), eq(TimeUnit.MILLISECONDS)))
+            .thenAnswer(invocation -> mockFuture);
+    manager = createDefaultManagerWithMocks(() -> true);
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+  }
+
+  @Test
+  void testStartHeartbeatAlreadyRunning() {
+    when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), 
anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocation -> mockFuture);
+
+    manager = createDefaultManagerWithMocks(() -> true);
+
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+    assertFalse(manager.startHeartbeatForThread(Thread.currentThread()));
+    verify(mockLogger).warn("Owner {}: Heartbeat is already running.", 
LOGGER_ID);
+  }
+
+  @Test
+  void testStartHeartbeatSchedulerException() {
+    doThrow(new RejectedExecutionException("Scheduler failure"))
+            .when(mockScheduler)
+            .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), 
any(TimeUnit.class));
+
+    manager = createDefaultManagerWithMocks(() -> true);
+
+    assertFalse(manager.startHeartbeatForThread(Thread.currentThread()));
+    verify(mockLogger).error(eq("Owner {}: Unable to schedule heartbeat task. 
{}"), eq(LOGGER_ID), any(RejectedExecutionException.class));
+  }
+
+  @Test
+  void testStopHeartbeatNeverStarted() {
+    manager = createDefaultManagerWithMocks(() -> true);
+
+    assertFalse(manager.stopHeartbeat(true));
+    verify(mockLogger).warn("Owner {}: No active heartbeat task to stop.", 
LOGGER_ID);
+  }
+
+  @Test
+  void testStopHeartbeatAlreadyRequested() {
+    when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), 
anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocation -> mockFuture);
+
+    manager = createDefaultManagerWithMocks(() -> true);
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+
+    when(mockFuture.cancel(true)).thenReturn(true);
+    when(mockFuture.isDone()).thenReturn(false).thenReturn(true);
+
+    assertTrue(manager.stopHeartbeat(true));
+
+    // Call stop again
+    assertFalse(manager.stopHeartbeat(true));
+    verify(mockLogger).warn("Owner {}: No active heartbeat task to stop.", 
LOGGER_ID);
+  }
+
+  @Test
+  void testHeartbeatUnableToAcquireSemaphore() throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicReference<Thread> t = new AtomicReference<>();
+    when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), 
anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocation -> {
+              Runnable task = invocation.getArgument(0);
+              t.set(new Thread(() -> {
+                task.run();
+                latch.countDown();
+              }));
+              return mockFuture;
+            });
+
+    when(mockFuture.cancel(true)).thenReturn(true);
+    Semaphore semaphore = mock(Semaphore.class);
+
+    // Stub the tryAcquire() method to return false (for the heartbeat) and 
true (for stop)
+    when(semaphore.tryAcquire()).thenReturn(false);
+    when(semaphore.tryAcquire(eq(DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS), 
eq(TimeUnit.MILLISECONDS))).thenReturn(true);
+    manager = new LockProviderHeartbeatManager(
+            LOGGER_ID,
+            mockScheduler,
+            100L,
+            DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS,
+            () -> true,
+            semaphore,
+            mockLogger);
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+    t.get().start();
+    assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
+    assertTrue(manager.stopHeartbeat(true));
+
+    verify(mockLogger).error("Owner {}: Heartbeat semaphore should be 
acquirable at the start of every heartbeat!", LOGGER_ID);
+    assertFalse(manager.hasActiveHeartbeat());
+  }
+
+  @Test
+  void testStopHeartbeatMockSuccessfulCancel() {
+    when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), 
anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocation -> mockFuture);
+    when(mockFuture.cancel(true)).thenReturn(true);
+
+    manager = createDefaultManagerWithMocks(() -> true);
+    manager.startHeartbeatForThread(Thread.currentThread());
+
+    when(mockFuture.isDone()).thenReturn(false).thenReturn(true);
+    assertTrue(manager.stopHeartbeat(true));
+  }
+
+  @Test
+  void testHeartbeatTaskHandlesInterrupt() throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicReference<Thread> t = new AtomicReference<>();
+    when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), 
anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocation -> {
+              Runnable task = invocation.getArgument(0);
+              t.set(new Thread(() -> {
+                task.run();
+                latch.countDown();
+              }));
+              return mockFuture;
+            });
+
+    when(mockFuture.cancel(true)).thenReturn(true);
+
+    // Initialize heartbeat manager with a function that always returns false 
(renewal failure)
+    manager = createDefaultManagerWithMocks(() -> false);
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+    t.get().start();
+    t.get().interrupt();
+
+    assertTrue(latch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did 
not run in time");
+
+    // This call will wait for heartbeat task to stop itself, as the semaphore 
has already been acquired by the heartbeat task.
+    assertFalse(manager.stopHeartbeat(true));
+
+    verify(mockLogger).warn("Owner {}: No active heartbeat task to stop.", 
LOGGER_ID);
+    verify(mockLogger).debug(
+            "Owner {}: Heartbeat started with interval: {} ms",
+            "test-owner",
+            100L
+    );
+    verify(mockLogger).info("Owner {}: Requested termination of heartbeat 
task. Cancellation returned {}.", LOGGER_ID, true);
+    assertFalse(manager.hasActiveHeartbeat());
+  }
+
+  @Test
+  void testHeartbeatTaskNullWriter() {
+    manager = createDefaultManagerWithMocks(() -> true);
+    assertThrows(IllegalArgumentException.class, () -> 
manager.startHeartbeatForThread(null));
+  }
+
+  @Test
+  void testHeartbeatTaskImmediateDeadMonitoringThread() throws 
InterruptedException {
+    // Use a real thread that will terminate immediately.
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicReference<Thread> t = new AtomicReference<>();
+    when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), 
anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocation -> {
+              Runnable task = invocation.getArgument(0);
+              t.set(new Thread(() -> {
+                task.run();
+                latch.countDown();
+              }));
+              return mockFuture;
+            });
+
+    when(mockFuture.cancel(false)).thenReturn(false);
+    Thread deadThread = new Thread(() -> {
+    });
+    deadThread.start();
+    deadThread.join();
+    manager = createDefaultManagerWithMocks(() -> true);
+
+    assertTrue(manager.startHeartbeatForThread(deadThread));
+    t.get().start();
+
+    assertTrue(latch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did 
not run in time");
+    verify(mockLogger).warn("Owner {}: Monitored thread is no longer alive.", 
LOGGER_ID);
+    verify(mockLogger).info("Owner {}: Requested termination of heartbeat 
task. Cancellation returned {}.", LOGGER_ID, false);
+    assertFalse(manager.hasActiveHeartbeat());
+  }
+
+  @Test
+  void testHeartbeatTaskRenewalException() throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicReference<Thread> t = new AtomicReference<>();
+    when(mockScheduler.scheduleAtFixedRate(any(Runnable.class), anyLong(), 
anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocation -> {
+              Runnable task = invocation.getArgument(0);
+              t.set(new Thread(() -> {
+                task.run();
+                latch.countDown();
+              }));
+              return mockFuture;
+            });
+    manager = createDefaultManagerWithMocks(() -> {
+      throw new RuntimeException("Renewal error");
+    });
+
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+    t.get().start();
+    assertTrue(latch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did 
not run in time");
+    verify(mockLogger).error(
+            eq("Owner {}: Heartbeat function threw exception {}"),
+            eq(LOGGER_ID),
+            any(RuntimeException.class));
+    assertFalse(manager.hasActiveHeartbeat());
+  }
+
+  @Test
+  void testHeartbeatStopWaitsForHeartbeatTaskToFinish() throws 
InterruptedException {
+    // Use a real thread
+    CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
+    manager = createDefaultManagerWithRealExecutor(() -> {
+      try {
+        // This will freeze the heartbeat task.
+        assertTrue(stopHeartbeatTaskLatch.await(500, TimeUnit.MILLISECONDS));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return true;
+    });
+
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+    CountDownLatch finishStopHeartbeatLatch = new CountDownLatch(1);
+    Thread t = new Thread(() -> {
+      assertTrue(manager.stopHeartbeat(false));
+      finishStopHeartbeatLatch.countDown();
+    });
+    t.start();
+    // Unblock the heartbeat task.
+    stopHeartbeatTaskLatch.countDown();
+    assertTrue(finishStopHeartbeatLatch.await(500, TimeUnit.MILLISECONDS), 
"Stop heartbeat task did not finish.");
+    assertFalse(manager.hasActiveHeartbeat());
+    verify(mockLogger).debug("Owner {}: Heartbeat task successfully 
terminated.", LOGGER_ID);
+  }
+
+  @Test
+  void testHeartbeatUnableToStopHeartbeatTask() throws InterruptedException {
+    CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
+    CountDownLatch heartbeatStartedLatch = new CountDownLatch(1);
+    // Set stop heartbeat timeout to 5000ms
+    manager = new LockProviderHeartbeatManager(LOGGER_ID, 
actualExecutorService, 100L, 5000L, () -> {
+      try {
+        // Tells us that the heartbeat has started
+        heartbeatStartedLatch.countDown();
+        // This will freeze the heartbeat task.
+        assertTrue(stopHeartbeatTaskLatch.await(10000, TimeUnit.MILLISECONDS));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      // Regardless of whether we return true or false the future executions 
will be cancelled.
+      return true;
+    }, new Semaphore(1), mockLogger);
+
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+    CountDownLatch stopHeartbeatLatch = new CountDownLatch(1);
+    Thread stopHeartbeatThread = new Thread(() -> {
+      // Try to stop the heartbeat (this should hang for 15 seconds)
+      assertFalse(manager.stopHeartbeat(false));
+      stopHeartbeatLatch.countDown();
+    });
+    assertTrue(heartbeatStartedLatch.await(500, TimeUnit.MILLISECONDS), 
"Heartbeat task did not start.");
+    stopHeartbeatThread.start();
+    assertTrue(stopHeartbeatLatch.await(7000, TimeUnit.MILLISECONDS), "Stop 
heartbeat task did not finish.");
+    assertTrue(manager.hasActiveHeartbeat());
+    verify(mockLogger).error("Owner {}: Heartbeat is still in flight!", 
LOGGER_ID);
+    // Unblock the heartbeat task.
+    stopHeartbeatTaskLatch.countDown();
+  }
+
+  @Test
+  void testHeartbeatInterruptStopHeartbeatTask() throws InterruptedException {
+    CountDownLatch stopHeartbeatTaskLatch = new CountDownLatch(1);
+    CountDownLatch heartbeatStartedLatch = new CountDownLatch(1);
+    // Set stop heartbeat timeout to 5000ms
+    manager = new LockProviderHeartbeatManager(LOGGER_ID, actualExecutorService, 100L, 5000L, () -> {
+      try {
+        // Tells us that the heartbeat has started
+        heartbeatStartedLatch.countDown();
+        // This will freeze the heartbeat task.
+        assertTrue(stopHeartbeatTaskLatch.await(10000, TimeUnit.MILLISECONDS));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      // Regardless of whether we return true or false the future executions will be cancelled.
+      return true;
+    }, new Semaphore(1), mockLogger);
+
+    try {
+      assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+      // Written by the stopper thread; the latch countDown/await pair makes the write visible here.
+      boolean[] stopResult = {true};
+      CountDownLatch stopHeartbeatLatch = new CountDownLatch(1);
+      Thread stopHeartbeatThread = new Thread(() -> {
+        // Try to stop the heartbeat. The task is stuck, so without the interrupt below this would wait
+        // for the full 5000ms stop timeout; with it, the wait should end early and fail.
+        stopResult[0] = manager.stopHeartbeat(false);
+        stopHeartbeatLatch.countDown();
+      });
+      assertTrue(heartbeatStartedLatch.await(500, TimeUnit.MILLISECONDS), "Heartbeat task did not start.");
+      stopHeartbeatThread.start();
+      stopHeartbeatThread.interrupt();
+      assertTrue(stopHeartbeatLatch.await(7000, TimeUnit.MILLISECONDS), "Stop heartbeat task did not finish.");
+      // Asserted here rather than inside the stopper thread so a failure is reported by JUnit.
+      assertFalse(stopResult[0], "stopHeartbeat should fail when interrupted while waiting.");
+      assertTrue(manager.hasActiveHeartbeat());
+      verify(mockLogger).warn("Owner {}: Interrupted while waiting for heartbeat termination.", LOGGER_ID);
+    } finally {
+      // Always unblock the heartbeat task so it does not keep the executor busy if an assertion fails.
+      stopHeartbeatTaskLatch.countDown();
+    }
+  }
+
+  @Test
+  void testHeartbeatTaskValidateStop() throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(2);
+
+    manager = createDefaultManagerWithRealExecutor(() -> {
+      latch.countDown();
+      return true;
+    });
+
+    assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+
+    // Wait until at least two heartbeat renewals have occurred
+    assertTrue(latch.await(2000, TimeUnit.MILLISECONDS), "Heartbeat did not renew twice in time");
+
+    // A CountDownLatch never drops below zero, so a count of 0 proves at least two executions, not exactly two.
+    assertEquals(0, latch.getCount(), "Heartbeat did not execute at least twice");
+
+    assertTrue(manager.hasActiveHeartbeat());
+    assertTrue(manager.stopHeartbeat(false));
+    assertFalse(manager.hasActiveHeartbeat());
+  }
+
+  @Test
+  void testDefaultManagerRapidStartStop1Ms() {
+    manager = new LockProviderHeartbeatManager(LOGGER_ID, 1, () -> true);
+
+    for (int i = 0; i < 100; i++) {
+      assertTrue(manager.startHeartbeatForThread(Thread.currentThread()));
+      assertTrue(manager.hasActiveHeartbeat());
+      assertTrue(manager.stopHeartbeat(true));
+      assertFalse(manager.hasActiveHeartbeat());
+    }
+  }
+
+  @Test
+  void testClose() throws Exception {
+    // A manager whose heartbeat was never started must report no active heartbeat after close().
+    Supplier<Boolean> alwaysSucceeds = () -> true;
+    manager = createDefaultManagerWithMocks(alwaysSucceeds);
+    manager.close();
+    assertFalse(manager.hasActiveHeartbeat());
+  }
+
+  @Test
+  void testClose_StopsHeartbeatAndShutsDownScheduler() throws Exception {
+    // The mocked scheduler reports that it terminated within the 5 second wait...
+    when(mockScheduler.awaitTermination(5, TimeUnit.SECONDS)).thenReturn(true);
+    Supplier<Boolean> alwaysSucceeds = () -> true;
+    manager = createDefaultManagerWithMocks(alwaysSucceeds);
+    manager.close();
+    // ...so close() should request a graceful shutdown and never escalate to shutdownNow().
+    verify(mockScheduler).shutdown();
+    verify(mockScheduler, never()).shutdownNow();
+  }
+
+  @Test
+  void testClose_ForceShutdownWhenTerminationTimesOut() throws Exception {
+    // The mocked scheduler reports that it did NOT terminate within the 5 second wait...
+    when(mockScheduler.awaitTermination(5, TimeUnit.SECONDS)).thenReturn(false);
+    Supplier<Boolean> alwaysSucceeds = () -> true;
+    manager = createDefaultManagerWithMocks(alwaysSucceeds);
+    manager.close();
+    // ...so close() should try shutdown() first and then escalate to shutdownNow().
+    verify(mockScheduler).shutdown();
+    verify(mockScheduler).shutdownNow();
+  }
+
+  @Test
+  void testClose_HandlesInterruptedException() throws Exception {
+    when(mockScheduler.awaitTermination(5, TimeUnit.SECONDS)).thenThrow(new InterruptedException());
+    manager = createDefaultManagerWithMocks(() -> true);
+
+    boolean wasInterrupted;
+    try {
+      manager.close();
+    } finally {
+      // Thread.interrupted() reads AND clears the flag that close() restores. Clearing it here stops
+      // it from staying set on the shared test thread after this method (isInterrupted() did not clear it).
+      wasInterrupted = Thread.interrupted();
+    }
+
+    verify(mockScheduler).shutdown();
+    verify(mockScheduler).shutdownNow();
+    assertTrue(wasInterrupted, "Thread should be interrupted after exception handling");
+  }
+
+  /**
+   * Builds a manager around the mocked scheduler and mocked logger, with a fresh single-permit
+   * semaphore, a 100ms third argument (presumably the heartbeat interval) and the default stop timeout.
+   */
+  private LockProviderHeartbeatManager createDefaultManagerWithMocks(Supplier<Boolean> heartbeatFunc) {
+    LockProviderHeartbeatManager heartbeatManager = new LockProviderHeartbeatManager(
+        LOGGER_ID, mockScheduler, 100L, DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, heartbeatFunc, new Semaphore(1), mockLogger);
+    return heartbeatManager;
+  }
+
+  /**
+   * Builds a manager on the real executor service, so heartbeat tasks actually run, with the mocked
+   * logger, a fresh single-permit semaphore, a 100ms third argument (presumably the heartbeat interval)
+   * and the default stop timeout.
+   */
+  private LockProviderHeartbeatManager createDefaultManagerWithRealExecutor(Supplier<Boolean> heartbeatFunc) {
+    LockProviderHeartbeatManager heartbeatManager = new LockProviderHeartbeatManager(
+        LOGGER_ID, actualExecutorService, 100L, DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, heartbeatFunc, new Semaphore(1), mockLogger);
+    return heartbeatManager;
+  }
+}


Reply via email to