This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 77ef958515e4 [HUDI-9782] Add audit logging scaffolding to storage 
based LP (#13836)
77ef958515e4 is described below

commit 77ef958515e4a29be07183c35607a7f2e5a541be
Author: Alex R <[email protected]>
AuthorDate: Mon Sep 8 20:11:26 2025 -0700

    [HUDI-9782] Add audit logging scaffolding to storage based LP (#13836)
---
 .../aws/transaction/lock/S3StorageLockClient.java  | 124 +++++++---
 .../transaction/lock/TestS3StorageLockClient.java  |   2 +-
 .../TestS3StorageLockClientAuditOperations.java    | 272 ++++++++++++++++++++
 .../transaction/lock/StorageBasedLockProvider.java | 123 +++++++---
 .../client/transaction/lock/StorageLockClient.java |  75 +++++-
 .../lock/audit/AuditOperationState.java            |  39 +++
 .../transaction/lock/audit/AuditService.java       |  41 ++++
 .../lock/audit/AuditServiceFactory.java            | 113 +++++++++
 .../lock/TestStorageBasedLockProvider.java         | 151 ++++++++++--
 .../lock/audit/TestAuditServiceFactory.java        | 163 ++++++++++++
 .../gcp/transaction/lock/GCSStorageLockClient.java | 111 ++++++---
 .../TestGCSStorageLockClientAuditOperations.java   | 273 +++++++++++++++++++++
 12 files changed, 1364 insertions(+), 123 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
index 3fda9ebfe47c..67fef4e75785 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieLockException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +45,7 @@ import 
software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
 import software.amazon.awssdk.services.s3.model.GetBucketLocationResponse;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -54,8 +54,6 @@ import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.time.Duration;
 import java.util.Properties;
 
@@ -94,34 +92,22 @@ public class S3StorageLockClient implements 
StorageLockClient {
 
   @VisibleForTesting
   S3StorageLockClient(String ownerId, String lockFileUri, Properties props, 
Functions.Function2<String, Properties, S3Client> s3ClientSupplier, Logger 
logger) {
-    try {
-      // This logic can likely be extended to other lock client 
implementations.
-      // Consider creating base class with utilities, incl error handling.
-      URI uri = new URI(lockFileUri);
-      this.bucketName = uri.getHost();
-      this.lockFilePath = uri.getPath().replaceFirst("/", "");
-      this.s3Client = s3ClientSupplier.apply(bucketName, props);
-
-      if (StringUtils.isNullOrEmpty(this.bucketName)) {
-        throw new IllegalArgumentException("LockFileUri does not contain a 
valid bucket name.");
-      }
-      if (StringUtils.isNullOrEmpty(this.lockFilePath)) {
-        throw new IllegalArgumentException("LockFileUri does not contain a 
valid lock file path.");
-      }
-      this.ownerId = ownerId;
-      this.logger = logger;
-    } catch (URISyntaxException e) {
-      throw new HoodieLockException(e);
-    }
+    Pair<String, String> bucketAndPath = 
StorageLockClient.parseBucketAndPath(lockFileUri);
+    this.bucketName = bucketAndPath.getLeft();
+    this.lockFilePath = bucketAndPath.getRight();
+
+    this.s3Client = s3ClientSupplier.apply(bucketName, props);
+    this.ownerId = ownerId;
+    this.logger = logger;
   }
 
   @Override
   public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() {
     try (ResponseInputStream<GetObjectResponse> in = s3Client.getObject(
-            GetObjectRequest.builder()
-                    .bucket(bucketName)
-                    .key(lockFilePath)
-                    .build())) {
+        GetObjectRequest.builder()
+            .bucket(bucketName)
+            .key(lockFilePath)
+            .build())) {
       String eTag = in.response().eTag();
       return Pair.of(LockGetResult.SUCCESS, 
Option.of(StorageLockFile.createFromStream(in, eTag)));
     } catch (S3Exception e) {
@@ -241,16 +227,16 @@ public class S3StorageLockClient implements 
StorageLockClient {
       S3Client s3Client = createS3Client(region, s3CallTimeoutSecs, props);
       if (requiredFallbackRegion) {
         GetBucketLocationResponse bucketLocationResponse = 
s3Client.getBucketLocation(
-                GetBucketLocationRequest.builder().bucket(bucketName).build());
+            GetBucketLocationRequest.builder().bucket(bucketName).build());
         // This is null when the region is US_EAST_1, so we do not need to 
worry about duplicate logic.
         String regionString = 
bucketLocationResponse.locationConstraintAsString();
         if (!StringUtils.isNullOrEmpty(regionString)) {
           // Close existing client and create another.
           s3Client.close();
           return createS3Client(
-                  Region.of(regionString),
-                  s3CallTimeoutSecs,
-                  props);
+              Region.of(regionString),
+              s3CallTimeoutSecs,
+              props);
         }
       }
 
@@ -261,10 +247,80 @@ public class S3StorageLockClient implements 
StorageLockClient {
   private static S3Client createS3Client(Region region, long timeoutSecs, 
Properties props) {
     // Set the timeout, credentials, and region
     return S3Client.builder()
-            .overrideConfiguration(
-                    b -> b.apiCallTimeout(Duration.ofSeconds(timeoutSecs)))
-            
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
-            .region(region).build();
+        .overrideConfiguration(
+            b -> b.apiCallTimeout(Duration.ofSeconds(timeoutSecs)))
+        
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
+        .region(region).build();
+  }
+
+  @Override
+  public Option<String> readObject(String filePath, boolean checkExistsFirst) {
+    try {
+      // Parse the file path to get bucket and key
+      Pair<String, String> bucketAndKey = 
StorageLockClient.parseBucketAndPath(filePath);
+      String bucket = bucketAndKey.getLeft();
+      String key = bucketAndKey.getRight();
+
+      if (checkExistsFirst) {
+        // First check if the file exists (lightweight HEAD request)
+        try {
+          s3Client.headObject(HeadObjectRequest.builder()
+              .bucket(bucket)
+              .key(key)
+              .build());
+        } catch (S3Exception e) {
+          if (e.statusCode() == NOT_FOUND_ERROR_CODE) {
+            // File doesn't exist - this is the common case for optional 
configs
+            logger.debug("JSON config file not found: {}", filePath);
+            return Option.empty();
+          }
+          throw e; // Re-throw other errors
+        }
+      }
+
+      // Read the file (either after existence check or directly)
+      String content = s3Client.getObjectAsBytes(
+          GetObjectRequest.builder()
+              .bucket(bucket)
+              .key(key)
+              .build()).asUtf8String();
+
+      return Option.of(content);
+    } catch (S3Exception e) {
+      if (e.statusCode() == NOT_FOUND_ERROR_CODE) {
+        logger.debug("JSON config file not found: {}", filePath);
+        return Option.empty();
+      }
+      logger.warn("Error reading JSON config file: {}", filePath, e);
+      return Option.empty();
+    } catch (Exception e) {
+      logger.warn("Error reading JSON config file: {}", filePath, e);
+      return Option.empty();
+    }
+  }
+
+  @Override
+  public boolean writeObject(String filePath, String content) {
+    try {
+      // Parse the file path to get bucket and key
+      Pair<String, String> bucketAndPath = 
StorageLockClient.parseBucketAndPath(filePath);
+      String bucket = bucketAndPath.getLeft();
+      String key = bucketAndPath.getRight();
+
+      // Write the content to S3
+      s3Client.putObject(
+          PutObjectRequest.builder()
+              .bucket(bucket)
+              .key(key)
+              .build(),
+          RequestBody.fromString(content));
+
+      logger.debug("Successfully wrote object to: {}", filePath);
+      return true;
+    } catch (Exception e) {
+      logger.warn("Error writing object to: {}", filePath, e);
+      return false;
+    }
   }
 
   @Override
diff --git 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
index b8b3f077ac66..ce73a2556f61 100644
--- 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
+++ 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
@@ -142,7 +142,7 @@ class TestS3StorageLockClient {
             (a,b) -> mockS3Client,
             mockLogger
     ));
-    assertTrue(ex.getMessage().contains("lock file path"));
+    assertTrue(ex.getMessage().contains("path"));
   }
 
   @Test
diff --git 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClientAuditOperations.java
 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClientAuditOperations.java
new file mode 100644
index 000000000000..b92433806644
--- /dev/null
+++ 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClientAuditOperations.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.lock;
+
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for S3StorageLockClient audit operations (readObject and writeObject 
methods)
+ */
+public class TestS3StorageLockClientAuditOperations {
+
+  private S3Client mockS3Client;
+  private Logger mockLogger;
+  private S3StorageLockClient lockClient;
+
+  @BeforeEach
+  void setUp() {
+    mockS3Client = mock(S3Client.class);
+    mockLogger = mock(Logger.class);
+    String ownerId = "test-owner";
+    String lockFileUri = 
"s3://test-bucket/table/.hoodie/.locks/table_lock.json";
+    lockClient = new S3StorageLockClient(
+        ownerId,
+        lockFileUri,
+        new Properties(),
+        (bucket, props) -> mockS3Client,
+        mockLogger);
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstFileNotFound() {
+    String configPath = 
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // HEAD request returns 404
+    S3Exception notFoundException = (S3Exception) S3Exception.builder()
+        .statusCode(404)
+        .message("Not Found")
+        .build();
+    when(mockS3Client.headObject(any(HeadObjectRequest.class)))
+        .thenThrow(notFoundException);
+
+    Option<String> result = lockClient.readObject(configPath, true);
+
+    assertTrue(result.isEmpty());
+    // Should only call HEAD, not GET
+    verify(mockS3Client, times(1)).headObject(any(HeadObjectRequest.class));
+    verify(mockS3Client, 
never()).getObjectAsBytes(any(GetObjectRequest.class));
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstFileExists() {
+    String configPath = 
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+    String expectedContent = "{\"STORAGE_LP_AUDIT_SERVICE_ENABLED\": true}";
+
+    // HEAD request succeeds
+    HeadObjectResponse headResponse = HeadObjectResponse.builder().build();
+    when(mockS3Client.headObject(any(HeadObjectRequest.class)))
+        .thenReturn(headResponse);
+
+    // GET request returns content
+    ResponseBytes<GetObjectResponse> responseBytes = 
ResponseBytes.fromByteArray(
+        GetObjectResponse.builder().build(),
+        expectedContent.getBytes(StandardCharsets.UTF_8));
+    when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
+        .thenReturn(responseBytes);
+
+    Option<String> result = lockClient.readObject(configPath, true);
+
+    assertTrue(result.isPresent());
+    assertEquals(expectedContent, result.get());
+    // Should call both HEAD and GET
+    verify(mockS3Client, times(1)).headObject(any(HeadObjectRequest.class));
+    verify(mockS3Client, 
times(1)).getObjectAsBytes(any(GetObjectRequest.class));
+  }
+
+  @Test
+  void testReadConfigWithoutCheckExistsFirstFileNotFound() {
+    String configPath = 
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // Direct GET request returns 404
+    S3Exception notFoundException = (S3Exception) S3Exception.builder()
+        .statusCode(404)
+        .message("Not Found")
+        .build();
+    when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
+        .thenThrow(notFoundException);
+
+    Option<String> result = lockClient.readObject(configPath, false);
+
+    assertTrue(result.isEmpty());
+    // Should not call HEAD, only GET
+    verify(mockS3Client, never()).headObject(any(HeadObjectRequest.class));
+    verify(mockS3Client, 
times(1)).getObjectAsBytes(any(GetObjectRequest.class));
+  }
+
+  @Test
+  void testReadConfigWithoutCheckExistsFirstFileExists() {
+    String configPath = 
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+    String expectedContent = "{\"STORAGE_LP_AUDIT_SERVICE_ENABLED\": false}";
+
+    // Direct GET request returns content
+    ResponseBytes<GetObjectResponse> responseBytes = 
ResponseBytes.fromByteArray(
+        GetObjectResponse.builder().build(),
+        expectedContent.getBytes(StandardCharsets.UTF_8));
+    when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
+        .thenReturn(responseBytes);
+
+    Option<String> result = lockClient.readObject(configPath, false);
+
+    assertTrue(result.isPresent());
+    assertEquals(expectedContent, result.get());
+    // Should not call HEAD, only GET
+    verify(mockS3Client, never()).headObject(any(HeadObjectRequest.class));
+    verify(mockS3Client, 
times(1)).getObjectAsBytes(any(GetObjectRequest.class));
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstOtherS3Error() {
+    String configPath = 
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // HEAD request returns non-404 error
+    S3Exception serverError = (S3Exception) S3Exception.builder()
+        .statusCode(500)
+        .message("Internal Server Error")
+        .build();
+    when(mockS3Client.headObject(any(HeadObjectRequest.class)))
+        .thenThrow(serverError);
+
+    Option<String> result = lockClient.readObject(configPath, true);
+
+    assertTrue(result.isEmpty());
+    verify(mockS3Client, times(1)).headObject(any(HeadObjectRequest.class));
+    verify(mockS3Client, 
never()).getObjectAsBytes(any(GetObjectRequest.class));
+  }
+
+  @Test
+  void testReadConfigWithInvalidUri() {
+    String invalidPath = "not-a-valid-uri";
+
+    Option<String> result = lockClient.readObject(invalidPath, false);
+
+    assertTrue(result.isEmpty());
+    // Should not make any S3 calls due to URI parsing error
+    verify(mockS3Client, never()).headObject(any(HeadObjectRequest.class));
+  }
+
+  @Test
+  void testReadConfigWithRateLimitError() {
+    String configPath = 
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // GET request returns rate limit error
+    S3Exception rateLimitException = (S3Exception) S3Exception.builder()
+        .statusCode(429)
+        .message("Too Many Requests")
+        .build();
+    when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
+        .thenThrow(rateLimitException);
+
+    Option<String> result = lockClient.readObject(configPath, false);
+
+    assertTrue(result.isEmpty());
+    verify(mockS3Client, 
times(1)).getObjectAsBytes(any(GetObjectRequest.class));
+  }
+
+  // ================================
+  // writeObject() tests
+  // ================================
+
+  @Test
+  void testWriteObject_success() {
+    String filePath = "s3://test-bucket/audit/test-audit.jsonl";
+    String content = "{\"test\": \"data\"}\n";
+    PutObjectResponse putResp = 
PutObjectResponse.builder().eTag("write-etag-123").build();
+    when(mockS3Client.putObject(any(PutObjectRequest.class), 
any(RequestBody.class))).thenReturn(putResp);
+
+    boolean result = lockClient.writeObject(filePath, content);
+
+    assertTrue(result);
+    verify(mockS3Client, times(1)).putObject(
+        
eq(PutObjectRequest.builder().bucket("test-bucket").key("audit/test-audit.jsonl").build()),
+        any(RequestBody.class)
+    );
+    verify(mockLogger).debug("Successfully wrote object to: {}", filePath);
+  }
+
+  @Test
+  void testWriteObject_s3Exception() {
+    String filePath = "s3://test-bucket/audit/test-audit.jsonl";
+    String content = "{\"test\": \"data\"}\n";
+    AwsServiceException s3Exception = 
S3Exception.builder().statusCode(500).build();
+    when(mockS3Client.putObject(any(PutObjectRequest.class), 
any(RequestBody.class))).thenThrow(s3Exception);
+
+    boolean result = lockClient.writeObject(filePath, content);
+
+    assertFalse(result);
+    verify(mockLogger).warn(contains("Error writing object to"), eq(filePath), 
eq(s3Exception));
+  }
+
+  @Test
+  void testWriteObject_invalidPath() {
+    String invalidPath = "invalid-path";
+    String content = "{\"test\": \"data\"}\n";
+
+    boolean result = lockClient.writeObject(invalidPath, content);
+
+    assertFalse(result);
+    verify(mockLogger).warn(contains("Error writing object to"), 
eq(invalidPath), any(Exception.class));
+  }
+
+  @Test
+  void testWriteObject_emptyContent() {
+    String filePath = "s3://test-bucket/audit/empty-content.jsonl";
+    String content = "";
+    PutObjectResponse putResp = 
PutObjectResponse.builder().eTag("empty-etag-456").build();
+    when(mockS3Client.putObject(any(PutObjectRequest.class), 
any(RequestBody.class))).thenReturn(putResp);
+
+    boolean result = lockClient.writeObject(filePath, content);
+
+    assertTrue(result);
+    verify(mockS3Client, times(1)).putObject(
+        
eq(PutObjectRequest.builder().bucket("test-bucket").key("audit/empty-content.jsonl").build()),
+        any(RequestBody.class)
+    );
+    verify(mockLogger).debug("Successfully wrote object to: {}", filePath);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
index 8c8d0f7d3547..31757906e595 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.client.transaction.lock;
 
+import org.apache.hudi.client.transaction.lock.audit.AuditOperationState;
+import org.apache.hudi.client.transaction.lock.audit.AuditService;
+import org.apache.hudi.client.transaction.lock.audit.AuditServiceFactory;
 import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
 import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
 import org.apache.hudi.client.transaction.lock.models.LockGetResult;
@@ -62,7 +65,6 @@ import static 
org.apache.hudi.common.lock.LockState.FAILED_TO_ACQUIRE;
 import static org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE;
 import static org.apache.hudi.common.lock.LockState.RELEASED;
 import static org.apache.hudi.common.lock.LockState.RELEASING;
-import static 
org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME;
 
 /**
  * A distributed filesystem storage based lock provider. This {@link 
LockProvider} implementation
@@ -97,6 +99,8 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
   private final HeartbeatManager heartbeatManager;
   private final transient Thread shutdownThread;
   private final Option<HoodieLockMetrics> hoodieLockMetrics;
+  private Option<AuditService> auditService;
+  private final String basePath;
 
   @GuardedBy("this")
   private StorageLockFile currentLockObj = null;
@@ -114,25 +118,25 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
   /**
    * Default constructor for StorageBasedLockProvider, required by LockManager
    * to instantiate it using reflection.
-   * 
+   *
    * @param lockConfiguration The lock configuration, should be transformable 
into
    *                          StorageBasedLockConfig
    * @param conf              Storage config, ignored.
    */
   public StorageBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
     this(
-            UUID.randomUUID().toString(),
-            lockConfiguration.getConfig(),
-            LockProviderHeartbeatManager::new,
-            getStorageLockClientClassName(),
-            LOGGER,
+        UUID.randomUUID().toString(),
+        lockConfiguration.getConfig(),
+        LockProviderHeartbeatManager::new,
+        getStorageLockClientClassName(),
+        LOGGER,
         null);
   }
 
   /**
    * Constructor for StorageBasedLockProvider with HoodieLockMetrics support.
    * This constructor allows lock providers to access metrics for fine-grained 
metrics collection.
-   * 
+   *
    * @param lockConfiguration The lock configuration, should be transformable 
into
    *                          StorageBasedLockConfig
    * @param conf              Storage config, ignored.
@@ -140,21 +144,21 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
    */
   public StorageBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf, final HoodieLockMetrics metrics) {
     this(
-            UUID.randomUUID().toString(),
-            lockConfiguration.getConfig(),
-            LockProviderHeartbeatManager::new,
-            getStorageLockClientClassName(),
-            LOGGER,
-            metrics);
+        UUID.randomUUID().toString(),
+        lockConfiguration.getConfig(),
+        LockProviderHeartbeatManager::new,
+        getStorageLockClientClassName(),
+        LOGGER,
+        metrics);
   }
 
   private static Functions.Function3<String, String, TypedProperties, 
StorageLockClient> getStorageLockClientClassName() {
     return (ownerId, lockFilePath, lockConfig) -> {
       try {
         return (StorageLockClient) ReflectionUtils.loadClass(
-                getLockServiceClassName(new URI(lockFilePath).getScheme()),
-                new Class<?>[]{String.class, String.class, Properties.class},
-                new Object[]{ownerId, lockFilePath, lockConfig});
+            getLockServiceClassName(new URI(lockFilePath).getScheme()),
+            new Class<?>[] {String.class, String.class, Properties.class},
+            new Object[] {ownerId, lockFilePath, lockConfig});
       } catch (Throwable e) {
         throw new HoodieLockException("Failed to load and initialize 
StorageLock", e);
       }
@@ -181,18 +185,15 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
     StorageBasedLockConfig config = new 
StorageBasedLockConfig.Builder().fromProperties(properties).build();
     long heartbeatPollSeconds = config.getHeartbeatPollSeconds();
     this.lockValiditySecs = config.getValiditySeconds();
-    this.lockFilePath = String.format(
-            "%s%s%s%s%s",
-            config.getHudiTableBasePath(),
-            StoragePath.SEPARATOR,
-            LOCKS_FOLDER_NAME,
-            StoragePath.SEPARATOR,
-            DEFAULT_TABLE_LOCK_FILE_NAME);
+    this.basePath = config.getHudiTableBasePath();
+    String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+    this.lockFilePath = String.format("%s%s%s", lockFolderPath, 
StoragePath.SEPARATOR, DEFAULT_TABLE_LOCK_FILE_NAME);
     this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, 
TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
     this.storageLockClient = storageLockClientLoader.apply(ownerId, 
lockFilePath, properties);
     this.ownerId = ownerId;
     this.logger = logger;
     this.hoodieLockMetrics = Option.ofNullable(hoodieLockMetrics);
+    this.auditService = Option.empty(); // Will be created lazily on first 
lock acquisition
     shutdownThread = new Thread(() -> shutdown(true));
     Runtime.getRuntime().addShutdownHook(shutdownThread);
     logger.info("Instantiated new storage-based lock provider, owner: {}, 
lockfilePath: {}", ownerId, lockFilePath);
@@ -267,6 +268,18 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
     } catch (Exception e) {
       logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, e);
     }
+    try {
+      this.auditService.ifPresent(auditService -> {
+        try {
+          auditService.close();
+        } catch (Exception e) {
+          logger.error("Owner {}: Audit service failed to close.", ownerId, e);
+        }
+      });
+      this.auditService = Option.empty();
+    } catch (Exception e) {
+      logger.error("Owner {}: Failed to close audit service.", ownerId, e);
+    }
 
     this.isClosed = true;
   }
@@ -290,7 +303,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
 
   /**
    * Attempts a single pass to acquire the lock (non-blocking).
-   * 
+   *
    * @return true if lock acquired, false otherwise
    */
   @Override
@@ -327,7 +340,9 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
     }
 
     // Try to acquire the lock
-    StorageLockData newLockData = new StorageLockData(false, 
getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId);
+    long acquisitionTimestamp = getCurrentEpochMs();
+    long lockExpirationMs = calculateLockExpiration(acquisitionTimestamp);
+    StorageLockData newLockData = new StorageLockData(false, lockExpirationMs, 
ownerId);
     Pair<LockUpsertResult, Option<StorageLockFile>> lockUpdateStatus = 
this.storageLockClient.tryUpsertLockFile(
         newLockData,
         latestLock.getRight());
@@ -359,6 +374,15 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
     }
 
     logInfoLockState(ACQUIRED);
+
+    // Create audit service lazily on first successful lock acquisition if 
auditing is enabled
+    if (auditService.isEmpty()) {
+      auditService = AuditServiceFactory.createLockProviderAuditService(
+          ownerId, basePath, storageLockClient, acquisitionTimestamp,
+          this::calculateLockExpiration, this::actuallyHoldsLock);
+    }
+
+    recordAuditOperation(AuditOperationState.START, acquisitionTimestamp);
     return true;
   }
 
@@ -371,7 +395,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
    * valid only if it exists and has not expired according to its timestamp.
    *
    * @return {@code true} if this provider holds a valid lock, {@code false}
-   *         otherwise
+   * otherwise
    */
   private boolean actuallyHoldsLock() {
     return believesLockMightBeHeld() && isLockStillValid(getLock());
@@ -390,7 +414,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
    * StorageBasedLockProvider#actuallyHoldsLock should be used.
    *
    * @return {@code true} if this provider has a non-null lock object,
-   *         {@code false} otherwise
+   * {@code false} otherwise
    * @see StorageBasedLockProvider#actuallyHoldsLock()
    */
   private boolean believesLockMightBeHeld() {
@@ -439,6 +463,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
 
   /**
    * Tries to expire the currently held lock.
+   *
    * @param fromShutdownHook Whether we are attempting best effort quick 
unlock from shutdown hook.
    * @return True if we were successfully able to upload an expired lock.
    */
@@ -463,6 +488,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
         return false;
       case SUCCESS:
         logInfoLockState(RELEASED);
+        recordAuditOperation(AuditOperationState.END, 
System.currentTimeMillis());
         setLock(null);
         return true;
       case ACQUIRED_BY_OTHERS:
@@ -481,6 +507,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
    * Renews (heartbeats) the current lock if we are the holder, it forcefully 
set
    * the expiration flag
    * to false and the lock expiration time to a later time in the future.
+   *
    * @return True if we successfully renewed the lock, false if not.
    */
   @VisibleForTesting
@@ -505,8 +532,10 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
       // Action taken for corner case 2 is just a best effort mitigation. At 
least it
       // prevents further data corruption by
       // letting someone else acquire the lock.
+      long acquisitionTimestamp = getCurrentEpochMs();
+      long lockExpirationMs = calculateLockExpiration(acquisitionTimestamp);
       Pair<LockUpsertResult, Option<StorageLockFile>> currentLock = 
this.storageLockClient.tryUpsertLockFile(
-          new StorageLockData(false, getCurrentEpochMs() + 
TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId),
+          new StorageLockData(false, lockExpirationMs, ownerId),
           Option.of(getLock()));
       switch (currentLock.getLeft()) {
         case ACQUIRED_BY_OTHERS:
@@ -529,6 +558,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
               (int) (oldExpirationMs - getCurrentEpochMs())));
           logger.info("Owner {}: Lock renewal successful. The renewal 
completes {} ms before expiration for lock {}.",
               ownerId, oldExpirationMs - getCurrentEpochMs(), lockFilePath);
+          recordAuditOperation(AuditOperationState.RENEW, 
acquisitionTimestamp);
           // Let heartbeat continue to renew lock lease again later.
           return true;
         default:
@@ -556,11 +586,11 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
   private String generateLockStateMessage(LockState state) {
     String threadName = Thread.currentThread().getName();
     return String.format(
-            "Owner %s: Lock file path %s, Thread %s, Storage based lock state 
%s",
-            ownerId,
-            lockFilePath,
-            threadName,
-            state.toString());
+        "Owner %s: Lock file path %s, Thread %s, Storage based lock state %s",
+        ownerId,
+        lockFilePath,
+        threadName,
+        state.toString());
   }
 
   private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file 
path {}, Thread {}, Storage based lock state {}";
@@ -590,4 +620,29 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
   long getCurrentEpochMs() {
     return System.currentTimeMillis();
   }
+
+  /**
+   * Calculates the lock expiration time based on given timestamp and validity 
period.
+   * This is the shared function used by both the lock provider and audit 
service.
+   *
+   * @param timestamp The base timestamp to calculate expiration from
+   * @return Lock expiration time in milliseconds
+   */
+  private long calculateLockExpiration(long timestamp) {
+    return timestamp + TimeUnit.SECONDS.toMillis(lockValiditySecs);
+  }
+
+  /**
+   * Helper method to record audit operations.
+   */
+  private void recordAuditOperation(AuditOperationState state, long timestamp) 
{
+    auditService.ifPresent(service -> {
+      try {
+        service.recordOperation(state, timestamp);
+      } catch (Exception e) {
+        // Log but don't fail the lock operation due to recording failures
+        logger.warn("Owner {}: Failed to record audit operation {}: {}", 
ownerId, state, e.getMessage());
+      }
+    });
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
index ac8ff2ed9d0d..865013f3acaa 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -23,7 +23,15 @@ import 
org.apache.hudi.client.transaction.lock.models.StorageLockData;
 import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
 import org.apache.hudi.client.transaction.lock.models.LockGetResult;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.storage.StoragePath;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static 
org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME;
 
 /**
  * Defines a contract for a service which should be able to perform 
conditional writes to object storage.
@@ -34,7 +42,8 @@ public interface StorageLockClient extends AutoCloseable {
   /**
    * Tries to create or update a lock file.
    * All non pre-condition failure related errors should be returned as 
UNKNOWN_ERROR.
-   * @param newLockData The new data to write to the lock file
+   *
+   * @param newLockData      The new data to write to the lock file
    * @param previousLockFile The previous lock file
    * @return A pair containing the result state and the new lock file (if 
successful)
    */
@@ -44,7 +53,69 @@ public interface StorageLockClient extends AutoCloseable {
 
   /**
    * Reads the current lock file.
+   *
    * @return The lock retrieve result and the current lock file if 
successfully retrieved.
-   * */
+   */
   Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile();
+
+  /**
+   * Reads an object from the specified path.
+   * This method is intended for reading small files (e.g., audit config, 
feature flags, JSON objects)
+   * and loads the entire file into memory. Do not use for large files.
+   *
+   * @param filePath         The path to the object file to read
+   * @param checkExistsFirst If true, performs a lightweight existence check 
before attempting to read.
+   *                         Recommended when the file is unlikely to exist 
(e.g., optional configs).
+   * @return An Option containing the content as a string if successful, 
Option.empty() otherwise
+   */
+  Option<String> readObject(String filePath, boolean checkExistsFirst);
+
+  /**
+   * Writes an object to the specified path.
+   * This method is intended for writing small files (e.g., audit logs, 
configuration files)
+   * and should not be used for large files.
+   *
+   * @param filePath The path where the object should be written
+   * @param content  The content to write as a string
+   * @return true if the write was successful, false otherwise
+   */
+  boolean writeObject(String filePath, String content);
+
+  /**
+   * Gets the lock folder path for the given base path.
+   * This is a static utility method that can be used without creating an 
instance.
+   *
+   * @param basePath The base path of the Hudi table
+   * @return The lock folder path (e.g., "s3://bucket/table/.hoodie/locks")
+   */
+  static String getLockFolderPath(String basePath) {
+    return String.format("%s%s%s", basePath, StoragePath.SEPARATOR, 
LOCKS_FOLDER_NAME);
+  }
+
+  /**
+   * Parses a URI and returns bucket name and path as a Pair.
+   * This is a shared utility method for all storage lock client 
implementations.
+   *
+   * @param uriString The URI string to parse
+   * @return A Pair containing bucket name (left) and path (right)
+   * @throws HoodieLockException if URI parsing fails or components are invalid
+   */
+  static Pair<String, String> parseBucketAndPath(String uriString) {
+    try {
+      URI uri = new URI(uriString);
+      String bucketName = uri.getAuthority();
+      String path = uri.getPath().replaceFirst("/", "");
+
+      if (StringUtils.isNullOrEmpty(bucketName)) {
+        throw new IllegalArgumentException("URI does not contain a valid 
bucket name.");
+      }
+      if (StringUtils.isNullOrEmpty(path)) {
+        throw new IllegalArgumentException("URI does not contain a valid 
path.");
+      }
+
+      return Pair.of(bucketName, path);
+    } catch (URISyntaxException e) {
+      throw new HoodieLockException("Failed to parse URI: " + uriString, e);
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditOperationState.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditOperationState.java
new file mode 100644
index 000000000000..700a8dd79be7
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditOperationState.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+/**
+ * Enumeration of audit operation states for different operations.
+ */
+public enum AuditOperationState {
+  /**
+   * Operation started.
+   */
+  START,
+
+  /**
+   * Operation renewal/heartbeat.
+   */
+  RENEW,
+
+  /**
+   * Operation end.
+   */
+  END
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditService.java
new file mode 100644
index 000000000000..bdc062f519af
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+/**
+ * Generic audit service interface for tracking operation lifecycles.
+ * Provides a single method for recording all types of audit operations.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface AuditService extends AutoCloseable {
+
+  /**
+   * Records an audit operation with the given state and timestamp.
+   *
+   * @param state     The type of operation (START, RENEW, END)
+   * @param timestamp When the operation occurred
+   * @throws Exception if the operation cannot be recorded
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  void recordOperation(AuditOperationState state, long timestamp) throws 
Exception;
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
new file mode 100644
index 000000000000..eb9accaf4fdc
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Generic factory for creating audit services.
+ * This factory determines whether auditing is enabled by checking 
configuration files.
+ */
+public class AuditServiceFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AuditServiceFactory.class);
+  private static final String AUDIT_CONFIG_FILE_NAME = "audit_enabled.json";
+  private static final String STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD = 
"STORAGE_LOCK_AUDIT_SERVICE_ENABLED";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Creates a lock provider audit service instance by checking the audit 
configuration file.
+   *
+   * @param ownerId The owner ID for the lock provider
+   * @param basePath The base path of the Hudi table
+   * @param storageLockClient The storage lock client to use for reading 
configuration
+   * @param transactionStartTime The timestamp when the transaction started 
(lock acquired)
+   * @param lockExpirationFunction Function that takes a timestamp and returns 
the lock expiration time
+   * @param lockHeldSupplier Supplier that provides whether the lock is 
currently held
+   * @return An Option containing the audit service if enabled, Option.empty() 
otherwise
+   */
+  public static Option<AuditService> createLockProviderAuditService(
+      String ownerId,
+      String basePath,
+      StorageLockClient storageLockClient,
+      long transactionStartTime,
+      Function<Long, Long> lockExpirationFunction,
+      Supplier<Boolean> lockHeldSupplier) {
+
+    if (!isAuditEnabled(basePath, storageLockClient)) {
+      return Option.empty();
+    }
+
+    // No-op, will add in a follow up
+    return Option.empty();
+  }
+
+  /**
+   * Checks if audit is enabled by reading the audit configuration file.
+   *
+   * @param basePath The base path of the Hudi table
+   * @param storageLockClient The storage lock client to use for reading 
configuration
+   * @return true if audit is enabled, false otherwise
+   */
+  private static boolean isAuditEnabled(String basePath, StorageLockClient 
storageLockClient) {
+    try {
+      // Construct the audit config path using the same lock folder as the 
lock file
+      String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+      String auditConfigPath = String.format("%s%s%s", lockFolderPath, 
StoragePath.SEPARATOR, AUDIT_CONFIG_FILE_NAME);
+
+      LOG.debug("Checking for audit configuration at: {}", auditConfigPath);
+
+      // Use the readObject method to read the config file
+      // Pass true for checkExistsFirst since audit config is rarely present 
(99% miss rate)
+      Option<String> jsonContent = 
storageLockClient.readObject(auditConfigPath, true);
+
+      if (jsonContent.isPresent()) {
+        LOG.debug("Audit configuration file found, parsing content");
+        JsonNode rootNode = OBJECT_MAPPER.readTree(jsonContent.get());
+        JsonNode enabledNode = 
rootNode.get(STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+
+        boolean isEnabled = enabledNode != null && 
enabledNode.asBoolean(false);
+
+        if (isEnabled) {
+          LOG.info("Audit logging is ENABLED for lock operations at base path: 
{}", basePath);
+        } else {
+          LOG.info("Audit configuration present but audit logging is 
DISABLED");
+        }
+
+        return isEnabled;
+      }
+
+      LOG.debug("No audit configuration file found at: {}", auditConfigPath);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Error reading audit configuration from base path: {}. Audit 
will be disabled.", basePath, e);
+      return false;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
index 5d51a2177939..4ae637cbdc18 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -56,7 +56,10 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.refEq;
 import static org.mockito.Mockito.atLeastOnce;
@@ -66,6 +69,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -86,6 +90,8 @@ class TestStorageBasedLockProvider {
     mockHeartbeatManager = mock(HeartbeatManager.class);
     mockLogger = mock(Logger.class);
     when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+    // Mock the readObject method to return Option.empty() to prevent NPE in 
audit service creation
+    when(mockLockService.readObject(anyString(), 
anyBoolean())).thenReturn(Option.empty());
     TypedProperties props = new TypedProperties();
     props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
     props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
@@ -94,8 +100,8 @@ class TestStorageBasedLockProvider {
     lockProvider = spy(new StorageBasedLockProvider(
         ownerId,
         props,
-        (a,b,c) -> mockHeartbeatManager,
-        (a,b,c) -> mockLockService,
+        (a, b, c) -> mockHeartbeatManager,
+        (a, b, c) -> mockLockService,
         mockLogger,
         null));
   }
@@ -119,8 +125,8 @@ class TestStorageBasedLockProvider {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = { "gs://bucket/lake/db/tbl-default", 
"s3://bucket/lake/db/tbl-default",
-      "s3a://bucket/lake/db/tbl-default" })
+  @ValueSource(strings = {"gs://bucket/lake/db/tbl-default", 
"s3://bucket/lake/db/tbl-default",
+      "s3a://bucket/lake/db/tbl-default"})
   void testNonExistentWriteServiceWithDefaults(String tableBasePathString) {
     TypedProperties props = new TypedProperties();
     props.put(BASE_PATH.key(), tableBasePathString);
@@ -533,7 +539,7 @@ class TestStorageBasedLockProvider {
     StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
     StorageLockFile realLockFile = new StorageLockFile(data, "v1");
     when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
-            .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
     when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
 
     boolean acquired = lockProvider.tryLock();
@@ -542,7 +548,7 @@ class TestStorageBasedLockProvider {
     verify(mockLockService, atLeastOnce()).tryUpsertLockFile(any(), any());
 
     when(mockLockService.tryUpsertLockFile(any(StorageLockData.class), 
eq(Option.of(realLockFile))))
-            .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, 
Option.of(realLockFile)));
 
     // Mock shutdown
     Method shutdownMethod = 
lockProvider.getClass().getDeclaredMethod("shutdown", boolean.class);
@@ -667,10 +673,10 @@ class TestStorageBasedLockProvider {
     props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
     LockConfiguration lockConfiguration = new LockConfiguration(props);
     StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
-    
+
     // Create a mock HoodieLockMetrics object  
     HoodieLockMetrics mockMetrics = mock(HoodieLockMetrics.class);
-    
+
     // Test that constructor with metrics works by using the internal 
constructor to avoid scheme issues
     StorageBasedLockProvider lockProviderWithMetrics = null;
     try {
@@ -679,22 +685,22 @@ class TestStorageBasedLockProvider {
       assertThrows(Exception.class, () -> {
         new StorageBasedLockProvider(lockConfiguration, storageConf, 
mockMetrics);
       }, "Constructor should exist but fail during lock client instantiation");
-      
+
       // Now create a working instance using the internal constructor for 
proper validation
       lockProviderWithMetrics = new StorageBasedLockProvider(
           UUID.randomUUID().toString(),
           props,
-          (a,b,c) -> mock(HeartbeatManager.class),
-          (a,b,c) -> new StubStorageLockClient(a, b, new Properties()),
+          (a, b, c) -> mock(HeartbeatManager.class),
+          (a, b, c) -> new StubStorageLockClient(a, b, new Properties()),
           mock(Logger.class),
           mockMetrics);
-      
+
       // Verify the lock provider was created successfully
       assertNotNull(lockProviderWithMetrics, "StorageBasedLockProvider should 
be created successfully");
-      
+
       // Verify that it can perform basic operations
       assertNull(lockProviderWithMetrics.getLock(), "Initially should have no 
lock");
-      
+
     } catch (Exception e) {
       fail("StorageBasedLockProvider creation should not throw unexpected 
exception: " + e.getMessage());
     } finally {
@@ -704,7 +710,7 @@ class TestStorageBasedLockProvider {
     }
   }
 
-  @Test 
+  @Test
   public void testStorageBasedLockProviderStandardConstructor() {
     // Create test configuration
     TypedProperties props = new TypedProperties();
@@ -713,7 +719,7 @@ class TestStorageBasedLockProvider {
     props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
     LockConfiguration lockConfiguration = new LockConfiguration(props);
     StorageConfiguration<?> storageConf = 
HoodieTestUtils.getDefaultStorageConf();
-    
+
     // Test that standard constructor works by using the internal constructor 
to avoid scheme issues
     StorageBasedLockProvider lockProviderStandard = null;
     try {
@@ -722,22 +728,22 @@ class TestStorageBasedLockProvider {
       assertThrows(Exception.class, () -> {
         new StorageBasedLockProvider(lockConfiguration, storageConf);
       }, "Standard constructor should exist but fail during lock client 
instantiation");
-      
+
       // Now create a working instance using the internal constructor for 
proper validation
       lockProviderStandard = new StorageBasedLockProvider(
           UUID.randomUUID().toString(),
           props,
-          (a,b,c) -> mock(HeartbeatManager.class),
-          (a,b,c) -> new StubStorageLockClient(a, b, new Properties()),
+          (a, b, c) -> mock(HeartbeatManager.class),
+          (a, b, c) -> new StubStorageLockClient(a, b, new Properties()),
           mock(Logger.class),
           null);  // No metrics for standard constructor test
-      
+
       // Verify the lock provider was created successfully
       assertNotNull(lockProviderStandard, "StorageBasedLockProvider should be 
created successfully");
-      
+
       // Verify that it can perform basic operations  
       assertNull(lockProviderStandard.getLock(), "Initially should have no 
lock");
-      
+
     } catch (Exception e) {
       fail("StorageBasedLockProvider creation should not throw unexpected 
exception: " + e.getMessage());
     } finally {
@@ -747,6 +753,95 @@ class TestStorageBasedLockProvider {
     }
   }
 
+  @Test
+  void testAuditServiceIntegrationWhenConfigNotPresent() {
+    // Test that lock provider works correctly when audit config is not present
+    TypedProperties props = new TypedProperties();
+    props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
+    props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+    props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-audit-test");
+
+    // Mock client that returns empty for audit config
+    StorageLockClient auditMockClient = mock(StorageLockClient.class);
+    when(auditMockClient.readObject(anyString(), eq(true)))
+        .thenReturn(Option.empty());
+    when(auditMockClient.readCurrentLockFile())
+        .thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+
+    StorageBasedLockProvider auditLockProvider = new StorageBasedLockProvider(
+        ownerId,
+        props,
+        (a, b, c) -> mockHeartbeatManager,
+        (a, b, c) -> auditMockClient,
+        mockLogger,
+        null);
+
+    // Lock provider should work normally even without audit
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    when(auditMockClient.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    // tryLock should trigger audit service creation (lazily)
+    assertTrue(auditLockProvider.tryLock());
+
+    // Verify audit config was checked during tryLock
+    verify(auditMockClient, times(1)).readObject(
+        contains(".locks/audit_enabled.json"), eq(true));
+
+    // No audit writes should happen since audit is not present
+    verify(auditMockClient, never()).writeObject(
+        contains(".locks/audit"), anyString());
+
+    auditLockProvider.close();
+  }
+
+  @Test
+  void testAuditServiceIntegrationWhenConfigDisabled() {
+    // Test that lock provider works correctly when audit is explicitly 
disabled
+    TypedProperties props = new TypedProperties();
+    props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
+    props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+    props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-audit-disabled");
+
+    // Mock client that returns disabled config
+    StorageLockClient auditMockClient = mock(StorageLockClient.class);
+    String disabledConfig = "{\"STORAGE_LOCK_AUDIT_SERVICE_ENABLED\": false}";
+    when(auditMockClient.readObject(anyString(), eq(true)))
+        .thenReturn(Option.of(disabledConfig));
+    when(auditMockClient.readCurrentLockFile())
+        .thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+
+    StorageBasedLockProvider auditLockProvider = new StorageBasedLockProvider(
+        ownerId,
+        props,
+        (a, b, c) -> mockHeartbeatManager,
+        (a, b, c) -> auditMockClient,
+        mockLogger,
+        null);
+
+    // Set up lock acquisition
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    when(auditMockClient.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    // tryLock should trigger audit service creation check
+    assertTrue(auditLockProvider.tryLock());
+
+    // Verify audit config was checked during tryLock
+    verify(auditMockClient, times(1)).readObject(
+        contains(".locks/audit_enabled.json"), eq(true));
+
+    // No audit writes should happen since audit is disabled
+    verify(auditMockClient, never()).writeObject(
+        contains(".locks/audit"), anyString());
+
+    auditLockProvider.close();
+  }
+
   public static class StubStorageLockClient implements StorageLockClient {
     public StubStorageLockClient(String ownerId, String lockFileUri, 
Properties props) {
       assertTrue(lockFileUri.endsWith("table_lock.json"));
@@ -764,6 +859,18 @@ class TestStorageBasedLockProvider {
       return null;
     }
 
+    @Override
+    public Option<String> readObject(String filePath, boolean 
checkExistsFirst) {
+      // Stub implementation for testing
+      return Option.empty();
+    }
+
+    @Override
+    public boolean writeObject(String filePath, String content) {
+      // Stub implementation for testing
+      return true;
+    }
+
     @Override
     public void close() throws Exception {
       // stub, no-op
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
new file mode 100644
index 000000000000..e944fdc89615
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for AuditServiceFactory
+ */
+public class TestAuditServiceFactory {
+
+  private StorageLockClient mockStorageLockClient;
+  private final String ownerId = "test-owner";
+  private final String basePath = "s3://bucket/path/to/table";
+
+  @BeforeEach
+  void setUp() {
+    mockStorageLockClient = mock(StorageLockClient.class);
+  }
+
+  @Test
+  void testCreateAuditServiceWhenConfigNotFound() {
+    // Config file doesn't exist
+    String expectedPath = basePath + "/.hoodie/.locks/audit_enabled.json";
+    when(mockStorageLockClient.readObject(eq(expectedPath), eq(true)))
+        .thenReturn(Option.empty());
+
+    Option<AuditService> result = 
AuditServiceFactory.createLockProviderAuditService(
+        ownerId, basePath, mockStorageLockClient,
+        System.currentTimeMillis(),
+        timestamp -> timestamp + 10000,
+        () -> true);
+
+    // Should return empty for now (no concrete implementation yet)
+    // When implementation is added, this should return a present Option
+    assertTrue(result.isEmpty());
+    verify(mockStorageLockClient).readObject(expectedPath, true);
+  }
+
+  @Test
+  void testCreateAuditServiceWhenDisabledInConfig() {
+    // Config file exists but audit is disabled
+    String expectedPath = basePath + "/.hoodie/.locks/audit_enabled.json";
+    String configJson = "{\"STORAGE_LOCK_AUDIT_SERVICE_ENABLED\": false}";
+    when(mockStorageLockClient.readObject(eq(expectedPath), eq(true)))
+        .thenReturn(Option.of(configJson));
+
+    Option<AuditService> result = 
AuditServiceFactory.createLockProviderAuditService(
+        ownerId, basePath, mockStorageLockClient,
+        System.currentTimeMillis(),
+        timestamp -> timestamp + 10000,
+        () -> true);
+
+    // Should return empty when audit is disabled
+    assertTrue(result.isEmpty());
+    verify(mockStorageLockClient).readObject(expectedPath, true);
+  }
+
+  @Test
+  void testCreateAuditServiceWhenEnabledInConfig() {
+    // Config file exists and audit is enabled
+    String expectedPath = basePath + "/.hoodie/.locks/audit_enabled.json";
+    String configJson = "{\"STORAGE_LOCK_AUDIT_SERVICE_ENABLED\": true}";
+    when(mockStorageLockClient.readObject(eq(expectedPath), eq(true)))
+        .thenReturn(Option.of(configJson));
+    // Mock writeObject method to return true for audit file writes
+    when(mockStorageLockClient.writeObject(anyString(), anyString()))
+        .thenReturn(true);
+
+    Option<AuditService> result = 
AuditServiceFactory.createLockProviderAuditService(
+        ownerId, basePath, mockStorageLockClient,
+        System.currentTimeMillis(),
+        timestamp -> timestamp + 10000, // lockExpirationFunction
+        () -> true); // lockHeldSupplier
+
+    // Should return empty audit service as the service is not added yet.
+    assertFalse(result.isPresent());
+    verify(mockStorageLockClient).readObject(expectedPath, true);
+  }
+
+  @Test
+  void testCreateAuditServiceWithMalformedJson() {
+    // Config file exists but contains invalid JSON
+    String expectedPath = basePath + "/.hoodie/.locks/audit_enabled.json";
+    String malformedJson = "{invalid json}";
+    when(mockStorageLockClient.readObject(eq(expectedPath), eq(true)))
+        .thenReturn(Option.of(malformedJson));
+
+    Option<AuditService> result = 
AuditServiceFactory.createLockProviderAuditService(
+        ownerId, basePath, mockStorageLockClient,
+        System.currentTimeMillis(),
+        timestamp -> timestamp + 10000,
+        () -> true);
+
+    // Should return empty when JSON is malformed
+    assertTrue(result.isEmpty());
+    verify(mockStorageLockClient).readObject(expectedPath, true);
+  }
+
+  @Test
+  void testCreateAuditServiceWithEmptyConfig() {
+    // Config file exists but doesn't contain the expected field
+    String expectedPath = basePath + "/.hoodie/.locks/audit_enabled.json";
+    String configJson = "{\"some_other_field\": true}";
+    when(mockStorageLockClient.readObject(eq(expectedPath), eq(true)))
+        .thenReturn(Option.of(configJson));
+
+    Option<AuditService> result = 
AuditServiceFactory.createLockProviderAuditService(
+        ownerId, basePath, mockStorageLockClient,
+        System.currentTimeMillis(),
+        timestamp -> timestamp + 10000,
+        () -> true);
+
+    // Should return empty when field is missing (defaults to false)
+    assertTrue(result.isEmpty());
+    verify(mockStorageLockClient).readObject(expectedPath, true);
+  }
+
+  @Test
+  void testCreateAuditServiceUsesCheckExistsFirst() {
+    // Verify that the factory passes true for checkExistsFirst
+    String expectedPath = basePath + "/.hoodie/.locks/audit_enabled.json";
+    when(mockStorageLockClient.readObject(eq(expectedPath), eq(true)))
+        .thenReturn(Option.empty());
+
+    AuditServiceFactory.createLockProviderAuditService(
+        ownerId, basePath, mockStorageLockClient,
+        System.currentTimeMillis(),
+        timestamp -> timestamp + 10000,
+        () -> true);
+
+    // Should pass true for checkExistsFirst since audit config is rarely 
present
+    verify(mockStorageLockClient).readObject(expectedPath, true);
+  }
+}
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
index c5d1fc011028..1e782cc109dd 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
@@ -25,11 +25,9 @@ import 
org.apache.hudi.client.transaction.lock.models.StorageLockData;
 import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieLockException;
 
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
@@ -46,11 +44,11 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.channels.Channels;
 import java.util.Properties;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  * A GCS-based implementation of a distributed lock provider using conditional 
writes
  * with generationMatch, plus local concurrency safety, heartbeat/renew, and 
pruning old locks.
@@ -68,10 +66,12 @@ public class GCSStorageLockClient implements 
StorageLockClient {
   private final String lockFilePath;
   private final String ownerId;
 
-  /** Constructor that is used by reflection to instantiate a GCS-based 
locking service.
-   * @param ownerId The owner id.
+  /**
+   * Constructor that is used by reflection to instantiate a GCS-based locking 
service.
+   *
+   * @param ownerId     The owner id.
    * @param lockFileUri The path within the bucket where to write lock files.
-   * @param props The properties for the lock config, can be used to customize 
client.
+   * @param props       The properties for the lock config, can be used to 
customize client.
    */
   public GCSStorageLockClient(
       String ownerId,
@@ -87,25 +87,12 @@ public class GCSStorageLockClient implements 
StorageLockClient {
       Properties properties,
       Functions.Function1<Properties, Storage> gcsClientSupplier,
       Logger logger) {
-    try {
-      // This logic can likely be extended to other lock client 
implementations.
-      // Consider creating base class with utilities, incl error handling.
-      URI uri = new URI(lockFileUri);
-      this.bucketName = uri.getAuthority();
-      this.lockFilePath = uri.getPath().replaceFirst("/", "");
-      this.gcsClient = gcsClientSupplier.apply(properties);
-
-      if (StringUtils.isNullOrEmpty(this.bucketName)) {
-        throw new IllegalArgumentException("LockFileUri does not contain a 
valid bucket name.");
-      }
-      if (StringUtils.isNullOrEmpty(this.lockFilePath)) {
-        throw new IllegalArgumentException("LockFileUri does not contain a 
valid lock file path.");
-      }
-      this.ownerId = ownerId;
-      this.logger = logger;
-    } catch (URISyntaxException e) {
-      throw new HoodieLockException(e);
-    }
+    Pair<String, String> bucketAndPath = 
StorageLockClient.parseBucketAndPath(lockFileUri);
+    this.bucketName = bucketAndPath.getLeft();
+    this.lockFilePath = bucketAndPath.getRight();
+    this.gcsClient = gcsClientSupplier.apply(properties);
+    this.ownerId = ownerId;
+    this.logger = logger;
   }
 
   private static Functions.Function1<Properties, Storage> 
createDefaultGcsClient() {
@@ -119,7 +106,7 @@ public class GCSStorageLockClient implements 
StorageLockClient {
   /**
    * Attempts to create or update the lock file using the given lock data and 
generation number.
    *
-   * @param lockData the new lock data to use.
+   * @param lockData         the new lock data to use.
    * @param generationNumber the expected generation number (0 for creation).
    * @return the updated StorageLockFile instance.
    * @throws StorageException if the update fails.
@@ -149,13 +136,13 @@ public class GCSStorageLockClient implements 
StorageLockClient {
       return Pair.of(LockUpsertResult.SUCCESS, Option.of(updatedFile));
     } catch (StorageException e) {
       if (e.getCode() == PRECONDITION_FAILURE_ERROR_CODE) {
-        logger.info("OwnerId: {}, Unable to write new lock file. Another 
process has modified this lockfile {} already.", 
+        logger.info("OwnerId: {}, Unable to write new lock file. Another 
process has modified this lockfile {} already.",
             ownerId, lockFilePath);
         return Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty());
       } else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
         logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}", 
ownerId, lockFilePath);
       } else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
-        logger.warn("OwnerId: {}, GCS returned internal server error code for 
lock file: {}", 
+        logger.warn("OwnerId: {}, GCS returned internal server error code for 
lock file: {}",
             ownerId, lockFilePath, e);
       } else {
         throw e;
@@ -166,7 +153,8 @@ public class GCSStorageLockClient implements 
StorageLockClient {
 
   /**
    * Handling storage exception for GET request
-   * @param e The error to handle.
+   *
+   * @param e         The error to handle.
    * @param ignore404 Whether to ignore 404 as a valid exception.
    *                  When we read from stream we might see this, and
    *                  it should not be counted as NOT_EXISTS.
@@ -225,6 +213,69 @@ public class GCSStorageLockClient implements 
StorageLockClient {
     }
   }
 
+  @Override
+  public Option<String> readObject(String filePath, boolean checkExistsFirst) {
+    try {
+      // Parse the file path to get bucket and object path
+      Pair<String, String> bucketAndPath = 
StorageLockClient.parseBucketAndPath(filePath);
+      String bucket = bucketAndPath.getLeft();
+      String objectPath = bucketAndPath.getRight();
+
+      BlobId blobId = BlobId.of(bucket, objectPath);
+
+      if (checkExistsFirst) {
+        // First check if the file exists (lightweight metadata check)
+        Blob blob = gcsClient.get(blobId);
+
+        if (blob == null || !blob.exists()) {
+          // File doesn't exist - this is the common case for optional configs
+          logger.debug("JSON config file not found: {}", filePath);
+          return Option.empty();
+        }
+
+        // File exists, read its content
+        byte[] content = blob.getContent();
+        return Option.of(new String(content, UTF_8));
+      } else {
+        // Direct read without existence check
+        byte[] content = gcsClient.readAllBytes(blobId);
+        return Option.of(new String(content, UTF_8));
+      }
+    } catch (StorageException e) {
+      if (e.getCode() == NOT_FOUND_ERROR_CODE) {
+        logger.debug("JSON config file not found: {}", filePath);
+      } else {
+        logger.warn("Error reading JSON config file: {}", filePath, e);
+      }
+      return Option.empty();
+    } catch (Exception e) {
+      logger.warn("Error reading JSON config file: {}", filePath, e);
+      return Option.empty();
+    }
+  }
+
+  @Override
+  public boolean writeObject(String filePath, String content) {
+    try {
+      // Parse the file path to get bucket and object path
+      Pair<String, String> bucketAndPath = 
StorageLockClient.parseBucketAndPath(filePath);
+      String bucket = bucketAndPath.getLeft();
+      String objectPath = bucketAndPath.getRight();
+
+      BlobId blobId = BlobId.of(bucket, objectPath);
+      BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
+
+      // Write the content to GCS
+      gcsClient.create(blobInfo, content.getBytes(UTF_8));
+
+      logger.debug("Successfully wrote object to: {}", filePath);
+      return true;
+    } catch (Exception e) {
+      logger.warn("Error writing object to: {}", filePath, e);
+      return false;
+    }
+  }
+
   @Override
   public void close() throws Exception {
     this.gcsClient.close();
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClientAuditOperations.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClientAuditOperations.java
new file mode 100644
index 000000000000..02ea0719d764
--- /dev/null
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClientAuditOperations.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.gcp.transaction.lock;
+
+import org.apache.hudi.common.util.Option;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for GCSStorageLockClient audit operations (readObject and writeObject 
methods)
+ */
+public class TestGCSStorageLockClientAuditOperations {
+
+  private Storage mockGcsClient;
+  private Blob mockBlob;
+  private Logger mockLogger;
+  private GCSStorageLockClient lockClient;
+
+  @BeforeEach
+  void setUp() {
+    mockGcsClient = mock(Storage.class);
+    mockLogger = mock(Logger.class);
+    mockBlob = mock(Blob.class);
+    String lockFileUri = 
"gs://test-bucket/table/.hoodie/.locks/table_lock.json";
+    String ownerId = "test-owner";
+    lockClient = new GCSStorageLockClient(
+        ownerId,
+        lockFileUri,
+        new Properties(),
+        props -> mockGcsClient,
+        mockLogger);
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstFileNotFound() {
+    String configPath = 
"gs://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // get() returns null for non-existent blob
+    when(mockGcsClient.get(any(BlobId.class)))
+        .thenReturn(null);
+
+    Option<String> result = lockClient.readObject(configPath, true);
+
+    assertTrue(result.isEmpty());
+    // Should only call get() for existence check, not readAllBytes
+    verify(mockGcsClient, times(1)).get(any(BlobId.class));
+    verify(mockGcsClient, never()).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstBlobExistsFalse() {
+    String configPath = 
"gs://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // get() returns blob but exists() is false
+    when(mockGcsClient.get(any(BlobId.class)))
+        .thenReturn(mockBlob);
+    when(mockBlob.exists()).thenReturn(false);
+
+    Option<String> result = lockClient.readObject(configPath, true);
+
+    assertTrue(result.isEmpty());
+    // Should only check existence, not read content
+    verify(mockGcsClient, times(1)).get(any(BlobId.class));
+    verify(mockBlob, times(1)).exists();
+    verify(mockBlob, never()).getContent();
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstFileExists() {
+    String configPath = 
"gs://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+    String expectedContent = "{\"STORAGE_LP_AUDIT_SERVICE_ENABLED\": true}";
+
+    // get() returns existing blob
+    when(mockGcsClient.get(any(BlobId.class)))
+        .thenReturn(mockBlob);
+    when(mockBlob.exists()).thenReturn(true);
+    when(mockBlob.getContent())
+        .thenReturn(expectedContent.getBytes(StandardCharsets.UTF_8));
+
+    Option<String> result = lockClient.readObject(configPath, true);
+
+    assertTrue(result.isPresent());
+    assertEquals(expectedContent, result.get());
+    // Should call get() for existence check and getContent() for reading
+    verify(mockGcsClient, times(1)).get(any(BlobId.class));
+    verify(mockBlob, times(1)).exists();
+    verify(mockBlob, times(1)).getContent();
+  }
+
+  @Test
+  void testReadConfigWithoutCheckExistsFirstFileNotFound() {
+    String configPath = 
"gs://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // Direct readAllBytes throws 404 exception
+    StorageException notFoundException = new StorageException(404, "Not 
Found");
+    when(mockGcsClient.readAllBytes(any(BlobId.class)))
+        .thenThrow(notFoundException);
+
+    Option<String> result = lockClient.readObject(configPath, false);
+
+    assertTrue(result.isEmpty());
+    // Should not call get(), only readAllBytes
+    verify(mockGcsClient, never()).get(any(BlobId.class));
+    verify(mockGcsClient, times(1)).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithoutCheckExistsFirstFileExists() {
+    String configPath = 
"gs://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+    String expectedContent = "{\"STORAGE_LP_AUDIT_SERVICE_ENABLED\": false}";
+
+    // Direct readAllBytes returns content
+    when(mockGcsClient.readAllBytes(any(BlobId.class)))
+        .thenReturn(expectedContent.getBytes(StandardCharsets.UTF_8));
+
+    Option<String> result = lockClient.readObject(configPath, false);
+
+    assertTrue(result.isPresent());
+    assertEquals(expectedContent, result.get());
+    // Should not call get(), only readAllBytes
+    verify(mockGcsClient, never()).get(any(BlobId.class));
+    verify(mockGcsClient, times(1)).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstOtherGcsError() {
+    String configPath = 
"gs://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // get() throws non-404 error
+    StorageException serverError = new StorageException(500, "Internal Server 
Error");
+    when(mockGcsClient.get(any(BlobId.class)))
+        .thenThrow(serverError);
+
+    Option<String> result = lockClient.readObject(configPath, true);
+
+    assertTrue(result.isEmpty());
+    verify(mockGcsClient, times(1)).get(any(BlobId.class));
+    verify(mockGcsClient, never()).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithInvalidUri() {
+    String invalidPath = "not-a-valid-uri";
+
+    Option<String> result = lockClient.readObject(invalidPath, false);
+
+    assertTrue(result.isEmpty());
+    // Should not make any GCS calls due to URI parsing error
+    verify(mockGcsClient, never()).get(any(BlobId.class));
+    verify(mockGcsClient, never()).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithRateLimitError() {
+    String configPath = 
"gs://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+    // readAllBytes returns rate limit error
+    StorageException rateLimitException = new StorageException(429, "Too Many 
Requests");
+    when(mockGcsClient.readAllBytes(any(BlobId.class)))
+        .thenThrow(rateLimitException);
+
+    Option<String> result = lockClient.readObject(configPath, false);
+
+    assertTrue(result.isEmpty());
+    verify(mockGcsClient, times(1)).readAllBytes(any(BlobId.class));
+  }
+
+  // ================================
+  // writeObject() tests
+  // ================================
+
+  @Test
+  void testWriteObject_success() {
+    String filePath = "gs://test-bucket/audit/test-audit.jsonl";
+    String content = "{\"test\": \"data\"}\n";
+    when(mockGcsClient.create(any(BlobInfo.class), 
any(byte[].class))).thenReturn(mockBlob);
+
+    boolean result = lockClient.writeObject(filePath, content);
+
+    assertTrue(result);
+    verify(mockGcsClient).create(any(BlobInfo.class), 
eq(content.getBytes(UTF_8)));
+    verify(mockLogger).debug("Successfully wrote object to: {}", filePath);
+  }
+
+  @Test
+  void testWriteObject_storageException() {
+    String filePath = "gs://test-bucket/audit/test-audit.jsonl";
+    String content = "{\"test\": \"data\"}\n";
+    StorageException storageException = new StorageException(500, "Internal 
Server Error");
+    when(mockGcsClient.create(any(BlobInfo.class), 
any(byte[].class))).thenThrow(storageException);
+
+    boolean result = lockClient.writeObject(filePath, content);
+
+    assertFalse(result);
+    verify(mockLogger).warn(contains("Error writing object to"), eq(filePath), 
eq(storageException));
+  }
+
+  @Test
+  void testWriteObject_invalidPath() {
+    String invalidPath = "invalid-path";
+    String content = "{\"test\": \"data\"}\n";
+
+    boolean result = lockClient.writeObject(invalidPath, content);
+
+    assertFalse(result);
+    verify(mockLogger).warn(contains("Error writing object to"), 
eq(invalidPath), any(Exception.class));
+  }
+
+  @Test
+  void testWriteObject_emptyContent() {
+    String filePath = "gs://test-bucket/audit/empty-content.jsonl";
+    String content = "";
+    when(mockGcsClient.create(any(BlobInfo.class), 
any(byte[].class))).thenReturn(mockBlob);
+
+    boolean result = lockClient.writeObject(filePath, content);
+
+    assertTrue(result);
+    verify(mockGcsClient).create(any(BlobInfo.class), 
eq(content.getBytes(UTF_8)));
+    verify(mockLogger).debug("Successfully wrote object to: {}", filePath);
+  }
+
+  @Test
+  void testWriteObject_rateLimitExceeded() {
+    String filePath = "gs://test-bucket/audit/test-audit.jsonl";
+    String content = "{\"test\": \"data\"}\n";
+    StorageException rateLimitException = new StorageException(429, "Rate 
Limit Exceeded");
+    when(mockGcsClient.create(any(BlobInfo.class), 
any(byte[].class))).thenThrow(rateLimitException);
+
+    boolean result = lockClient.writeObject(filePath, content);
+
+    assertFalse(result);
+    verify(mockLogger).warn(contains("Error writing object to"), eq(filePath), 
eq(rateLimitException));
+  }
+}

Reply via email to