This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 1e73c91e3686 [HUDI-9160] Add GCS implementation of
StorageBasedLockProvider (#13715)
1e73c91e3686 is described below
commit 1e73c91e368647e2223283b37101e8dbe306e2b6
Author: Alex R <[email protected]>
AuthorDate: Tue Aug 12 22:37:33 2025 -0700
[HUDI-9160] Add GCS implementation of StorageBasedLockProvider (#13715)
---
hudi-gcp/pom.xml | 17 ++
.../gcp/transaction/lock/GCSStorageLockClient.java | 238 ++++++++++++++++
.../transaction/lock/TestGCSStorageLockClient.java | 304 +++++++++++++++++++++
3 files changed, 559 insertions(+)
diff --git a/hudi-gcp/pom.xml b/hudi-gcp/pom.xml
index 0d70ee2c9e8b..c61de180cea6 100644
--- a/hudi-gcp/pom.xml
+++ b/hudi-gcp/pom.xml
@@ -61,6 +61,11 @@ See
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
<artifactId>hudi-sync-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-client-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.cloud</groupId>
@@ -77,6 +82,10 @@ See
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
<artifactId>gcs-connector</artifactId>
<version>${gcs.connector.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-storage</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -95,6 +104,14 @@ See
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
<artifactId>hadoop-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-client-common</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-tests-common</artifactId>
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
new file mode 100644
index 000000000000..e9b5e8c82aab
--- /dev/null
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.gcp.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.Channels;
+import java.util.Properties;
+
+/**
+ * Google Cloud Storage (GCS) implementation of {@link StorageLockClient}.
+ *
+ * <p>The lock is a single object located at {@code gs://<bucket>/<lockFilePath>}. Every write is
+ * conditional: it carries a {@code generationMatch} precondition so that only one writer can
+ * succeed for a given generation of the lock file. Reads return the lock payload together with
+ * the blob generation, which callers pass back on the next upsert.
+ *
+ * <p>Error mapping: HTTP 412 on write is reported as {@code ACQUIRED_BY_OTHERS}, HTTP 404 on read
+ * as {@code NOT_EXISTS}, HTTP 429 and 5xx as {@code UNKNOWN_ERROR}; any other
+ * {@link StorageException} is rethrown to the caller.
+ */
+@ThreadSafe
+public class GCSStorageLockClient implements StorageLockClient {
+  private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(GCSStorageLockClient.class);
+  private static final long PRECONDITION_FAILURE_ERROR_CODE = 412;
+  private static final long NOT_FOUND_ERROR_CODE = 404;
+  private static final long RATE_LIMIT_ERROR_CODE = 429;
+  private static final long INTERNAL_SERVER_ERROR_CODE_MIN = 500;
+  private final Logger logger;
+  private final Storage gcsClient;
+  private final String bucketName;
+  private final String lockFilePath;
+  private final String ownerId;
+
+  /**
+   * Constructor that is used by reflection to instantiate a GCS-based locking service.
+   *
+   * @param ownerId     The owner id.
+   * @param lockFileUri The URI ({@code gs://bucket/path}) of the lock file.
+   * @param props       The properties for the lock config, can be used to customize client.
+   * @throws HoodieLockException      if {@code lockFileUri} is not a syntactically valid URI.
+   * @throws IllegalArgumentException if {@code lockFileUri} lacks a bucket name or an object path.
+   */
+  public GCSStorageLockClient(
+      String ownerId,
+      String lockFileUri,
+      Properties props) {
+    this(ownerId, lockFileUri, props, createDefaultGcsClient(), DEFAULT_LOGGER);
+  }
+
+  /**
+   * Constructor that allows injecting the GCS client factory and the logger.
+   *
+   * @param ownerId           The owner id.
+   * @param lockFileUri       The URI ({@code gs://bucket/path}) of the lock file.
+   * @param properties        The properties handed to {@code gcsClientSupplier}.
+   * @param gcsClientSupplier Factory producing the {@link Storage} client.
+   * @param logger            Logger used for all messages of this instance.
+   * @throws HoodieLockException      if {@code lockFileUri} is not a syntactically valid URI.
+   * @throws IllegalArgumentException if {@code lockFileUri} lacks a bucket name or an object path.
+   */
+  @VisibleForTesting
+  GCSStorageLockClient(
+      String ownerId,
+      String lockFileUri,
+      Properties properties,
+      Functions.Function1<Properties, Storage> gcsClientSupplier,
+      Logger logger) {
+    // This logic can likely be extended to other lock client implementations.
+    // Consider creating base class with utilities, incl error handling.
+    URI uri;
+    try {
+      uri = new URI(lockFileUri);
+    } catch (URISyntaxException e) {
+      throw new HoodieLockException(e);
+    }
+
+    // URI#getHost() is null when the authority is not a valid hostname, e.g. a bucket name
+    // containing an underscore (which GCS permits). Fall back to the raw authority then.
+    String bucket = uri.getHost() != null ? uri.getHost() : uri.getAuthority();
+    if (StringUtils.isNullOrEmpty(bucket)) {
+      throw new IllegalArgumentException("LockFileUri does not contain a valid bucket name.");
+    }
+
+    // URI#getPath() is null for opaque URIs; treat that the same as an empty path.
+    String rawPath = uri.getPath();
+    String path = rawPath == null ? null : rawPath.replaceFirst("/", "");
+    if (StringUtils.isNullOrEmpty(path)) {
+      throw new IllegalArgumentException("LockFileUri does not contain a valid lock file path.");
+    }
+
+    this.bucketName = bucket;
+    this.lockFilePath = path;
+    this.ownerId = ownerId;
+    this.logger = logger;
+    // Create the client last so that a validation failure above cannot leak an unclosed client.
+    this.gcsClient = gcsClientSupplier.apply(properties);
+  }
+
+  /**
+   * Returns the factory used by the public constructor. The properties argument is currently
+   * ignored and a client with default {@link StorageOptions} is built.
+   */
+  private static Functions.Function1<Properties, Storage> createDefaultGcsClient() {
+    // Provide the option to customize the timeouts later on.
+    // For now, defaults suffice
+    return props -> StorageOptions.newBuilder().build().getService();
+  }
+
+  /**
+   * Attempts to create or update the lock file using the given lock data and generation number.
+   *
+   * @param lockData         the new lock data to use.
+   * @param generationNumber the expected generation number (0 for creation).
+   * @return the updated StorageLockFile instance, versioned with the new blob generation.
+   * @throws StorageException if the update fails.
+   */
+  private StorageLockFile createOrUpdateLockFileInternal(StorageLockData lockData, long generationNumber)
+      throws StorageException {
+    BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, lockFilePath)).build();
+    Blob updatedBlob = gcsClient.create(
+        blobInfo,
+        StorageLockFile.toByteArray(lockData),
+        Storage.BlobTargetOption.generationMatch(generationNumber));
+    return new StorageLockFile(
+        lockData,
+        String.valueOf(updatedBlob.getGeneration()));
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The write is conditional on the generation of {@code previousLockFile} (or on generation 0
+   * when it is empty). A 412 response yields {@code ACQUIRED_BY_OTHERS}; 429 and 5xx responses
+   * yield {@code UNKNOWN_ERROR}; any other {@link StorageException} is rethrown.
+   */
+  @Override
+  public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
+      StorageLockData newLockData,
+      Option<StorageLockFile> previousLockFile) {
+    long generationNumber = getGenerationNumber(previousLockFile);
+    try {
+      StorageLockFile updatedFile = createOrUpdateLockFileInternal(newLockData, generationNumber);
+      return Pair.of(LockUpsertResult.SUCCESS, Option.of(updatedFile));
+    } catch (StorageException e) {
+      if (e.getCode() == PRECONDITION_FAILURE_ERROR_CODE) {
+        logger.info("OwnerId: {}, Unable to write new lock file. Another process has modified this lockfile {} already.",
+            ownerId, lockFilePath);
+        return Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty());
+      } else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
+        logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFilePath);
+      } else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
+        logger.warn("OwnerId: {}, GCS returned internal server error code for lock file: {}",
+            ownerId, lockFilePath, e);
+      } else {
+        throw e;
+      }
+      // Only the rate-limit and server-error branches fall through to here.
+      return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty());
+    }
+  }
+
+  /**
+   * Handling storage exception for GET request
+   * @param e The error to handle.
+   * @param ignore404 Whether to ignore 404 as a valid exception.
+   *                  When we read from stream we might see this, and
+   *                  it should not be counted as NOT_EXISTS.
+   * @return The type of getResult error
+   * @throws StorageException the given exception, when its code is not 404, 429 or 5xx.
+   */
+  private LockGetResult handleGetStorageException(StorageException e, boolean ignore404) {
+    if (e.getCode() == NOT_FOUND_ERROR_CODE) {
+      if (ignore404) {
+        logger.info("OwnerId: {}, GCS stream read failure detected: {}", ownerId, lockFilePath);
+      } else {
+        logger.info("OwnerId: {}, Object not found in the path: {}", ownerId, lockFilePath);
+        return LockGetResult.NOT_EXISTS;
+      }
+    } else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
+      logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", ownerId, lockFilePath);
+    } else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
+      logger.warn("OwnerId: {}, GCS returned internal server error code for lock file: {}", ownerId, lockFilePath, e);
+    } else {
+      throw e;
+    }
+    return LockGetResult.UNKNOWN_ERROR;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Returns {@code NOT_EXISTS} when the blob is absent (null result or 404 on the metadata
+   * call). A 404 surfacing while streaming the content is reported as {@code UNKNOWN_ERROR}
+   * instead, because the object existed when its metadata was fetched.
+   */
+  @Override
+  public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() {
+    try {
+      Blob blob = gcsClient.get(BlobId.of(bucketName, lockFilePath));
+      if (blob == null) {
+        return Pair.of(LockGetResult.NOT_EXISTS, Option.empty());
+      }
+      return getLockFileFromBlob(blob);
+    } catch (StorageException e) {
+      return Pair.of(handleGetStorageException(e, false), Option.empty());
+    } catch (HoodieIOException e) {
+      // GCS will throw IOException wrapping 404 when reading from stream for file that has been modified in between
+      // calling gcsClient.get and StorageLockFile.createFromStream. People have complained that this is not being
+      // strongly consistent, however we have to handle this case. https://stackoverflow.com/q/66759993
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException && cause.getCause() instanceof StorageException) {
+        return Pair.of(handleGetStorageException((StorageException) cause.getCause(), true), Option.empty());
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Streams the blob content and parses it into a {@link StorageLockFile} versioned with the
+   * blob's generation.
+   *
+   * @param blob the blob metadata handle returned by the GCS client.
+   * @return a {@code SUCCESS} pair holding the parsed lock file.
+   * @throws UncheckedIOException if closing or reading the stream raises an {@link IOException}.
+   */
+  private @NotNull Pair<LockGetResult, Option<StorageLockFile>> getLockFileFromBlob(Blob blob) {
+    try (InputStream inputStream = Channels.newInputStream(blob.reader())) {
+      return Pair.of(LockGetResult.SUCCESS,
+          Option.of(StorageLockFile.createFromStream(inputStream, String.valueOf(blob.getGeneration()))));
+    } catch (IOException e) {
+      // Our createFromStream method does not throw IOExceptions, it wraps in HoodieIOException,
+      // however Sonar requires handling this.
+      throw new UncheckedIOException("Failed reading blob: " + lockFilePath, e);
+    }
+  }
+
+  /**
+   * Closes the underlying GCS client.
+   */
+  @Override
+  public void close() throws Exception {
+    this.gcsClient.close();
+  }
+
+  /**
+   * Returns the generation to use as write precondition: the version id of the given lock file
+   * parsed as a long, or 0 when no previous lock file is supplied.
+   */
+  private long getGenerationNumber(Option<StorageLockFile> file) {
+    return (file.isPresent())
+        ? Long.parseLong(file.get().getVersionId())
+        : 0;
+  }
+}
\ No newline at end of file
diff --git
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
new file mode 100644
index 000000000000..8b270023d6c7
--- /dev/null
+++
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link GCSStorageLockClient}, run against a mocked {@link Storage} client and a
+ * mocked {@link Logger} so that both the returned results and the emitted log calls can be checked.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestGCSStorageLockClient {
+
+  private static final String OWNER_ID = "ownerId";
+  private static final String LOCK_FILE_URI = "gs://bucket/lockFilePath";
+  private static final String LOCK_FILE_PATH = "lockFilePath";
+  private static final String BUCKET_NAME = "bucket";
+
+  @Mock
+  private Storage mockStorage;
+
+  @Mock
+  private Blob mockBlob;
+
+  @Mock
+  private Logger mockLogger;
+
+  private GCSStorageLockClient lockService;
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Mocks the given {@link Blob} so that when {@code blob.reader()} is called,
+   * it returns a {@link ReadChannel} whose first read supplies JSON
+   * for the provided {@link StorageLockData}, followed by EOF on subsequent reads.
+   */
+  public static void mockBlobReaderWithLockData(Blob mockBlob, StorageLockData data) throws IOException {
+    String json = OBJECT_MAPPER.writeValueAsString(data);
+    // Encode explicitly as UTF-8 so the fixture does not depend on the platform default charset.
+    byte[] jsonBytes = json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+    ReadChannel mockReadChannel = mock(ReadChannel.class);
+
+    Answer<Integer> fillBufferWithJson = invocation -> {
+      ByteBuffer buffer = invocation.getArgument(0);
+      buffer.put(jsonBytes);
+      return jsonBytes.length;
+    };
+    doAnswer(fillBufferWithJson)
+        .doAnswer(invocation -> -1)
+        .when(mockReadChannel).read(any(ByteBuffer.class));
+
+    when(mockBlob.reader()).thenReturn(mockReadChannel);
+  }
+
+  @BeforeEach
+  void setUp() {
+    lockService = new GCSStorageLockClient(OWNER_ID, LOCK_FILE_URI, new Properties(), (a) -> mockStorage, mockLogger);
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_noPreviousLock_success() {
+    StorageLockData lockData = new StorageLockData(false, 123L, "test-owner");
+
+    when(mockStorage.create(
+        any(BlobInfo.class),
+        any(byte[].class),
+        any(Storage.BlobTargetOption.class))
+    ).thenReturn(mockBlob);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result = lockService.tryUpsertLockFile(lockData, Option.empty());
+
+    // The Option itself is never null, so check the result code and that a lock file is present.
+    assertEquals(LockUpsertResult.SUCCESS, result.getLeft());
+    assertTrue(result.getRight().isPresent(), "Expected a valid StorageLockFile on success");
+    verify(mockStorage).create(
+        any(BlobInfo.class),
+        any(byte[].class),
+        eq(Storage.BlobTargetOption.generationMatch(0))
+    );
+    verifyNoMoreInteractions(mockLogger);
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_withPreviousLock_success() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "existing-owner");
+    StorageLockFile previousLockFile = new StorageLockFile(lockData, "123"); // versionId=123
+
+    when(mockStorage.create(
+        any(BlobInfo.class),
+        any(byte[].class),
+        any(Storage.BlobTargetOption.class))
+    ).thenReturn(mockBlob);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result =
+        lockService.tryUpsertLockFile(lockData, Option.of(previousLockFile));
+
+    // The Pair itself is never null, so check the result code and that a lock file is present.
+    assertEquals(LockUpsertResult.SUCCESS, result.getLeft());
+    assertTrue(result.getRight().isPresent(), "Expected a valid StorageLockFile on success");
+    verify(mockStorage).create(
+        any(BlobInfo.class),
+        any(byte[].class),
+        eq(Storage.BlobTargetOption.generationMatch(123L))
+    );
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_preconditionFailed() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "owner");
+    StorageLockFile previousLockFile = new StorageLockFile(lockData, "123");
+
+    StorageException exception = new StorageException(412, "Precondition Failed");
+    when(mockStorage.create(any(BlobInfo.class), any(byte[].class), any(Storage.BlobTargetOption.class)))
+        .thenThrow(exception);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result =
+        lockService.tryUpsertLockFile(lockData, Option.of(previousLockFile));
+
+    assertEquals(LockUpsertResult.ACQUIRED_BY_OTHERS, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty when a 412 occurs");
+    verify(mockLogger).info(
+        contains("Unable to write new lock file. Another process has modified this lockfile"),
+        eq(OWNER_ID),
+        eq(LOCK_FILE_PATH));
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_rateLimitExceeded() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "owner");
+    StorageException exception = new StorageException(429, "Rate Limit Exceeded");
+    when(mockStorage.create(any(BlobInfo.class), any(byte[].class), any(Storage.BlobTargetOption.class)))
+        .thenThrow(exception);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result = lockService.tryUpsertLockFile(lockData, Option.empty());
+
+    assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty when a 429 occurs");
+    verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), eq(LOCK_FILE_PATH));
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_serverError() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "owner");
+    StorageException exception = new StorageException(503, "Service Unavailable");
+    when(mockStorage.create(any(BlobInfo.class), any(byte[].class), any(Storage.BlobTargetOption.class)))
+        .thenThrow(exception);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result = lockService.tryUpsertLockFile(lockData, Option.empty());
+
+    assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty when a 5xx error occurs");
+    verify(mockLogger).warn(
+        contains("GCS returned internal server error code"),
+        eq(OWNER_ID),
+        eq(LOCK_FILE_PATH),
+        eq(exception));
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_unexpectedError() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "owner");
+    StorageException exception = new StorageException(400, "Bad Request");
+    when(mockStorage.create(any(BlobInfo.class), any(byte[].class), any(Storage.BlobTargetOption.class)))
+        .thenThrow(exception);
+
+    assertThrows(StorageException.class, () ->
+            lockService.tryUpsertLockFile(lockData, Option.empty()),
+        "Expected the method to rethrow the exception"
+    );
+  }
+
+  @Test
+  void testGetCurrentLockFile_blobNotFound() {
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, LOCK_FILE_PATH))).thenReturn(null);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.NOT_EXISTS, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Expected empty when no blob is found");
+  }
+
+  @Test
+  void testGetCurrentLockFile_blobFound() throws IOException {
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, LOCK_FILE_PATH))).thenReturn(mockBlob);
+    mockBlobReaderWithLockData(mockBlob, new StorageLockData(false, 123L, "owner"));
+    when(mockBlob.getGeneration()).thenReturn(123L);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.SUCCESS, result.getLeft());
+    // The Option itself is never null; verify it actually holds a lock file.
+    assertTrue(result.getRight().isPresent(), "Should return a StorageLockFile if blob is found");
+    assertEquals("123", result.getRight().get().getVersionId(), "Version ID should match blob generation");
+  }
+
+  @Test
+  void testGetCurrentLockFile_404Error() {
+    StorageException exception404 = new StorageException(404, "Not Found");
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, LOCK_FILE_PATH))).thenThrow(exception404);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.NOT_EXISTS, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty on 404 error");
+    verify(mockLogger).info(contains("Object not found"), eq(OWNER_ID), eq(LOCK_FILE_PATH));
+  }
+
+  @Test
+  void testGetCurrentLockFile_rateLimit() {
+    StorageException exception429 = new StorageException(429, "Rate Limit Exceeded");
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, LOCK_FILE_PATH))).thenThrow(exception429);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty on 429 error");
+    verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), eq(LOCK_FILE_PATH));
+  }
+
+  @Test
+  void testGetCurrentLockFile_serverError() {
+    StorageException exception500 = new StorageException(503, "Service Unavailable");
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, LOCK_FILE_PATH))).thenThrow(exception500);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty on 5xx error");
+    verify(mockLogger).warn(
+        contains("GCS returned internal server error code"),
+        eq(OWNER_ID),
+        eq(LOCK_FILE_PATH),
+        eq(exception500));
+  }
+
+  @Test
+  void testGetCurrentLockFile_unexpectedError() {
+    StorageException exception400 = new StorageException(400, "Bad Request");
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, LOCK_FILE_PATH))).thenThrow(exception400);
+
+    assertThrows(StorageException.class, () -> lockService.readCurrentLockFile(),
+        "Should rethrow unexpected errors");
+  }
+
+  @Test
+  void testGetCurrentLockFile_IOException() {
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, LOCK_FILE_PATH))).thenReturn(mockBlob);
+    when(mockBlob.reader()).thenThrow(new HoodieIOException("IO Error"));
+
+    assertThrows(HoodieIOException.class, () -> lockService.readCurrentLockFile());
+  }
+
+  @Test
+  void testGetCurrentLockFile_IOExceptionWrapping404() {
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, LOCK_FILE_PATH))).thenReturn(mockBlob);
+    when(mockBlob.reader()).thenThrow(
+        new HoodieIOException("IO Error", new IOException(new StorageException(404, "storage 404"))));
+
+    // A 404 seen while streaming must not be reported as NOT_EXISTS.
+    Pair<LockGetResult, Option<StorageLockFile>> result = lockService.readCurrentLockFile();
+    assertEquals(LockGetResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty on IO exception wrapping 404 error");
+  }
+
+  @Test
+  void testClose() throws Exception {
+    lockService.close();
+    verify(mockStorage).close();
+  }
+}