This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 1e73c91e3686 [HUDI-9160] Add GCS implementation of 
StorageBasedLockProvider (#13715)
1e73c91e3686 is described below

commit 1e73c91e368647e2223283b37101e8dbe306e2b6
Author: Alex R <[email protected]>
AuthorDate: Tue Aug 12 22:37:33 2025 -0700

    [HUDI-9160] Add GCS implementation of StorageBasedLockProvider (#13715)
---
 hudi-gcp/pom.xml                                   |  17 ++
 .../gcp/transaction/lock/GCSStorageLockClient.java | 238 ++++++++++++++++
 .../transaction/lock/TestGCSStorageLockClient.java | 304 +++++++++++++++++++++
 3 files changed, 559 insertions(+)

diff --git a/hudi-gcp/pom.xml b/hudi-gcp/pom.xml
index 0d70ee2c9e8b..c61de180cea6 100644
--- a/hudi-gcp/pom.xml
+++ b/hudi-gcp/pom.xml
@@ -61,6 +61,11 @@ See 
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
       <artifactId>hudi-sync-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>com.google.cloud</groupId>
@@ -77,6 +82,10 @@ See 
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
       <artifactId>gcs-connector</artifactId>
       <version>${gcs.connector.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-storage</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.parquet</groupId>
@@ -95,6 +104,14 @@ See 
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
       <artifactId>hadoop-common</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-client-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-tests-common</artifactId>
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
new file mode 100644
index 000000000000..e9b5e8c82aab
--- /dev/null
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.gcp.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.Channels;
+import java.util.Properties;
+
+/**
+ * A GCS-based implementation of a distributed lock provider using conditional 
writes
+ * with generationMatch, plus local concurrency safety, heartbeat/renew, and 
pruning old locks.
+ */
+@ThreadSafe
+public class GCSStorageLockClient implements StorageLockClient {
+  private static final Logger DEFAULT_LOGGER = 
LoggerFactory.getLogger(GCSStorageLockClient.class);
+  private static final long PRECONDITION_FAILURE_ERROR_CODE = 412;
+  private static final long NOT_FOUND_ERROR_CODE = 404;
+  private static final long RATE_LIMIT_ERROR_CODE = 429;
+  private static final long INTERNAL_SERVER_ERROR_CODE_MIN = 500;
+  private final Logger logger;
+  private final Storage gcsClient;
+  private final String bucketName;
+  private final String lockFilePath;
+  private final String ownerId;
+
+  /** Constructor that is used by reflection to instantiate a GCS-based 
locking service.
+   * @param ownerId The owner id.
+   * @param lockFileUri The path within the bucket where to write lock files.
+   * @param props The properties for the lock config, can be used to customize 
client.
+   */
+  public GCSStorageLockClient(
+      String ownerId,
+      String lockFileUri,
+      Properties props) {
+    this(ownerId, lockFileUri, props, createDefaultGcsClient(), 
DEFAULT_LOGGER);
+  }
+
+  @VisibleForTesting
+  GCSStorageLockClient(
+      String ownerId,
+      String lockFileUri,
+      Properties properties,
+      Functions.Function1<Properties, Storage> gcsClientSupplier,
+      Logger logger) {
+    try {
+      // This logic can likely be extended to other lock client 
implementations.
+      // Consider creating base class with utilities, incl error handling.
+      URI uri = new URI(lockFileUri);
+      this.bucketName = uri.getHost();
+      this.lockFilePath = uri.getPath().replaceFirst("/", "");
+      this.gcsClient = gcsClientSupplier.apply(properties);
+
+      if (StringUtils.isNullOrEmpty(this.bucketName)) {
+        throw new IllegalArgumentException("LockFileUri does not contain a 
valid bucket name.");
+      }
+      if (StringUtils.isNullOrEmpty(this.lockFilePath)) {
+        throw new IllegalArgumentException("LockFileUri does not contain a 
valid lock file path.");
+      }
+      this.ownerId = ownerId;
+      this.logger = logger;
+    } catch (URISyntaxException e) {
+      throw new HoodieLockException(e);
+    }
+  }
+
+  private static Functions.Function1<Properties, Storage> 
createDefaultGcsClient() {
+    return (props) -> {
+      // Provide the option to customize the timeouts later on.
+      // For now, defaults suffice
+      return StorageOptions.newBuilder().build().getService();
+    };
+  }
+
+  /**
+   * Attempts to create or update the lock file using the given lock data and 
generation number.
+   *
+   * @param lockData the new lock data to use.
+   * @param generationNumber the expected generation number (0 for creation).
+   * @return the updated StorageLockFile instance.
+   * @throws StorageException if the update fails.
+   */
+  private StorageLockFile createOrUpdateLockFileInternal(StorageLockData 
lockData, long generationNumber)
+      throws StorageException {
+    BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, 
lockFilePath)).build();
+    Blob updatedBlob = gcsClient.create(
+        blobInfo,
+        StorageLockFile.toByteArray(lockData),
+        Storage.BlobTargetOption.generationMatch(generationNumber));
+    return new StorageLockFile(
+        lockData,
+        String.valueOf(updatedBlob.getGeneration()));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
+      StorageLockData newLockData,
+      Option<StorageLockFile> previousLockFile) {
+    long generationNumber = getGenerationNumber(previousLockFile);
+    try {
+      StorageLockFile updatedFile = 
createOrUpdateLockFileInternal(newLockData, generationNumber);
+      return Pair.of(LockUpsertResult.SUCCESS, Option.of(updatedFile));
+    } catch (StorageException e) {
+      if (e.getCode() == PRECONDITION_FAILURE_ERROR_CODE) {
+        logger.info("OwnerId: {}, Unable to write new lock file. Another 
process has modified this lockfile {} already.", 
+            ownerId, lockFilePath);
+        return Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty());
+      } else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
+        logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", 
ownerId, lockFilePath);
+      } else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
+        logger.warn("OwnerId: {}, GCS returned internal server error code for 
lock file: {}", 
+            ownerId, lockFilePath, e);
+      } else {
+        throw e;
+      }
+      return Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty());
+    }
+  }
+
+  /**
+   * Handling storage exception for GET request
+   * @param e The error to handle.
+   * @param ignore404 Whether to ignore 404 as a valid exception.
+   *                  When we read from stream we might see this, and
+   *                  it should not be counted as NOT_EXISTS.
+   * @return The type of getResult error
+   */
+  private LockGetResult handleGetStorageException(StorageException e, boolean 
ignore404) {
+    if (e.getCode() == NOT_FOUND_ERROR_CODE) {
+      if (ignore404) {
+        logger.info("OwnerId: {}, GCS stream read failure detected: {}", 
ownerId, lockFilePath);
+      } else {
+        logger.info("OwnerId: {}, Object not found in the path: {}", ownerId, 
lockFilePath);
+        return LockGetResult.NOT_EXISTS;
+      }
+    } else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
+      logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", 
ownerId, lockFilePath);
+    } else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
+      logger.warn("OwnerId: {}, GCS returned internal server error code for 
lock file: {}", ownerId, lockFilePath, e);
+    } else {
+      throw e;
+    }
+    return LockGetResult.UNKNOWN_ERROR;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() {
+    try {
+      Blob blob = gcsClient.get(BlobId.of(bucketName, lockFilePath));
+      if (blob == null) {
+        return Pair.of(LockGetResult.NOT_EXISTS, Option.empty());
+      }
+      return getLockFileFromBlob(blob);
+    } catch (StorageException e) {
+      return Pair.of(handleGetStorageException(e, false), Option.empty());
+    } catch (HoodieIOException e) {
+      // GCS will throw IOException wrapping 404 when reading from stream for 
file that has been modified in between calling gcsClient.get
+      // and StorageLockFile.createFromStream. People have complained that 
this is not being strongly consistent, however
+      // we have to handle this case. https://stackoverflow.com/q/66759993
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException && cause.getCause() instanceof 
StorageException) {
+        return Pair.of(handleGetStorageException((StorageException) 
cause.getCause(), true), Option.empty());
+      }
+      throw e;
+    }
+  }
+
+  private @NotNull Pair<LockGetResult, Option<StorageLockFile>> 
getLockFileFromBlob(Blob blob) {
+    try (InputStream inputStream = Channels.newInputStream(blob.reader())) {
+      return Pair.of(LockGetResult.SUCCESS,
+          Option.of(StorageLockFile.createFromStream(inputStream, 
String.valueOf(blob.getGeneration()))));
+    } catch (IOException e) {
+      // Our createFromStream method does not throw IOExceptions, it wraps in 
HoodieIOException, however Sonar requires handling this.
+      throw new UncheckedIOException("Failed reading blob: " + lockFilePath, 
e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.gcsClient.close();
+  }
+
+  private long getGenerationNumber(Option<StorageLockFile> file) {
+    return (file.isPresent())
+        ? Long.parseLong(file.get().getVersionId())
+        : 0;
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
new file mode 100644
index 000000000000..8b270023d6c7
--- /dev/null
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClient.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for GCSStorageLockClient.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestGCSStorageLockClient {
+
+  private static final String OWNER_ID = "ownerId";
+  private static final String LOCK_FILE_URI = "gs://bucket/lockFilePath";
+  private static final String LOCK_FILE_PATH = "lockFilePath";
+  private static final String BUCKET_NAME = "bucket";
+
+  @Mock
+  private Storage mockStorage;
+
+  @Mock
+  private Blob mockBlob;
+
+  @Mock
+  private Logger mockLogger;
+
+  private GCSStorageLockClient lockService;
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  /**
+   * Mocks the given {@link Blob} so that when {@code blob.reader()} is called,
+   * it returns a {@link ReadChannel} whose first read supplies JSON
+   * for the provided {@link StorageLockData}, followed by EOF on subsequent 
reads.
+   */
+  public static void mockBlobReaderWithLockData(Blob mockBlob, StorageLockData 
data) throws IOException {
+    String json = OBJECT_MAPPER.writeValueAsString(data);
+    byte[] jsonBytes = json.getBytes();
+
+    ReadChannel mockReadChannel = mock(ReadChannel.class);
+
+    Answer<Integer> fillBufferWithJson = invocation -> {
+      ByteBuffer buffer = invocation.getArgument(0);
+      buffer.put(jsonBytes);
+      return jsonBytes.length;
+    };
+    doAnswer(fillBufferWithJson)
+        .doAnswer(invocation -> -1)
+        .when(mockReadChannel).read(any(ByteBuffer.class));
+    
+    when(mockBlob.reader()).thenReturn(mockReadChannel);
+  }
+
+  @BeforeEach
+  void setUp() {
+    lockService = new GCSStorageLockClient(OWNER_ID, LOCK_FILE_URI, new 
Properties(), (a) -> mockStorage, mockLogger);
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_noPreviousLock_success() {
+    StorageLockData lockData = new StorageLockData(false, 123L, "test-owner");
+
+    when(mockStorage.create(
+        any(BlobInfo.class),
+        any(byte[].class),
+        any(Storage.BlobTargetOption.class))
+    ).thenReturn(mockBlob);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result = 
lockService.tryUpsertLockFile(lockData, Option.empty());
+
+    assertNotNull(result.getRight(), "Expected a valid StorageLockFile on 
success");
+    verify(mockStorage).create(
+        any(BlobInfo.class),
+        any(byte[].class),
+        eq(Storage.BlobTargetOption.generationMatch(0))
+    );
+    verifyNoMoreInteractions(mockLogger);
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_withPreviousLock_success() {
+    StorageLockData lockData = new StorageLockData(false, 999L, 
"existing-owner");
+    StorageLockFile previousLockFile = new StorageLockFile(lockData, "123"); 
// versionId=123
+
+    when(mockStorage.create(
+        any(BlobInfo.class),
+        any(byte[].class),
+        any(Storage.BlobTargetOption.class))
+    ).thenReturn(mockBlob);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result =
+        lockService.tryUpsertLockFile(lockData, Option.of(previousLockFile));
+
+    assertNotNull(result, "Expected a valid StorageLockFile on success");
+    verify(mockStorage).create(
+        any(BlobInfo.class),
+        any(byte[].class),
+        eq(Storage.BlobTargetOption.generationMatch(123L))
+    );
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_preconditionFailed() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "owner");
+    StorageLockFile previousLockFile = new StorageLockFile(lockData, "123");
+
+    StorageException exception = new StorageException(412, "Precondition 
Failed");
+    when(mockStorage.create(any(BlobInfo.class), any(byte[].class), 
any(Storage.BlobTargetOption.class)))
+        .thenThrow(exception);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result = 
lockService.tryUpsertLockFile(lockData, Option.of(previousLockFile));
+
+    assertEquals(LockUpsertResult.ACQUIRED_BY_OTHERS, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty when a 412 
occurs");
+    verify(mockLogger).info(contains("Unable to write new lock file. Another 
process has modified this lockfile"), eq(OWNER_ID), eq(LOCK_FILE_PATH));
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_rateLimitExceeded() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "owner");
+    StorageException exception = new StorageException(429, "Rate Limit 
Exceeded");
+    when(mockStorage.create(any(BlobInfo.class), any(byte[].class), 
any(Storage.BlobTargetOption.class)))
+        .thenThrow(exception);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result = 
lockService.tryUpsertLockFile(lockData, Option.empty());
+
+    assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty when a 429 
occurs");
+    verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), 
eq(LOCK_FILE_PATH));
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_serverError() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "owner");
+    StorageException exception = new StorageException(503, "Service 
Unavailable");
+    when(mockStorage.create(any(BlobInfo.class), any(byte[].class), 
any(Storage.BlobTargetOption.class)))
+        .thenThrow(exception);
+
+    Pair<LockUpsertResult, Option<StorageLockFile>> result = 
lockService.tryUpsertLockFile(lockData, Option.empty());
+
+    assertEquals(LockUpsertResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty when a 5xx 
error occurs");
+    verify(mockLogger).warn(contains("GCS returned internal server error 
code"), eq(OWNER_ID), eq(LOCK_FILE_PATH), eq(exception));
+  }
+
+  @Test
+  void testTryCreateOrUpdateLockFile_unexpectedError() {
+    StorageLockData lockData = new StorageLockData(false, 999L, "owner");
+    StorageException exception = new StorageException(400, "Bad Request");
+    when(mockStorage.create(any(BlobInfo.class), any(byte[].class), 
any(Storage.BlobTargetOption.class)))
+        .thenThrow(exception);
+
+    assertThrows(StorageException.class, () ->
+            lockService.tryUpsertLockFile(lockData, Option.empty()),
+        "Expected the method to rethrow the exception"
+    );
+  }
+
+  @Test
+  void testGetCurrentLockFile_blobNotFound() {
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, 
LOCK_FILE_PATH))).thenReturn(null);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = 
lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.NOT_EXISTS, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Expected empty when no blob is 
found");
+  }
+
+  @Test
+  void testGetCurrentLockFile_blobFound() throws IOException {
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, 
LOCK_FILE_PATH))).thenReturn(mockBlob);
+    mockBlobReaderWithLockData(mockBlob, new StorageLockData(false, 123L, 
"owner"));
+    when(mockBlob.getGeneration()).thenReturn(123L);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = 
lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.SUCCESS, result.getLeft());
+    assertNotNull(result.getRight(), "Should return a StorageLockFile if blob 
is found");
+    assertEquals("123", result.getRight().get().getVersionId(), "Version ID 
should match blob generation");
+  }
+
+  @Test
+  void testGetCurrentLockFile_404Error() {
+    StorageException exception404 = new StorageException(404, "Not Found");
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, 
LOCK_FILE_PATH))).thenThrow(exception404);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = 
lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.NOT_EXISTS, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty on 404 
error");
+    verify(mockLogger).info(contains("Object not found"), eq(OWNER_ID), 
eq(LOCK_FILE_PATH));
+  }
+
+  @Test
+  void testGetCurrentLockFile_rateLimit() {
+    StorageException exception429 = new StorageException(429, "Rate Limit 
Exceeded");
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, 
LOCK_FILE_PATH))).thenThrow(exception429);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = 
lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty on 429 
error");
+    verify(mockLogger).warn(contains("Rate limit exceeded"), eq(OWNER_ID), 
eq(LOCK_FILE_PATH));
+  }
+
+  @Test
+  void testGetCurrentLockFile_serverError() {
+    StorageException exception500 = new StorageException(503, "Service 
Unavailable");
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, 
LOCK_FILE_PATH))).thenThrow(exception500);
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = 
lockService.readCurrentLockFile();
+
+    assertEquals(LockGetResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty on 5xx 
error");
+    verify(mockLogger).warn(contains("GCS returned internal server error 
code"), eq(OWNER_ID), eq(LOCK_FILE_PATH), eq(exception500));
+  }
+
+  @Test
+  void testGetCurrentLockFile_unexpectedError() {
+    StorageException exception400 = new StorageException(400, "Bad Request");
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, 
LOCK_FILE_PATH))).thenThrow(exception400);
+
+    assertThrows(StorageException.class, () -> 
lockService.readCurrentLockFile(),
+        "Should rethrow unexpected errors");
+  }
+
+  @Test
+  void testGetCurrentLockFile_IOException() {
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, 
LOCK_FILE_PATH))).thenReturn(mockBlob);
+    when(mockBlob.reader()).thenThrow(new HoodieIOException("IO Error"));
+
+    assertThrows(HoodieIOException.class, () -> 
lockService.readCurrentLockFile());
+  }
+
+  @Test
+  void testGetCurrentLockFile_IOExceptionWrapping404() {
+    when(mockStorage.get(BlobId.of(BUCKET_NAME, 
LOCK_FILE_PATH))).thenReturn(mockBlob);
+    when(mockBlob.reader()).thenThrow(new HoodieIOException("IO Error", new 
IOException(new StorageException(404, "storage 404"))));
+
+    Pair<LockGetResult, Option<StorageLockFile>> result = 
lockService.readCurrentLockFile();
+    assertEquals(LockGetResult.UNKNOWN_ERROR, result.getLeft());
+    assertTrue(result.getRight().isEmpty(), "Should return empty on IO 
exception wrapping 404 error");
+  }
+
+  @Test
+  void testClose() throws Exception {
+    lockService.close();
+    verify(mockStorage).close();
+  }
+}

Reply via email to