This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 12e87b80afe8 fix: Fix dangling lock from storage based lock (#18439)
12e87b80afe8 is described below

commit 12e87b80afe83a3a01df7b063352c78da8f765fc
Author: Lin Liu <[email protected]>
AuthorDate: Wed May 20 00:14:32 2026 -0700

    fix: Fix dangling lock from storage based lock (#18439)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../aws/transaction/lock/S3StorageLockClient.java  |   1 +
 .../transaction/lock/TestS3StorageLockClient.java  |   6 +-
 .../transaction/lock/AzureStorageLockClient.java   |   5 +-
 .../lock/TestAzureStorageLockClient.java           |   4 +-
 .../transaction/lock/StorageBasedLockProvider.java | 139 +++++++++++++++++----
 .../lock/metrics/HoodieLockMetrics.java            |   9 ++
 .../transaction/lock/models/LockUpsertResult.java  |   4 +-
 .../client/transaction/TestHoodieLockMetrics.java  |  37 ++++++
 .../lock/TestStorageBasedLockProvider.java         | 113 +++++++++++++++++
 .../gcp/transaction/lock/GCSStorageLockClient.java |   1 +
 .../transaction/lock/TestGCSStorageLockClient.java |   5 +-
 11 files changed, 291 insertions(+), 33 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
index 10af6c30d2f7..290a38f8661a 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
@@ -193,6 +193,7 @@ public class S3StorageLockClient implements 
StorageLockClient {
       logger.info("OwnerId: {}, Retriable conditional request conflict error: 
{}", ownerId, lockFilePath);
     } else if (status == RATE_LIMIT_ERROR_CODE) {
       logger.warn("OwnerId: {}, Rate limit exceeded for: {}", ownerId, 
lockFilePath);
+      return LockUpsertResult.THROTTLED;
     } else if (status >= INTERNAL_SERVER_ERROR_CODE_MIN) {
       logger.warn("OwnerId: {}, internal server error for: {}", ownerId, 
lockFilePath, e);
     } else {
diff --git 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
index 0358cfa281ae..487268a6b4a2 100644
--- 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
+++ 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
@@ -49,8 +49,10 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.Properties;
 
+import static 
org.apache.hudi.client.transaction.lock.models.LockUpsertResult.THROTTLED;
 import static 
org.apache.hudi.client.transaction.lock.models.LockUpsertResult.UNKNOWN_ERROR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -222,8 +224,8 @@ class TestS3StorageLockClient {
     Pair<LockUpsertResult, Option<StorageLockFile>> result =
             lockService.tryUpsertLockFile(lockData, Option.empty());
 
-    assertEquals(UNKNOWN_ERROR, result.getLeft());
-    assertTrue(result.getRight().isEmpty());
+    assertEquals(THROTTLED, result.getLeft());
+    assertFalse(result.getRight().isPresent());
     verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), 
eq(LOCK_FILE_PATH));
   }
 
diff --git 
a/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
 
b/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
index d344cfc341bd..d62a0f018430 100644
--- 
a/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
+++ 
b/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
@@ -317,10 +317,11 @@ public class AzureStorageLockClient implements 
StorageLockClient {
       logger.info("OwnerId: {}, Unable to write new lock file. Another process 
has modified this lockfile {} already.",
           ownerId, lockFileUri);
       return LockUpsertResult.ACQUIRED_BY_OTHERS;
-    } else if (code == CONFLICT_ERROR_CODE) {
-      logger.info("OwnerId: {}, Retriable conditional request conflict error: 
{}", ownerId, lockFileUri);
     } else if (code == RATE_LIMIT_ERROR_CODE) {
       logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", 
ownerId, lockFileUri);
+      return LockUpsertResult.THROTTLED;
+    } else if (code == CONFLICT_ERROR_CODE) {
+      logger.info("OwnerId: {}, Retriable conditional request conflict error: 
{}", ownerId, lockFileUri);
     } else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) {
       logger.warn("OwnerId: {}, Azure returned internal server error code for 
lock file: {}", ownerId, lockFileUri, e);
     } else {
diff --git 
a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java
 
b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java
index d4a1ff822d7c..da6eb7446e07 100644
--- 
a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java
+++ 
b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java
@@ -196,7 +196,7 @@ public class TestAzureStorageLockClient {
   }
 
   @Test
-  void testTryUpsertLockFile_rateLimit_returnsUnknownError() {
+  void testTryUpsertLockFile_rateLimit_returnsThrottled() {
     StorageLockData lockData = new StorageLockData(false, 999L, "owner");
     BlobStorageException ex = mock(BlobStorageException.class);
     when(ex.getStatusCode()).thenReturn(429);
@@ -204,7 +204,7 @@ public class TestAzureStorageLockClient {
 
     Pair<LockUpsertResult, Option<StorageLockFile>> result = 
lockClient.tryUpsertLockFile(lockData, Option.empty());
 
-    assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
+    assertEquals(LockUpsertResult.THROTTLED, result.getLeft());
     assertTrue(result.getRight().isEmpty());
     verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), 
eq(LOCK_FILE_URI));
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
index 69ed1c6f572e..2ba31e36897d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -86,6 +86,16 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
   // However, since our lock leases are pretty long, we can use a high buffer.
   private static final long CLOCK_DRIFT_BUFFER_MS = 500;
 
+  // Max number of retry attempts on the lock-expire write after a THROTTLED 
response.
+  @VisibleForTesting
+  static final int THROTTLE_MAX_RETRIES = 3;
+
+  // Initial backoff delay; doubles on each subsequent retry (e.g. 1s, 2s, 4s 
for 3 retries).
+  // The base of 1s is tuned to outlast typical cloud-storage rate-limit 
windows
+  // (e.g., GCS's 1-write/sec per object limit) so the first retry has a 
reasonable chance of succeeding.
+  @VisibleForTesting
+  static final long THROTTLE_INITIAL_RETRY_DELAY_SECONDS = 1;
+
   // Use for testing
   private final Logger logger;
 
@@ -346,9 +356,24 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
         newLockData,
         latestLock.getRight());
     if (lockUpdateStatus.getLeft() != LockUpsertResult.SUCCESS) {
-      // failed to acquire the lock, indicates concurrent contention
       logInfoLockState(FAILED_TO_ACQUIRE);
-      
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric);
+      switch (lockUpdateStatus.getLeft()) {
+        case ACQUIRED_BY_OTHERS:
+          // failed to acquire the lock, indicates concurrent contention
+          
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric);
+          break;
+        case THROTTLED:
+          // The write was rejected; we did not acquire. Transient — caller 
may retry.
+          
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockThrottledMetric);
+          break;
+        case UNKNOWN_ERROR:
+          // Lock state is unknown after the upsert attempt; surface it as 
such.
+          
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
+          break;
+        default:
+          
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric);
+          break;
+      }
       return false;
     }
     this.setLock(lockUpdateStatus.getRight().get());
@@ -424,22 +449,62 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
    * Unlock by marking our current lock file "expired": true.
    */
   @Override
-  public synchronized void unlock() {
-    assertHeartbeatManagerExists();
-    if (!believesLockMightBeHeld()) {
-      return;
+  public void unlock() {
+    ExpireLockResult expireResult;
+    // Snapshot the exact StorageLockFile we intend to expire. Identity 
comparison in the retry
+    // path guards against a concurrent tryLock() that, if our original lock 
had expired during
+    // the 1s sleep, could have installed a fresh lock that the retry must NOT 
mark expired.
+    StorageLockFile lockToExpire;
+    synchronized (this) {
+      assertHeartbeatManagerExists();
+      if (!believesLockMightBeHeld()) {
+        return;
+      }
+
+      // Try to stop the heartbeat first
+      if (heartbeatManager.hasActiveHeartbeat()) {
+        logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
+        if (!heartbeatManager.stopHeartbeat(true)) {
+          
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
+          throw new 
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+        }
+      }
+
+      // Then expire the current lock.
+      lockToExpire = getLock();
+      expireResult = tryExpireCurrentLock(false);
     }
-    boolean believesNoLongerHoldsLock = true;
 
-    // Try to stop the heartbeat first
-    if (heartbeatManager.hasActiveHeartbeat()) {
-      logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
-      believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true);
+    // If throttled, retry up to THROTTLE_MAX_RETRIES times with exponential 
backoff. Each sleep
+    // happens outside the monitor so other threads aren't blocked during the 
wait.
+    // Note: when unlock() is called via close() -> shutdown(), the outer 
synchronized caller still
+    // holds the provider monitor through reentrant locking, so other threads 
remain blocked in
+    // that scenario. This is acceptable since close() is a shutdown path, not 
the hot path.
+    for (int attempt = 1; attempt <= THROTTLE_MAX_RETRIES && expireResult == 
ExpireLockResult.THROTTLED; attempt++) {
+      long delaySeconds = THROTTLE_INITIAL_RETRY_DELAY_SECONDS << (attempt - 
1);
+      logger.warn("Owner {}: Lock expiration was throttled (retry {}/{}), 
backing off for {} seconds.",
+          ownerId, attempt, THROTTLE_MAX_RETRIES, delaySeconds);
+      try {
+        sleepForThrottleRetry(delaySeconds);
+      } catch (InterruptedException ie) {
+        // Re-set the interrupt flag and abandon the retry — an interrupted 
thread shouldn't keep
+        // doing work. The caller will see FAILED_TO_RELEASE below.
+        Thread.currentThread().interrupt();
+        
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
+        throw new 
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+      }
+      synchronized (this) {
+        // Bail out if the lock was either cleared by another path (e.g. 
shutdown hook,
+        // concurrent unlock) OR replaced by a concurrent tryLock(): operating 
on a fresh
+        // lock would mark a lock that another caller now holds as expired.
+        if (!believesLockMightBeHeld() || getLock() != lockToExpire) {
+          return;
+        }
+        expireResult = tryExpireCurrentLock(false);
+      }
     }
 
-    // Then expire the current lock.
-    believesNoLongerHoldsLock &= tryExpireCurrentLock(false);
-    if (!believesNoLongerHoldsLock) {
+    if (expireResult != ExpireLockResult.SUCCESS) {
       
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
       throw new 
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
     }
@@ -461,15 +526,26 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
   }
 
   /**
-   * Tries to expire the currently held lock.
+   * Result of a single attempt to expire the currently held lock.
+   */
+  @VisibleForTesting
+  enum ExpireLockResult {
+    SUCCESS,
+    THROTTLED,
+    FAILED
+  }
+
+  /**
+   * Tries to expire the currently held lock. This is a single-attempt 
primitive;
+   * callers are responsible for retry policy.
    *
    * @param fromShutdownHook Whether we are attempting best effort quick 
unlock from shutdown hook.
-   * @return True if we were successfully able to upload an expired lock.
+   * @return The result of the expire attempt.
    */
-  private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
+  @VisibleForTesting
+  synchronized ExpireLockResult tryExpireCurrentLock(boolean fromShutdownHook) 
{
     // It does not make sense to have heartbeat alive extending the lock lease 
while
-    // here we are trying
-    // to expire the lock.
+    // here we are trying to expire the lock.
     if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) {
       // broken function precondition.
       throw new HoodieLockException("Must stop heartbeat before expire lock 
file");
@@ -477,26 +553,30 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
     logDebugLockState(RELEASING);
     // Upload metadata that will unlock this lock.
     StorageLockData expiredLockData = new StorageLockData(true, 
this.getLock().getValidUntilMs(), ownerId);
-    Pair<LockUpsertResult, Option<StorageLockFile>> result;
     long lockExpirationTimeMs = System.currentTimeMillis();
-    result = this.storageLockClient.tryUpsertLockFile(expiredLockData, 
Option.of(this.getLock()));
+    Pair<LockUpsertResult, Option<StorageLockFile>> result =
+        this.storageLockClient.tryUpsertLockFile(expiredLockData, 
Option.of(this.getLock()));
     switch (result.getLeft()) {
       case UNKNOWN_ERROR:
         // Here we do not know the state of the lock.
         logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown.");
         
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
-        return false;
+        return ExpireLockResult.FAILED;
+      case THROTTLED:
+        logWarnLockState(FAILED_TO_RELEASE, "Lock expiration write was 
throttled.");
+        
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockThrottledMetric);
+        return ExpireLockResult.THROTTLED;
       case SUCCESS:
         logInfoLockState(RELEASED);
         recordAuditOperation(AuditOperationState.END, lockExpirationTimeMs);
         setLock(null);
-        return true;
+        return ExpireLockResult.SUCCESS;
       case ACQUIRED_BY_OTHERS:
         // Lock was acquired by others, indicating heartbeat failure during 
lock hold period.
         logErrorLockState(FAILED_TO_RELEASE, "lock was acquired by others, 
indicating heartbeat failure.");
         setLock(null);
         
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquiredByOthersErrorMetric);
-        return false;
+        return ExpireLockResult.FAILED;
       default:
         
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
         throw new HoodieLockException("Unexpected lock update result: " + 
result.getLeft());
@@ -551,6 +631,12 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
           
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
           // Let heartbeat retry later.
           return true;
+        case THROTTLED:
+          // Throttling is transient, let the heartbeat retry on its next 
cycle.
+          logger.warn("Owner {}: Unable to renew lock due to throttling, will 
retry on next heartbeat.", ownerId);
+          
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockThrottledMetric);
+          // Let heartbeat retry later.
+          return true;
         case SUCCESS:
           // Only positive outcome
           this.setLock(currentLock.getRight().get());
@@ -621,6 +707,11 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
     return System.currentTimeMillis();
   }
 
+  @VisibleForTesting
+  void sleepForThrottleRetry(long delaySeconds) throws InterruptedException {
+    TimeUnit.SECONDS.sleep(delaySeconds);
+  }
+
   /**
    * Calculates the lock expiration time based on given timestamp and validity 
period.
    * This is the shared function used by both the lock provider and audit 
service.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
index 988e0f354978..78187dfe7c72 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
@@ -49,6 +49,7 @@ public class HoodieLockMetrics {
   public static final String LOCK_EXPIRATION_DEADLINE_COUNTER_NAME = 
"lock.expiration.deadline";
   public static final String LOCK_DANGLING_COUNTER_NAME = "lock.dangling";
   public static final String LOCK_INTERRUPTED_COUNTER_NAME = 
"lock.interrupted";
+  public static final String LOCK_THROTTLED_COUNTER_NAME = "lock.throttled";
   private final HoodieWriteConfig writeConfig;
   private final boolean isMetricsEnabled;
   private final int keepLastNtimes = 100;
@@ -65,6 +66,7 @@ public class HoodieLockMetrics {
   private transient Counter lockReleaseFailure;
   private transient Counter lockDangling;
   private transient Counter lockInterrupted;
+  private transient Counter lockThrottled;
   private transient Timer lockDuration;
   private transient Timer lockApiRequestDuration;
   private static final Object REGISTRY_LOCK = new Object();
@@ -89,6 +91,7 @@ public class HoodieLockMetrics {
       lockReleaseFailure = 
registry.counter(getMetricsName(LOCK_RELEASE_FAILURE_COUNTER_NAME));
       lockDangling = 
registry.counter(getMetricsName(LOCK_DANGLING_COUNTER_NAME));
       lockInterrupted = 
registry.counter(getMetricsName(LOCK_INTERRUPTED_COUNTER_NAME));
+      lockThrottled = 
registry.counter(getMetricsName(LOCK_THROTTLED_COUNTER_NAME));
       lockDuration = createTimerForMetrics(registry, 
LOCK_ACQUIRE_DURATION_TIMER_NAME);
       lockApiRequestDuration = createTimerForMetrics(registry, 
LOCK_REQUEST_LATENCY_TIMER_NAME);
     }
@@ -202,4 +205,10 @@ public class HoodieLockMetrics {
       this.lockInterrupted.inc();
     }
   }
+
+  public void updateLockThrottledMetric() {
+    if (isMetricsEnabled) {
+      this.lockThrottled.inc();
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
index 27f06f59f58b..558a1bf11dd8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
@@ -29,7 +29,9 @@ public enum LockUpsertResult {
   // Another process has modified the lock file (precondition failure) with 
code 1
   ACQUIRED_BY_OTHERS(1),
   // Unable to determine lock state due to transient errors with code 2
-  UNKNOWN_ERROR(2);
+  UNKNOWN_ERROR(2),
+  // Request was throttled by the storage backend (e.g. HTTP 429) with code 3
+  THROTTLED(3);
 
   private final int code;
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestHoodieLockMetrics.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestHoodieLockMetrics.java
index f20a6dee404b..2712cdb9f5d0 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestHoodieLockMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestHoodieLockMetrics.java
@@ -341,6 +341,41 @@ public class TestHoodieLockMetrics {
         "Lock dangling counter should increment by 2");
   }
 
+  @Test
+  public void testLockThrottledMetric() {
+    HoodieStorage storage = mock(HoodieStorage.class);
+    HoodieMetricsConfig metricsConfig = 
HoodieMetricsConfig.newBuilder().withPath("/test")
+        
.withReporterType(MetricsReporterType.INMEMORY.name()).withLockingMetrics(true).build();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .forTable("testTable").withPath("/test/path")
+        .withMetricsConfig(metricsConfig)
+        .build();
+    HoodieLockMetrics lockMetrics = new HoodieLockMetrics(writeConfig, 
storage);
+
+    // Get the metrics registry to verify counter values
+    Metrics metrics = Metrics.getInstance(metricsConfig, storage);
+    MetricRegistry registry = metrics.getRegistry();
+    String metricName = writeConfig.getMetricReporterMetricsNamePrefix() + "." 
+ HoodieLockMetrics.LOCK_THROTTLED_COUNTER_NAME;
+
+    // Test that the throttled metric can be called
+    assertDoesNotThrow(lockMetrics::updateLockThrottledMetric,
+        "updateLockThrottledMetric should not throw");
+
+    // Verify the counter exists and increments
+    Counter throttledCounter = registry.getCounters().get(metricName);
+    assertNotNull(throttledCounter, "Lock throttled counter should exist");
+
+    long initialCount = throttledCounter.getCount();
+
+    // Call the metric multiple times
+    lockMetrics.updateLockThrottledMetric();
+    lockMetrics.updateLockThrottledMetric();
+
+    // Verify the counter incremented
+    assertEquals(initialCount + 2, throttledCounter.getCount(),
+        "Lock throttled counter should increment by 2");
+  }
+
   @Test
   public void testNewMetricsWithDisabledLocking() {
     HoodieStorage storage = mock(HoodieStorage.class);
@@ -359,6 +394,8 @@ public class TestHoodieLockMetrics {
         "updateLockExpirationDeadlineMetric should not throw when locking 
metrics disabled");
     assertDoesNotThrow(lockMetrics::updateLockDanglingMetric,
         "updateLockDanglingMetric should not throw when locking metrics 
disabled");
+    assertDoesNotThrow(lockMetrics::updateLockThrottledMetric,
+        "updateLockThrottledMetric should not throw when locking metrics 
disabled");
   }
 
   @Test
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
index f2f967a08237..11ea99df36aa 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
 
 import java.lang.reflect.Method;
@@ -64,6 +65,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.refEq;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -411,6 +413,105 @@ class TestStorageBasedLockProvider {
     when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
   }
 
+  @Test
+  void testUnlockSucceedsAfterThrottledRetry() throws InterruptedException {
+    // Simulates the GCP 1-write/sec limit: the first expire attempt gets 
THROTTLED,
+    // the retry (after sleeping outside the monitor) succeeds.
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    assertTrue(lockProvider.tryLock());
+
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+    
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+    // Skip the real sleep so the test runs instantly.
+    doNothing().when(lockProvider).sleepForThrottleRetry(anyLong());
+    StorageLockFile expiredLockFile = new StorageLockFile(new 
StorageLockData(true, data.getValidUntil(), ownerId), "v2");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(expiredLockFile)));
+
+    lockProvider.unlock();
+
+    assertNull(lockProvider.getLock(), "Lock should be null after successful 
unlock on retry");
+    verify(mockLockService, times(2)).tryUpsertLockFile(any(), 
eq(Option.of(realLockFile)));
+  }
+
+  @Test
+  void testUnlockBailsOutWhenLockReplacedDuringRetrySleep() throws 
InterruptedException {
+    // Race: the first expire attempt returns THROTTLED. While unlock() is 
sleeping outside
+    // the monitor, the original lock expires and a concurrent tryLock() 
acquires a fresh
+    // lock (currentLockObj is replaced from lock1 -> lock2). The retry MUST 
detect this
+    // and bail out — operating on lock2 would mark a lock that another caller 
now holds
+    // as expired.
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lock1 = new StorageLockFile(data, "v1");
+    StorageLockFile lock2 = new StorageLockFile(data, "v2");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lock1)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    assertTrue(lockProvider.tryLock());
+
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+    
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+    // Skip the real sleep so the test runs instantly.
+    doNothing().when(lockProvider).sleepForThrottleRetry(anyLong());
+    // Simulate the race in the same answer that returns THROTTLED: by the 
time the sleep
+    // ends, getLock() returns lock2 (the lock a concurrent tryLock() would 
have acquired).
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lock1))))
+        .thenAnswer(inv -> {
+          doReturn(lock2).when(lockProvider).getLock();
+          return Pair.of(LockUpsertResult.THROTTLED, Option.empty());
+        });
+
+    // unlock() should return normally — the original lock is no longer the 
caller's
+    // responsibility, and we must not touch lock2.
+    lockProvider.unlock();
+
+    verify(mockLockService, times(1)).tryUpsertLockFile(any(), 
eq(Option.of(lock1)));
+    verify(mockLockService, never()).tryUpsertLockFile(any(), 
eq(Option.of(lock2)));
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+  }
+
+  @Test
+  void testUnlockThrowsExceptionWhenStillThrottledAfterAllRetries() throws 
InterruptedException {
+    // Every attempt (initial + THROTTLE_MAX_RETRIES retries) gets THROTTLED — 
unlock should
+    // exhaust the retry budget, throw FAILED_TO_RELEASE, and the backoff 
sequence should
+    // follow the exponential schedule 1s, 2s, 4s.
+    
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+    assertTrue(lockProvider.tryLock());
+
+    when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+    
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+    // Capture sleep durations to verify the backoff sequence; skip the real 
sleep so the
+    // test runs instantly.
+    ArgumentCaptor<Long> sleepCaptor = ArgumentCaptor.forClass(Long.class);
+    
doNothing().when(lockProvider).sleepForThrottleRetry(sleepCaptor.capture());
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()));
+
+    HoodieLockException exception = assertThrows(HoodieLockException.class, () 
-> lockProvider.unlock());
+    assertTrue(exception.getMessage().contains("FAILED_TO_RELEASE"));
+    // 1 initial attempt + THROTTLE_MAX_RETRIES retries.
+    verify(mockLockService, times(1 + 
StorageBasedLockProvider.THROTTLE_MAX_RETRIES))
+        .tryUpsertLockFile(any(), eq(Option.of(realLockFile)));
+    // Backoff doubles each retry: 1s, 2s, 4s.
+    assertEquals(StorageBasedLockProvider.THROTTLE_MAX_RETRIES, 
sleepCaptor.getAllValues().size());
+    assertEquals(1L, sleepCaptor.getAllValues().get(0));
+    assertEquals(2L, sleepCaptor.getAllValues().get(1));
+    assertEquals(4L, sleepCaptor.getAllValues().get(2));
+    when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+  }
+
   @Test
   void testCloseFailsToStopHeartbeat() {
     
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
 Option.empty()));
@@ -466,6 +567,18 @@ class TestStorageBasedLockProvider {
     assertTrue(lockProvider.renewLock());
   }
 
+  @Test
+  void testRenewLockThrottledReturnsTrue() {
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    doReturn(lockFile).when(lockProvider).getLock();
+    when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+        .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()));
+    // Throttling is transient, renewLock should return true so the heartbeat 
retries later.
+    assertTrue(lockProvider.renewLock());
+    verify(mockLogger).warn("Owner {}: Unable to renew lock due to throttling, 
will retry on next heartbeat.", this.ownerId);
+  }
+
   @Test
   void testRenewLockUnableToUpsertLockFileFatal() {
     StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
index 123b76bb6926..503d2f07c889 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
@@ -150,6 +150,7 @@ public class GCSStorageLockClient implements 
StorageLockClient {
         return Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty());
       } else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
         logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", 
ownerId, lockFilePath);
+        return Pair.of(LockUpsertResult.THROTTLED, Option.empty());
       } else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
         logger.warn("OwnerId: {}, GCS returned internal server error code for 
lock file: {}",
             ownerId, lockFilePath, e);
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
index 9f57077bc1c8..2033d8bc5073 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
@@ -49,6 +49,7 @@ import java.nio.ByteBuffer;
 import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -187,8 +188,8 @@ public class TestGCSStorageLockClient {
 
     Pair<LockUpsertResult, Option<StorageLockFile>> result = 
lockService.tryUpsertLockFile(lockData, Option.empty());
 
-    assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
-    assertTrue(result.getRight().isEmpty(), "Should return empty when a 429 
occurs");
+    assertEquals(LockUpsertResult.THROTTLED, result.getLeft());
+    assertFalse(result.getRight().isPresent(), "Should return empty when a 429 
occurs");
     verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), 
eq(LOCK_FILE_PATH));
   }
 

Reply via email to