This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 12e87b80afe8 fix: Fix dangling lock from storage based lock (#18439)
12e87b80afe8 is described below
commit 12e87b80afe83a3a01df7b063352c78da8f765fc
Author: Lin Liu <[email protected]>
AuthorDate: Wed May 20 00:14:32 2026 -0700
fix: Fix dangling lock from storage based lock (#18439)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../aws/transaction/lock/S3StorageLockClient.java | 1 +
.../transaction/lock/TestS3StorageLockClient.java | 6 +-
.../transaction/lock/AzureStorageLockClient.java | 5 +-
.../lock/TestAzureStorageLockClient.java | 4 +-
.../transaction/lock/StorageBasedLockProvider.java | 139 +++++++++++++++++----
.../lock/metrics/HoodieLockMetrics.java | 9 ++
.../transaction/lock/models/LockUpsertResult.java | 4 +-
.../client/transaction/TestHoodieLockMetrics.java | 37 ++++++
.../lock/TestStorageBasedLockProvider.java | 113 +++++++++++++++++
.../gcp/transaction/lock/GCSStorageLockClient.java | 1 +
.../transaction/lock/TestGCSStorageLockClient.java | 5 +-
11 files changed, 291 insertions(+), 33 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
index 10af6c30d2f7..290a38f8661a 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
@@ -193,6 +193,7 @@ public class S3StorageLockClient implements
StorageLockClient {
logger.info("OwnerId: {}, Retriable conditional request conflict error:
{}", ownerId, lockFilePath);
} else if (status == RATE_LIMIT_ERROR_CODE) {
logger.warn("OwnerId: {}, Rate limit exceeded for: {}", ownerId,
lockFilePath);
+ return LockUpsertResult.THROTTLED;
} else if (status >= INTERNAL_SERVER_ERROR_CODE_MIN) {
logger.warn("OwnerId: {}, internal server error for: {}", ownerId,
lockFilePath, e);
} else {
diff --git
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
index 0358cfa281ae..487268a6b4a2 100644
---
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
+++
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
@@ -49,8 +49,10 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Properties;
+import static
org.apache.hudi.client.transaction.lock.models.LockUpsertResult.THROTTLED;
import static
org.apache.hudi.client.transaction.lock.models.LockUpsertResult.UNKNOWN_ERROR;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -222,8 +224,8 @@ class TestS3StorageLockClient {
Pair<LockUpsertResult, Option<StorageLockFile>> result =
lockService.tryUpsertLockFile(lockData, Option.empty());
- assertEquals(UNKNOWN_ERROR, result.getLeft());
- assertTrue(result.getRight().isEmpty());
+ assertEquals(THROTTLED, result.getLeft());
+ assertFalse(result.getRight().isPresent());
verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID),
eq(LOCK_FILE_PATH));
}
diff --git
a/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
b/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
index d344cfc341bd..d62a0f018430 100644
---
a/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
+++
b/hudi-azure/src/main/java/org/apache/hudi/azure/transaction/lock/AzureStorageLockClient.java
@@ -317,10 +317,11 @@ public class AzureStorageLockClient implements
StorageLockClient {
logger.info("OwnerId: {}, Unable to write new lock file. Another process
has modified this lockfile {} already.",
ownerId, lockFileUri);
return LockUpsertResult.ACQUIRED_BY_OTHERS;
- } else if (code == CONFLICT_ERROR_CODE) {
- logger.info("OwnerId: {}, Retriable conditional request conflict error:
{}", ownerId, lockFileUri);
} else if (code == RATE_LIMIT_ERROR_CODE) {
logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}",
ownerId, lockFileUri);
+ return LockUpsertResult.THROTTLED;
+ } else if (code == CONFLICT_ERROR_CODE) {
+ logger.info("OwnerId: {}, Retriable conditional request conflict error:
{}", ownerId, lockFileUri);
} else if (code >= INTERNAL_SERVER_ERROR_CODE_MIN) {
logger.warn("OwnerId: {}, Azure returned internal server error code for
lock file: {}", ownerId, lockFileUri, e);
} else {
diff --git
a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java
b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java
index d4a1ff822d7c..da6eb7446e07 100644
---
a/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java
+++
b/hudi-azure/src/test/java/org/apache/hudi/azure/transaction/lock/TestAzureStorageLockClient.java
@@ -196,7 +196,7 @@ public class TestAzureStorageLockClient {
}
@Test
- void testTryUpsertLockFile_rateLimit_returnsUnknownError() {
+ void testTryUpsertLockFile_rateLimit_returnsThrottled() {
StorageLockData lockData = new StorageLockData(false, 999L, "owner");
BlobStorageException ex = mock(BlobStorageException.class);
when(ex.getStatusCode()).thenReturn(429);
@@ -204,7 +204,7 @@ public class TestAzureStorageLockClient {
Pair<LockUpsertResult, Option<StorageLockFile>> result =
lockClient.tryUpsertLockFile(lockData, Option.empty());
- assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
+ assertEquals(LockUpsertResult.THROTTLED, result.getLeft());
assertTrue(result.getRight().isEmpty());
verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID),
eq(LOCK_FILE_URI));
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
index 69ed1c6f572e..2ba31e36897d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -86,6 +86,16 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
// However, since our lock leases are pretty long, we can use a high buffer.
private static final long CLOCK_DRIFT_BUFFER_MS = 500;
+ // Max number of retry attempts on the lock-expire write after a THROTTLED
response.
+ @VisibleForTesting
+ static final int THROTTLE_MAX_RETRIES = 3;
+
+ // Initial backoff delay; doubles on each subsequent retry (e.g. 1s, 2s, 4s
for 3 retries).
+ // The base of 1s is tuned to outlast typical cloud-storage rate-limit
windows
+ // (e.g., GCS's 1-write/sec per object limit) so the first retry has a
reasonable chance of succeeding.
+ @VisibleForTesting
+ static final long THROTTLE_INITIAL_RETRY_DELAY_SECONDS = 1;
+
// Use for testing
private final Logger logger;
@@ -346,9 +356,24 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
newLockData,
latestLock.getRight());
if (lockUpdateStatus.getLeft() != LockUpsertResult.SUCCESS) {
- // failed to acquire the lock, indicates concurrent contention
logInfoLockState(FAILED_TO_ACQUIRE);
-
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric);
+ switch (lockUpdateStatus.getLeft()) {
+ case ACQUIRED_BY_OTHERS:
+ // failed to acquire the lock, indicates concurrent contention
+
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquirePreconditionFailureMetric);
+ break;
+ case THROTTLED:
+ // The write was rejected; we did not acquire. Transient — caller
may retry.
+
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockThrottledMetric);
+ break;
+ case UNKNOWN_ERROR:
+ // Lock state is unknown after the upsert attempt; surface it as
such.
+
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
+ break;
+ default:
+
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockProviderFatalErrorMetric);
+ break;
+ }
return false;
}
this.setLock(lockUpdateStatus.getRight().get());
@@ -424,22 +449,62 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
* Unlock by marking our current lock file "expired": true.
*/
@Override
- public synchronized void unlock() {
- assertHeartbeatManagerExists();
- if (!believesLockMightBeHeld()) {
- return;
+ public void unlock() {
+ ExpireLockResult expireResult;
+ // Snapshot the exact StorageLockFile we intend to expire. Identity
comparison in the retry
+ // path guards against a concurrent tryLock() that, if our original lock
had expired during
+ // a retry backoff sleep, could have installed a fresh lock that the retry must NOT
mark expired.
+ StorageLockFile lockToExpire;
+ synchronized (this) {
+ assertHeartbeatManagerExists();
+ if (!believesLockMightBeHeld()) {
+ return;
+ }
+
+ // Try to stop the heartbeat first
+ if (heartbeatManager.hasActiveHeartbeat()) {
+ logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
+ if (!heartbeatManager.stopHeartbeat(true)) {
+
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
+ throw new
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+ }
+ }
+
+ // Then expire the current lock.
+ lockToExpire = getLock();
+ expireResult = tryExpireCurrentLock(false);
}
- boolean believesNoLongerHoldsLock = true;
- // Try to stop the heartbeat first
- if (heartbeatManager.hasActiveHeartbeat()) {
- logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId);
- believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true);
+ // If throttled, retry up to THROTTLE_MAX_RETRIES times with exponential
backoff. Each sleep
+ // happens outside the monitor so other threads aren't blocked during the
wait.
+ // Note: when unlock() is called via close() -> shutdown(), the outer
synchronized caller still
+ // holds the provider monitor through reentrant locking, so other threads
remain blocked in
+ // that scenario. This is acceptable since close() is a shutdown path, not
the hot path.
+ for (int attempt = 1; attempt <= THROTTLE_MAX_RETRIES && expireResult ==
ExpireLockResult.THROTTLED; attempt++) {
+ long delaySeconds = THROTTLE_INITIAL_RETRY_DELAY_SECONDS << (attempt -
1);
+ logger.warn("Owner {}: Lock expiration was throttled (retry {}/{}),
backing off for {} seconds.",
+ ownerId, attempt, THROTTLE_MAX_RETRIES, delaySeconds);
+ try {
+ sleepForThrottleRetry(delaySeconds);
+ } catch (InterruptedException ie) {
+ // Re-set the interrupt flag and abandon the retry — an interrupted
thread shouldn't keep
+ // doing work. The caller will see FAILED_TO_RELEASE below.
+ Thread.currentThread().interrupt();
+
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
+ throw new
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
+ }
+ synchronized (this) {
+ // Bail out if the lock was either cleared by another path (e.g.
shutdown hook,
+ // concurrent unlock) OR replaced by a concurrent tryLock(): operating
on a fresh
+ // lock would mark a lock that another caller now holds as expired.
+ if (!believesLockMightBeHeld() || getLock() != lockToExpire) {
+ return;
+ }
+ expireResult = tryExpireCurrentLock(false);
+ }
}
- // Then expire the current lock.
- believesNoLongerHoldsLock &= tryExpireCurrentLock(false);
- if (!believesNoLongerHoldsLock) {
+ if (expireResult != ExpireLockResult.SUCCESS) {
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
throw new
HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE));
}
@@ -461,15 +526,26 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
}
/**
- * Tries to expire the currently held lock.
+ * Result of a single attempt to expire the currently held lock.
+ */
+ @VisibleForTesting
+ enum ExpireLockResult {
+ SUCCESS,
+ THROTTLED,
+ FAILED
+ }
+
+ /**
+ * Tries to expire the currently held lock. This is a single-attempt
primitive;
+ * callers are responsible for retry policy.
*
* @param fromShutdownHook Whether we are attempting best effort quick
unlock from shutdown hook.
- * @return True if we were successfully able to upload an expired lock.
+ * @return The result of the expire attempt.
*/
- private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) {
+ @VisibleForTesting
+ synchronized ExpireLockResult tryExpireCurrentLock(boolean fromShutdownHook)
{
// It does not make sense to have heartbeat alive extending the lock lease
while
- // here we are trying
- // to expire the lock.
+ // here we are trying to expire the lock.
if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) {
// broken function precondition.
throw new HoodieLockException("Must stop heartbeat before expire lock
file");
@@ -477,26 +553,30 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
logDebugLockState(RELEASING);
// Upload metadata that will unlock this lock.
StorageLockData expiredLockData = new StorageLockData(true,
this.getLock().getValidUntilMs(), ownerId);
- Pair<LockUpsertResult, Option<StorageLockFile>> result;
long lockExpirationTimeMs = System.currentTimeMillis();
- result = this.storageLockClient.tryUpsertLockFile(expiredLockData,
Option.of(this.getLock()));
+ Pair<LockUpsertResult, Option<StorageLockFile>> result =
+ this.storageLockClient.tryUpsertLockFile(expiredLockData,
Option.of(this.getLock()));
switch (result.getLeft()) {
case UNKNOWN_ERROR:
// Here we do not know the state of the lock.
logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown.");
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
- return false;
+ return ExpireLockResult.FAILED;
+ case THROTTLED:
+ logWarnLockState(FAILED_TO_RELEASE, "Lock expiration write was
throttled.");
+
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockThrottledMetric);
+ return ExpireLockResult.THROTTLED;
case SUCCESS:
logInfoLockState(RELEASED);
recordAuditOperation(AuditOperationState.END, lockExpirationTimeMs);
setLock(null);
- return true;
+ return ExpireLockResult.SUCCESS;
case ACQUIRED_BY_OTHERS:
// Lock was acquired by others, indicating heartbeat failure during
lock hold period.
logErrorLockState(FAILED_TO_RELEASE, "lock was acquired by others,
indicating heartbeat failure.");
setLock(null);
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockAcquiredByOthersErrorMetric);
- return false;
+ return ExpireLockResult.FAILED;
default:
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockReleaseFailureMetric);
throw new HoodieLockException("Unexpected lock update result: " +
result.getLeft());
@@ -551,6 +631,12 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockStateUnknownMetric);
// Let heartbeat retry later.
return true;
+ case THROTTLED:
+ // Throttling is transient, let the heartbeat retry on its next
cycle.
+ logger.warn("Owner {}: Unable to renew lock due to throttling, will
retry on next heartbeat.", ownerId);
+
hoodieLockMetrics.ifPresent(HoodieLockMetrics::updateLockThrottledMetric);
+ // Let heartbeat retry later.
+ return true;
case SUCCESS:
// Only positive outcome
this.setLock(currentLock.getRight().get());
@@ -621,6 +707,11 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
return System.currentTimeMillis();
}
+ @VisibleForTesting
+ void sleepForThrottleRetry(long delaySeconds) throws InterruptedException {
+ TimeUnit.SECONDS.sleep(delaySeconds);
+ }
+
/**
* Calculates the lock expiration time based on given timestamp and validity
period.
* This is the shared function used by both the lock provider and audit
service.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
index 988e0f354978..78187dfe7c72 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
@@ -49,6 +49,7 @@ public class HoodieLockMetrics {
public static final String LOCK_EXPIRATION_DEADLINE_COUNTER_NAME =
"lock.expiration.deadline";
public static final String LOCK_DANGLING_COUNTER_NAME = "lock.dangling";
public static final String LOCK_INTERRUPTED_COUNTER_NAME =
"lock.interrupted";
+ public static final String LOCK_THROTTLED_COUNTER_NAME = "lock.throttled";
private final HoodieWriteConfig writeConfig;
private final boolean isMetricsEnabled;
private final int keepLastNtimes = 100;
@@ -65,6 +66,7 @@ public class HoodieLockMetrics {
private transient Counter lockReleaseFailure;
private transient Counter lockDangling;
private transient Counter lockInterrupted;
+ private transient Counter lockThrottled;
private transient Timer lockDuration;
private transient Timer lockApiRequestDuration;
private static final Object REGISTRY_LOCK = new Object();
@@ -89,6 +91,7 @@ public class HoodieLockMetrics {
lockReleaseFailure =
registry.counter(getMetricsName(LOCK_RELEASE_FAILURE_COUNTER_NAME));
lockDangling =
registry.counter(getMetricsName(LOCK_DANGLING_COUNTER_NAME));
lockInterrupted =
registry.counter(getMetricsName(LOCK_INTERRUPTED_COUNTER_NAME));
+ lockThrottled =
registry.counter(getMetricsName(LOCK_THROTTLED_COUNTER_NAME));
lockDuration = createTimerForMetrics(registry,
LOCK_ACQUIRE_DURATION_TIMER_NAME);
lockApiRequestDuration = createTimerForMetrics(registry,
LOCK_REQUEST_LATENCY_TIMER_NAME);
}
@@ -202,4 +205,10 @@ public class HoodieLockMetrics {
this.lockInterrupted.inc();
}
}
+
+ public void updateLockThrottledMetric() {
+ if (isMetricsEnabled) {
+ this.lockThrottled.inc();
+ }
+ }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
index 27f06f59f58b..558a1bf11dd8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
@@ -29,7 +29,9 @@ public enum LockUpsertResult {
// Another process has modified the lock file (precondition failure) with
code 1
ACQUIRED_BY_OTHERS(1),
// Unable to determine lock state due to transient errors with code 2
- UNKNOWN_ERROR(2);
+ UNKNOWN_ERROR(2),
+ // Request was throttled by the storage backend (e.g. HTTP 429) with code 3
+ THROTTLED(3);
private final int code;
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestHoodieLockMetrics.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestHoodieLockMetrics.java
index f20a6dee404b..2712cdb9f5d0 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestHoodieLockMetrics.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestHoodieLockMetrics.java
@@ -341,6 +341,41 @@ public class TestHoodieLockMetrics {
"Lock dangling counter should increment by 2");
}
+ @Test
+ public void testLockThrottledMetric() {
+ HoodieStorage storage = mock(HoodieStorage.class);
+ HoodieMetricsConfig metricsConfig =
HoodieMetricsConfig.newBuilder().withPath("/test")
+
.withReporterType(MetricsReporterType.INMEMORY.name()).withLockingMetrics(true).build();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .forTable("testTable").withPath("/test/path")
+ .withMetricsConfig(metricsConfig)
+ .build();
+ HoodieLockMetrics lockMetrics = new HoodieLockMetrics(writeConfig,
storage);
+
+ // Get the metrics registry to verify counter values
+ Metrics metrics = Metrics.getInstance(metricsConfig, storage);
+ MetricRegistry registry = metrics.getRegistry();
+ String metricName = writeConfig.getMetricReporterMetricsNamePrefix() + "."
+ HoodieLockMetrics.LOCK_THROTTLED_COUNTER_NAME;
+
+ // Test that the throttled metric can be called
+ assertDoesNotThrow(lockMetrics::updateLockThrottledMetric,
+ "updateLockThrottledMetric should not throw");
+
+ // Verify the counter exists and increments
+ Counter throttledCounter = registry.getCounters().get(metricName);
+ assertNotNull(throttledCounter, "Lock throttled counter should exist");
+
+ long initialCount = throttledCounter.getCount();
+
+ // Call the metric multiple times
+ lockMetrics.updateLockThrottledMetric();
+ lockMetrics.updateLockThrottledMetric();
+
+ // Verify the counter incremented
+ assertEquals(initialCount + 2, throttledCounter.getCount(),
+ "Lock throttled counter should increment by 2");
+ }
+
@Test
public void testNewMetricsWithDisabledLocking() {
HoodieStorage storage = mock(HoodieStorage.class);
@@ -359,6 +394,8 @@ public class TestHoodieLockMetrics {
"updateLockExpirationDeadlineMetric should not throw when locking
metrics disabled");
assertDoesNotThrow(lockMetrics::updateLockDanglingMetric,
"updateLockDanglingMetric should not throw when locking metrics
disabled");
+ assertDoesNotThrow(lockMetrics::updateLockThrottledMetric,
+ "updateLockThrottledMetric should not throw when locking metrics
disabled");
}
@Test
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
index f2f967a08237..11ea99df36aa 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import java.lang.reflect.Method;
@@ -64,6 +65,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -411,6 +413,105 @@ class TestStorageBasedLockProvider {
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
}
+ @Test
+ void testUnlockSucceedsAfterThrottledRetry() throws InterruptedException {
+ // Simulates the GCP 1-write/sec limit: the first expire attempt gets
THROTTLED,
+ // the retry (after sleeping outside the monitor) succeeds.
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+ // Skip the real sleep so the test runs instantly.
+ doNothing().when(lockProvider).sleepForThrottleRetry(anyLong());
+ StorageLockFile expiredLockFile = new StorageLockFile(new
StorageLockData(true, data.getValidUntil(), ownerId), "v2");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(expiredLockFile)));
+
+ lockProvider.unlock();
+
+ assertNull(lockProvider.getLock(), "Lock should be null after successful
unlock on retry");
+ verify(mockLockService, times(2)).tryUpsertLockFile(any(),
eq(Option.of(realLockFile)));
+ }
+
+ @Test
+ void testUnlockBailsOutWhenLockReplacedDuringRetrySleep() throws
InterruptedException {
+ // Race: the first expire attempt returns THROTTLED. While unlock() is
sleeping outside
+ // the monitor, the original lock expires and a concurrent tryLock()
acquires a fresh
+ // lock (currentLockObj is replaced from lock1 -> lock2). The retry MUST
detect this
+ // and bail out — operating on lock2 would mark a lock that another caller
now holds
+ // as expired.
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lock1 = new StorageLockFile(data, "v1");
+ StorageLockFile lock2 = new StorageLockFile(data, "v2");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lock1)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+ // Skip the real sleep so the test runs instantly.
+ doNothing().when(lockProvider).sleepForThrottleRetry(anyLong());
+ // Simulate the race in the same answer that returns THROTTLED: by the
time the sleep
+ // ends, getLock() returns lock2 (the lock a concurrent tryLock() would
have acquired).
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lock1))))
+ .thenAnswer(inv -> {
+ doReturn(lock2).when(lockProvider).getLock();
+ return Pair.of(LockUpsertResult.THROTTLED, Option.empty());
+ });
+
+ // unlock() should return normally — the original lock is no longer the
caller's
+ // responsibility, and we must not touch lock2.
+ lockProvider.unlock();
+
+ verify(mockLockService, times(1)).tryUpsertLockFile(any(),
eq(Option.of(lock1)));
+ verify(mockLockService, never()).tryUpsertLockFile(any(),
eq(Option.of(lock2)));
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ }
+
+ @Test
+ void testUnlockThrowsExceptionWhenStillThrottledAfterAllRetries() throws
InterruptedException {
+ // Every attempt (initial + THROTTLE_MAX_RETRIES retries) gets THROTTLED —
unlock should
+ // exhaust the retry budget, throw FAILED_TO_RELEASE, and the backoff
sequence should
+ // follow the exponential schedule 1s, 2s, 4s.
+
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile realLockFile = new StorageLockFile(data, "v1");
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+ assertTrue(lockProvider.tryLock());
+
+ when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+
when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true).thenReturn(false);
+ // Capture sleep durations to verify the backoff sequence; skip the real
sleep so the
+ // test runs instantly.
+ ArgumentCaptor<Long> sleepCaptor = ArgumentCaptor.forClass(Long.class);
+
doNothing().when(lockProvider).sleepForThrottleRetry(sleepCaptor.capture());
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()));
+
+ HoodieLockException exception = assertThrows(HoodieLockException.class, ()
-> lockProvider.unlock());
+ assertTrue(exception.getMessage().contains("FAILED_TO_RELEASE"));
+ // 1 initial attempt + THROTTLE_MAX_RETRIES retries.
+ verify(mockLockService, times(1 +
StorageBasedLockProvider.THROTTLE_MAX_RETRIES))
+ .tryUpsertLockFile(any(), eq(Option.of(realLockFile)));
+ // Backoff doubles each retry: 1s, 2s, 4s.
+ assertEquals(StorageBasedLockProvider.THROTTLE_MAX_RETRIES,
sleepCaptor.getAllValues().size());
+ assertEquals(1L, sleepCaptor.getAllValues().get(0));
+ assertEquals(2L, sleepCaptor.getAllValues().get(1));
+ assertEquals(4L, sleepCaptor.getAllValues().get(2));
+ when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false);
+ }
+
@Test
void testCloseFailsToStopHeartbeat() {
when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS,
Option.empty()));
@@ -466,6 +567,18 @@ class TestStorageBasedLockProvider {
assertTrue(lockProvider.renewLock());
}
+ @Test
+ void testRenewLockThrottledReturnsTrue() {
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ doReturn(lockFile).when(lockProvider).getLock();
+ when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile))))
+ .thenReturn(Pair.of(LockUpsertResult.THROTTLED, Option.empty()));
+ // Throttling is transient, renewLock should return true so the heartbeat
retries later.
+ assertTrue(lockProvider.renewLock());
+ verify(mockLogger).warn("Owner {}: Unable to renew lock due to throttling,
will retry on next heartbeat.", this.ownerId);
+ }
+
@Test
void testRenewLockUnableToUpsertLockFileFatal() {
StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
index 123b76bb6926..503d2f07c889 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
@@ -150,6 +150,7 @@ public class GCSStorageLockClient implements
StorageLockClient {
return Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty());
} else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}",
ownerId, lockFilePath);
+ return Pair.of(LockUpsertResult.THROTTLED, Option.empty());
} else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
logger.warn("OwnerId: {}, GCS returned internal server error code for
lock file: {}",
ownerId, lockFilePath, e);
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
index 9f57077bc1c8..2033d8bc5073 100644
---
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
@@ -49,6 +49,7 @@ import java.nio.ByteBuffer;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -187,8 +188,8 @@ public class TestGCSStorageLockClient {
Pair<LockUpsertResult, Option<StorageLockFile>> result =
lockService.tryUpsertLockFile(lockData, Option.empty());
- assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
- assertTrue(result.getRight().isEmpty(), "Should return empty when a 429
occurs");
+ assertEquals(LockUpsertResult.THROTTLED, result.getLeft());
+ assertFalse(result.getRight().isPresent(), "Should return empty when a 429
occurs");
verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID),
eq(LOCK_FILE_PATH));
}