This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ca01dcf4ef3a [HUDI-9782] Add audit service implementation for storage 
based lock provider (#13868)
ca01dcf4ef3a is described below

commit ca01dcf4ef3a342d360ac919ccc457d402b048c7
Author: Alex R <[email protected]>
AuthorDate: Thu Sep 11 20:44:06 2025 -0700

    [HUDI-9782] Add audit service implementation for storage based lock 
provider (#13868)
---
 .../hudi/cli/commands/LockAuditingCommand.java     | 188 ++++++++++
 .../hudi/cli/commands/TestLockAuditingCommand.java | 285 ++++++++++++++++
 .../client/transaction/lock/StorageLockClient.java |   2 +-
 .../lock/audit/AuditServiceFactory.java            |  26 +-
 .../audit/StorageLockProviderAuditService.java     | 179 ++++++++++
 .../lock/TestStorageBasedLockProvider.java         |  58 ++++
 .../lock/audit/TestAuditServiceFactory.java        |   5 +-
 .../audit/TestStorageLockProviderAuditService.java | 379 +++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   2 +
 .../command/procedures/SetAuditLockProcedure.scala | 187 ++++++++++
 .../procedures/ShowAuditLockStatusProcedure.scala  | 168 +++++++++
 .../hudi/procedure/TestSetAuditLockProcedure.scala | 169 +++++++++
 .../TestShowAuditLockStatusProcedure.scala         | 223 ++++++++++++
 13 files changed, 1856 insertions(+), 15 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
new file mode 100644
index 000000000000..51e2dbe7fe6c
--- /dev/null
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.shell.standard.ShellComponent;
+import org.springframework.shell.standard.ShellMethod;
+import org.springframework.shell.standard.ShellOption;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * CLI commands for managing Hudi table lock auditing functionality.
+ */
+@ShellComponent
+public class LockAuditingCommand {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LockAuditingCommand.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Enables lock audit logging for the currently connected Hudi table.
+   * This command creates or updates the audit configuration file to enable
+   * audit logging for storage lock operations.
+   * 
+   * @return Status message indicating success or failure
+   */
+  @ShellMethod(key = "locks audit enable", value = "Enable storage lock audit 
service for the current table")
+  public String enableLockAudit() {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    try {
+      // Create the audit config file path using utility method
+      String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+      
+      // Create the JSON content
+      ObjectNode configJson = OBJECT_MAPPER.createObjectNode();
+      
configJson.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
 true);
+      String jsonContent = OBJECT_MAPPER.writeValueAsString(configJson);
+      
+      // Write the config file using HoodieStorage
+      StoragePath configPath = new StoragePath(auditConfigPath);
+      try (OutputStream outputStream = HoodieCLI.storage.create(configPath, 
true)) {
+        outputStream.write(jsonContent.getBytes());
+      }
+      
+      return String.format("Lock audit enabled successfully.\nAudit config 
written to: %s\n"
+          + "Audit files will be stored at: %s", auditConfigPath, 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+      
+    } catch (Exception e) {
+      LOG.error("Error enabling lock audit", e);
+      return String.format("Failed to enable lock audit: %s", e.getMessage());
+    }
+  }
+
+  /**
+   * Disables lock audit logging for the currently connected Hudi table.
+   * This command updates the audit configuration file to disable audit 
logging.
+   * 
+   * @param keepAuditFiles Whether to preserve existing audit files when 
disabling
+   * @return Status message indicating success or failure
+   */
+  @ShellMethod(key = "locks audit disable", value = "Disable storage lock 
audit service for the current table")
+  public String disableLockAudit(
+      @ShellOption(value = {"--keepAuditFiles"}, defaultValue = "true",
+          help = "Keep existing audit files when disabling") final boolean 
keepAuditFiles) {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    try {
+      // Create the audit config file path
+      String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+      
+      // Check if config file exists
+      StoragePath configPath = new StoragePath(auditConfigPath);
+      if (!HoodieCLI.storage.exists(configPath)) {
+        return "Lock audit is already disabled (no configuration file found).";
+      }
+      
+      // Create the JSON content with audit disabled
+      ObjectNode configJson = OBJECT_MAPPER.createObjectNode();
+      
configJson.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
 false);
+      String jsonContent = OBJECT_MAPPER.writeValueAsString(configJson);
+      
+      // Write the config file
+      try (OutputStream outputStream = HoodieCLI.storage.create(configPath, 
true)) {
+        outputStream.write(jsonContent.getBytes());
+      }
+      
+      String message = String.format("Lock audit disabled successfully.\nAudit 
config updated at: %s", auditConfigPath);
+      
+      if (keepAuditFiles) {
+        message += String.format("\nExisting audit files preserved at: %s", 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+      } else {
+        // Todo: write then call the api method to prune the old files
+        message += String.format("\nAudit files cleaned up at: %s", 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+      }
+      
+      return message;
+      
+    } catch (Exception e) {
+      LOG.error("Error disabling lock audit", e);
+      return String.format("Failed to disable lock audit: %s", e.getMessage());
+    }
+  }
+
+  /**
+   * Shows the current status of lock audit logging for the connected table.
+   * This command checks the audit configuration file and reports whether
+   * auditing is currently enabled or disabled.
+   * 
+   * @return Status information about the current audit configuration
+   */
+  @ShellMethod(key = "locks audit status", value = "Show the current status of 
lock audit service")
+  public String showLockAuditStatus() {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    try {
+      // Create the audit config file path
+      String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+      
+      // Check if config file exists
+      StoragePath configPath = new StoragePath(auditConfigPath);
+      if (!HoodieCLI.storage.exists(configPath)) {
+        return String.format("Lock Audit Status: DISABLED\n"
+            + "Table: %s\n"
+            + "Config file: %s (not found)\n"
+            + "Use 'locks audit enable' to enable audit logging.", 
+            HoodieCLI.basePath, auditConfigPath);
+      }
+      
+      // Read and parse the configuration
+      String configContent;
+      try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+        configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+      }
+      JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+      JsonNode enabledNode = 
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+      boolean isEnabled = enabledNode != null && enabledNode.asBoolean(false);
+      
+      String status = isEnabled ? "ENABLED" : "DISABLED";
+      
+      return String.format("Lock Audit Status: %s\n"
+          + "Table: %s\n"
+          + "Config file: %s\n"
+          + "Audit files location: %s",
+          status, HoodieCLI.basePath, auditConfigPath, 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+      
+    } catch (Exception e) {
+      LOG.error("Error checking lock audit status", e);
+      return String.format("Failed to check lock audit status: %s", 
e.getMessage());
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
new file mode 100644
index 000000000000..da346110e124
--- /dev/null
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
+import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.InputStream;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.shell.Shell;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link org.apache.hudi.cli.commands.LockAuditingCommand}.
+ */
+@Tag("functional")
+@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", 
"spring.shell.command.script.enabled=false"})
+public class TestLockAuditingCommand extends CLIFunctionalTestHarness {
+
+  @Autowired
+  private Shell shell;
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    HoodieCLI.conf = storageConf();
+    String tableName = tableName();
+    String tablePath = tablePath(tableName);
+    HoodieCLI.basePath = tablePath;
+    
+    // Initialize table
+    HoodieTableMetaClient.newTableBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName(tableName)
+        .setRecordKeyFields("key")
+        .initTable(storageConf(), tablePath);
+        
+    // Initialize storage
+    HoodieCLI.initFS(true);
+  }
+
+  /**
+   * Test enabling lock audit when no table is loaded.
+   */
+  @Test
+  public void testEnableLockAuditNoTable() {
+    // Clear the base path to simulate no table loaded
+    String originalBasePath = HoodieCLI.basePath;
+    HoodieCLI.basePath = null;
+
+    try {
+      Object result = shell.evaluate(() -> "locks audit enable");
+      assertAll("Command runs with no table loaded",
+          () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+          () -> assertNotNull(result.toString()),
+          () -> assertEquals("No Hudi table loaded. Please connect to a table 
first.", result.toString()));
+    } finally {
+      HoodieCLI.basePath = originalBasePath;
+    }
+  }
+
+  /**
+   * Test disabling lock audit when no table is loaded.
+   */
+  @Test
+  public void testDisableLockAuditNoTable() {
+    // Clear the base path to simulate no table loaded
+    String originalBasePath = HoodieCLI.basePath;
+    HoodieCLI.basePath = null;
+
+    try {
+      Object result = shell.evaluate(() -> "locks audit disable");
+      assertAll("Command runs with no table loaded",
+          () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+          () -> assertNotNull(result.toString()),
+          () -> assertEquals("No Hudi table loaded. Please connect to a table 
first.", result.toString()));
+    } finally {
+      HoodieCLI.basePath = originalBasePath;
+    }
+  }
+
+  /**
+   * Test showing lock audit status when no table is loaded.
+   */
+  @Test
+  public void testShowLockAuditStatusNoTable() {
+    // Clear the base path to simulate no table loaded
+    String originalBasePath = HoodieCLI.basePath;
+    HoodieCLI.basePath = null;
+
+    try {
+      Object result = shell.evaluate(() -> "locks audit status");
+      assertAll("Command runs with no table loaded",
+          () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+          () -> assertNotNull(result.toString()),
+          () -> assertEquals("No Hudi table loaded. Please connect to a table 
first.", result.toString()));
+    } finally {
+      HoodieCLI.basePath = originalBasePath;
+    }
+  }
+
+  /**
+   * Test enabling lock audit successfully.
+   */
+  @Test
+  public void testEnableLockAuditSuccess() throws Exception {
+    Object result = shell.evaluate(() -> "locks audit enable");
+    
+    assertAll("Enable command runs successfully",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Lock audit enabled 
successfully")));
+
+    // Verify the config file was created with correct content
+    String lockFolderPath = String.format("%s%s.hoodie%s.locks", 
HoodieCLI.basePath, StoragePath.SEPARATOR, StoragePath.SEPARATOR);
+    String auditConfigPath = String.format("%s%s%s", lockFolderPath, 
StoragePath.SEPARATOR, StorageLockProviderAuditService.AUDIT_CONFIG_FILE_NAME);
+    StoragePath configPath = new StoragePath(auditConfigPath);
+    
+    assertTrue(HoodieCLI.storage.exists(configPath), "Config file should 
exist");
+    
+    String configContent;
+    try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+      configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+    }
+    JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+    JsonNode enabledNode = 
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+    
+    assertNotNull(enabledNode, "Config should contain enabled field");
+    assertTrue(enabledNode.asBoolean(), "Audit should be enabled");
+  }
+
+  /**
+   * Test showing lock audit status when disabled (no config file).
+   */
+  @Test
+  public void testShowLockAuditStatusDisabled() {
+    Object result = shell.evaluate(() -> "locks audit status");
+    
+    assertAll("Status command shows disabled state",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Lock Audit Status: 
DISABLED")),
+        () -> assertTrue(result.toString().contains("(not found)")));
+  }
+
+  /**
+   * Test the complete workflow: enable, check status, disable, check status.
+   */
+  @Test
+  public void testCompleteAuditWorkflow() throws Exception {
+    // 1. Enable audit
+    Object enableResult = shell.evaluate(() -> "locks audit enable");
+    assertTrue(ShellEvaluationResultUtil.isSuccess(enableResult));
+    assertTrue(enableResult.toString().contains("Lock audit enabled 
successfully"));
+
+    // 2. Check status - should be enabled
+    Object statusEnabledResult = shell.evaluate(() -> "locks audit status");
+    assertTrue(ShellEvaluationResultUtil.isSuccess(statusEnabledResult));
+    assertTrue(statusEnabledResult.toString().contains("Lock Audit Status: 
ENABLED"));
+
+    // 3. Disable audit with default keepAuditFiles=true
+    Object disableResult = shell.evaluate(() -> "locks audit disable");
+    assertTrue(ShellEvaluationResultUtil.isSuccess(disableResult));
+    assertTrue(disableResult.toString().contains("Lock audit disabled 
successfully"));
+    assertTrue(disableResult.toString().contains("Existing audit files 
preserved"));
+
+    // 4. Check status - should be disabled
+    Object statusDisabledResult = shell.evaluate(() -> "locks audit status");
+    assertTrue(ShellEvaluationResultUtil.isSuccess(statusDisabledResult));
+    assertTrue(statusDisabledResult.toString().contains("Lock Audit Status: 
DISABLED"));
+
+    // Verify the config file still exists but with audit disabled
+    String lockFolderPath = String.format("%s%s.hoodie%s.locks", 
HoodieCLI.basePath, StoragePath.SEPARATOR, StoragePath.SEPARATOR);
+    String auditConfigPath = String.format("%s%s%s", lockFolderPath, 
StoragePath.SEPARATOR, StorageLockProviderAuditService.AUDIT_CONFIG_FILE_NAME);
+    StoragePath configPath = new StoragePath(auditConfigPath);
+    
+    assertTrue(HoodieCLI.storage.exists(configPath), "Config file should still 
exist");
+    
+    String configContent;
+    try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+      configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+    }
+    JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+    JsonNode enabledNode = 
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+    
+    assertNotNull(enabledNode, "Config should contain enabled field");
+    assertFalse(enabledNode.asBoolean(), "Audit should be disabled");
+  }
+
+  /**
+   * Test disabling audit when no config file exists.
+   */
+  @Test
+  public void testDisableLockAuditNoConfigFile() {
+    Object result = shell.evaluate(() -> "locks audit disable");
+    
+    assertAll("Disable command handles missing config file",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertEquals("Lock audit is already disabled (no configuration 
file found).", result.toString()));
+  }
+
+  /**
+   * Test disabling audit with keepAuditFiles=false option.
+   */
+  @Test
+  public void testDisableLockAuditWithoutKeepingFiles() throws Exception {
+    // First enable audit
+    shell.evaluate(() -> "locks audit enable");
+
+    // Disable with keepAuditFiles=false
+    Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles 
false");
+    
+    assertAll("Disable command with keepAuditFiles=false",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Lock audit disabled 
successfully")),
+        () -> assertTrue(result.toString().contains("Audit files cleaned up 
at:")));
+  }
+
+  /**
+   * Test enabling audit multiple times (should overwrite).
+   */
+  @Test
+  public void testEnableLockAuditMultipleTimes() throws Exception {
+    // Enable first time
+    Object result1 = shell.evaluate(() -> "locks audit enable");
+    assertTrue(ShellEvaluationResultUtil.isSuccess(result1));
+    assertTrue(result1.toString().contains("Lock audit enabled successfully"));
+
+    // Enable second time (should succeed and overwrite)
+    Object result2 = shell.evaluate(() -> "locks audit enable");
+    assertTrue(ShellEvaluationResultUtil.isSuccess(result2));
+    assertTrue(result2.toString().contains("Lock audit enabled successfully"));
+
+    // Verify config is still correct
+    String lockFolderPath = String.format("%s%s.hoodie%s.locks", 
HoodieCLI.basePath, StoragePath.SEPARATOR, StoragePath.SEPARATOR);
+    String auditConfigPath = String.format("%s%s%s", lockFolderPath, 
StoragePath.SEPARATOR, StorageLockProviderAuditService.AUDIT_CONFIG_FILE_NAME);
+    StoragePath configPath = new StoragePath(auditConfigPath);
+    
+    String configContent;
+    try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+      configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+    }
+    JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+    JsonNode enabledNode = 
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+    
+    assertTrue(enabledNode.asBoolean(), "Audit should still be enabled");
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
index 865013f3acaa..fa5c55171983 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -86,7 +86,7 @@ public interface StorageLockClient extends AutoCloseable {
    * This is a static utility method that can be used without creating an 
instance.
    *
    * @param basePath The base path of the Hudi table
-   * @return The lock folder path (e.g., "s3://bucket/table/.hoodie/locks")
+   * @return The lock folder path (e.g., "s3://bucket/table/.hoodie/.locks")
    */
   static String getLockFolderPath(String basePath) {
     return String.format("%s%s%s", basePath, StoragePath.SEPARATOR, 
LOCKS_FOLDER_NAME);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
index eb9accaf4fdc..871330886916 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.transaction.lock.audit;
 
 import org.apache.hudi.client.transaction.lock.StorageLockClient;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.StoragePath;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,12 +36,12 @@ import java.util.function.Supplier;
 public class AuditServiceFactory {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AuditServiceFactory.class);
-  private static final String AUDIT_CONFIG_FILE_NAME = "audit_enabled.json";
-  private static final String STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD = 
"STORAGE_LOCK_AUDIT_SERVICE_ENABLED";
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   /**
    * Creates a lock provider audit service instance by checking the audit 
configuration file.
+   * This method reads the audit configuration to determine if auditing is 
enabled.
+   * If auditing is enabled, it returns a StorageLockProviderAuditService 
instance.
    *
    * @param ownerId The owner ID for the lock provider
    * @param basePath The base path of the Hudi table
@@ -63,13 +62,21 @@ public class AuditServiceFactory {
     if (!isAuditEnabled(basePath, storageLockClient)) {
       return Option.empty();
     }
-
-    // No-op, will add in a follow up
-    return Option.empty();
+    
+    return Option.of(new StorageLockProviderAuditService(
+        basePath,
+        ownerId,
+        transactionStartTime,
+        storageLockClient,
+        lockExpirationFunction,
+        lockHeldSupplier));
   }
 
   /**
    * Checks if audit is enabled by reading the audit configuration file.
+   * This method looks for the audit configuration file at the expected 
location
+   * and parses it to determine if auditing is enabled. Returns false if the
+   * configuration file doesn't exist or cannot be read.
    *
    * @param basePath The base path of the Hudi table
    * @param storageLockClient The storage lock client to use for reading 
configuration
@@ -77,9 +84,8 @@ public class AuditServiceFactory {
    */
   private static boolean isAuditEnabled(String basePath, StorageLockClient 
storageLockClient) {
     try {
-      // Construct the audit config path using the same lock folder as the 
lock file
-      String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
-      String auditConfigPath = String.format("%s%s%s", lockFolderPath, 
StoragePath.SEPARATOR, AUDIT_CONFIG_FILE_NAME);
+      // Construct the audit config path using the utility method
+      String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(basePath);
 
       LOG.debug("Checking for audit configuration at: {}", auditConfigPath);
 
@@ -90,7 +96,7 @@ public class AuditServiceFactory {
       if (jsonContent.isPresent()) {
         LOG.debug("Audit configuration file found, parsing content");
         JsonNode rootNode = OBJECT_MAPPER.readTree(jsonContent.get());
-        JsonNode enabledNode = 
rootNode.get(STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+        JsonNode enabledNode = 
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
 
         boolean isEnabled = enabledNode != null && 
enabledNode.asBoolean(false);
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
new file mode 100644
index 000000000000..ba353fd8eacb
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
import org.apache.hudi.client.transaction.lock.StorageLockClient;
import org.apache.hudi.storage.StoragePath;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
+
+/**
+ * Storage-based audit service implementation for lock provider operations.
+ * Writes audit records to a single JSONL file per transaction to track lock 
lifecycle events.
+ */
+public class StorageLockProviderAuditService implements AuditService {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(StorageLockProviderAuditService.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  
+  // Audit configuration constants
+  public static final String AUDIT_FOLDER_NAME = "audit";
+  public static final String AUDIT_CONFIG_FILE_NAME = "audit_enabled.json";
+  public static final String STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD = 
"STORAGE_LOCK_AUDIT_SERVICE_ENABLED";
+  
+  /**
+   * Constructs the full path to the audit configuration file for a given 
table.
+   *
+   * @param basePath The base path of the Hudi table
+   * @return The full path to the audit_enabled.json configuration file
+   */
+  public static String getAuditConfigPath(String basePath) {
+    String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+    return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR, 
AUDIT_CONFIG_FILE_NAME);
+  }
+  
+  /**
+   * Constructs the full path to the audit folder for a given table.
+   *
+   * @param basePath The base path of the Hudi table
+   * @return The full path to the audit folder where audit files are stored
+   */
+  public static String getAuditFolderPath(String basePath) {
+    String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+    return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR, 
AUDIT_FOLDER_NAME);
+  }
+  
+  private final String ownerId;
+  private final long transactionStartTime;
+  private final String auditFilePath;
+  private final StorageLockClient storageLockClient;
+  private final Function<Long, Long> lockExpirationFunction;
+  private final Supplier<Boolean> lockHeldSupplier;
+  private final StringBuilder auditBuffer;
+  
+  /**
+   * Creates a new StorageLockProviderAuditService instance.
+   * 
+   * @param basePath The base path where audit files will be written
+   * @param ownerId The full owner ID for the lock
+   * @param transactionStartTime The timestamp when the transaction started 
(lock acquired)
+   * @param storageLockClient The storage client for writing audit files
+   * @param lockExpirationFunction Function that takes a timestamp and returns 
the lock expiration time
+   * @param lockHeldSupplier Supplier that provides whether the lock is 
currently held
+   */
+  public StorageLockProviderAuditService(
+      String basePath,
+      String ownerId,
+      long transactionStartTime,
+      StorageLockClient storageLockClient,
+      Function<Long, Long> lockExpirationFunction,
+      Supplier<Boolean> lockHeldSupplier) {
+    this.ownerId = ownerId;
+    this.transactionStartTime = transactionStartTime;
+    this.storageLockClient = storageLockClient;
+    this.lockExpirationFunction = lockExpirationFunction;
+    this.lockHeldSupplier = lockHeldSupplier;
+    this.auditBuffer = new StringBuilder();
+    
+    // Generate audit file path: <txn-start>_<full-owner-id>.jsonl
+    String filename = String.format("%d_%s.jsonl", transactionStartTime, 
ownerId);
+    this.auditFilePath = String.format("%s%s%s",
+        getAuditFolderPath(basePath),
+        StoragePath.SEPARATOR,
+        filename);
+    
+    LOG.debug("Initialized audit service for transaction starting at {} with 
file: {}", 
+        transactionStartTime, auditFilePath);
+  }
+  
+  /**
+   * Records an audit operation with the current timestamp and state.
+   * 
+   * @param state The audit operation state to record
+   * @param timestamp The timestamp when the operation occurred
+   * @throws Exception if there's an error creating or writing the audit record
+   */
+  @Override
+  public synchronized void recordOperation(AuditOperationState state, long 
timestamp) throws Exception {
+    // Create audit record
+    Map<String, Object> auditRecord = new HashMap<>();
+    auditRecord.put("ownerId", ownerId);
+    auditRecord.put("transactionStartTime", transactionStartTime);
+    auditRecord.put("timestamp", timestamp);
+    auditRecord.put("state", state.name());
+    auditRecord.put("lockExpiration", lockExpirationFunction.apply(timestamp));
+    auditRecord.put("lockHeld", lockHeldSupplier.get());
+    
+    // Convert to JSON and append newline for JSONL format
+    String jsonLine = OBJECT_MAPPER.writeValueAsString(auditRecord) + "\n";
+    
+    // Append to buffer
+    auditBuffer.append(jsonLine);
+    
+    // Write the accumulated audit records to file
+    writeAuditFile();
+    
+    LOG.debug("Recorded audit operation: state={}, timestamp={}, file={}", 
+        state, timestamp, auditFilePath);
+  }
+  
+  /**
+   * Writes the accumulated audit records to the audit file.
+   * This method is called after each recordOperation to persist the audit log.
+   * Failures to write audit records are logged but do not throw exceptions
+   * to avoid breaking lock operations.
+   */
+  private void writeAuditFile() {
+    try {
+      // Write the entire buffer content to the audit file
+      // This overwrites the file with all accumulated records
+      String content = auditBuffer.toString();
+      boolean success = storageLockClient.writeObject(auditFilePath, content);
+      if (success) {
+        LOG.debug("Successfully wrote audit records to: {}", auditFilePath);
+      } else {
+        LOG.warn("Failed to write audit records to: {}", auditFilePath);
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to write audit records to: {}", auditFilePath, e);
+      // Don't throw exception - audit failures should not break lock 
operations
+    }
+  }
+  
+  /**
+   * Closes the audit service. Since all audit records are written after each 
operation,
+   * no additional cleanup is required during close.
+   * 
+   * @throws Exception if there's an error during cleanup (not expected in 
current implementation)
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    // All audit records are already written after each recordOperation()
+    // No additional writes needed during close
+    LOG.debug("Closed StorageLockProviderAuditService for transaction: {}, 
owner: {}", 
+        transactionStartTime, ownerId);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
index 4ae637cbdc18..6675cc14ecf4 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -842,6 +842,64 @@ class TestStorageBasedLockProvider {
     auditLockProvider.close();
   }
 
+  @Test
+  void testAuditServiceIntegrationWhenConfigEnabled() {
+    // Test that lock provider works correctly when audit is enabled
+    TypedProperties props = new TypedProperties();
+    props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
+    props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+    props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-audit-enabled");
+
+    // Mock client that returns enabled config
+    StorageLockClient auditMockClient = mock(StorageLockClient.class);
+    String enabledConfig = "{\"STORAGE_LOCK_AUDIT_SERVICE_ENABLED\": true}";
+    when(auditMockClient.readObject(anyString(), eq(true)))
+        .thenReturn(Option.of(enabledConfig));
+    when(auditMockClient.readCurrentLockFile())
+        .thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+    // Mock writeObject method to return true for audit file writes
+    when(auditMockClient.writeObject(anyString(), anyString()))
+        .thenReturn(true);
+
+    StorageBasedLockProvider auditLockProvider = new StorageBasedLockProvider(
+        ownerId,
+        props,
+        (a, b, c) -> mockHeartbeatManager,
+        (a, b, c) -> auditMockClient,
+        mockLogger,
+        null);
+
+    // Set up lock acquisition
+    StorageLockData data = new StorageLockData(false, 
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+    StorageLockFile lockFile = new StorageLockFile(data, "v1");
+    when(auditMockClient.tryUpsertLockFile(any(), eq(Option.empty())))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lockFile)));
+    when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+    // tryLock should trigger audit service creation and START audit
+    assertTrue(auditLockProvider.tryLock());
+
+    // Verify audit config was checked during tryLock
+    verify(auditMockClient, times(1)).readObject(
+        contains(".locks/audit_enabled.json"), eq(true));
+
+    // Verify audit START operation was written
+    verify(auditMockClient, times(1)).writeObject(
+        contains(".locks/audit/"), anyString());
+
+    // Unlock should trigger END audit
+    when(auditMockClient.tryUpsertLockFile(any(), any()))
+        .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lockFile)));
+    when(mockHeartbeatManager.stopHeartbeat(anyBoolean())).thenReturn(true);
+    auditLockProvider.unlock();
+
+    // Verify audit END operation was written (total 2 writes: START and END)
+    verify(auditMockClient, times(2)).writeObject(
+        contains(".locks/audit/"), anyString());
+
+    auditLockProvider.close();
+  }
+
   public static class StubStorageLockClient implements StorageLockClient {
     public StubStorageLockClient(String ownerId, String lockFileUri, 
Properties props) {
       assertTrue(lockFileUri.endsWith("table_lock.json"));
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
index e944fdc89615..a53c829e8d75 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.util.Option;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -94,15 +93,13 @@ public class TestAuditServiceFactory {
     // Mock writeObject method to return true for audit file writes
     when(mockStorageLockClient.writeObject(anyString(), anyString()))
         .thenReturn(true);
-
     Option<AuditService> result = 
AuditServiceFactory.createLockProviderAuditService(
         ownerId, basePath, mockStorageLockClient,
         System.currentTimeMillis(),
         timestamp -> timestamp + 10000, // lockExpirationFunction
         () -> true); // lockHeldSupplier
 
-    // Should return empty audit service as the service is not added yet.
-    assertFalse(result.isPresent());
+    assertTrue(result.isPresent());
     verify(mockStorageLockClient).readObject(expectedPath, true);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java
new file mode 100644
index 000000000000..d5946a6ae410
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for StorageLockProviderAuditService.
+ */
+public class TestStorageLockProviderAuditService {
+
+  private static final String BASE_PATH = "s3://bucket/table";
+  private static final String OWNER_ID = 
"writer-12345678-9abc-def0-1234-567890abcdef";
+  private static final long TRANSACTION_START_TIME = 1234567890000L;
+  private static final long LOCK_EXPIRATION = 1000000L;
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private StorageLockClient storageLockClient;
+  private Supplier<Boolean> lockHeldSupplier;
+  private StorageLockProviderAuditService auditService;
+
+  @BeforeEach
+  void setUp() {
+    storageLockClient = mock(StorageLockClient.class);
+    lockHeldSupplier = mock(Supplier.class);
+
+    when(lockHeldSupplier.get()).thenReturn(true);
+
+    auditService = new StorageLockProviderAuditService(
+        BASE_PATH,
+        OWNER_ID,
+        TRANSACTION_START_TIME,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+  }
+
+  /**
+   * Helper method to validate audit record structure and content.
+   * 
+   * @param auditRecord The audit record to validate
+   * @param expectedState Expected operation state
+   * @param expectedTimestamp Expected timestamp
+   * @param expectedLockHeld Expected lock held status
+   */
+  private void validateAuditRecord(Map<String, Object> auditRecord, 
AuditOperationState expectedState, 
+                                  long expectedTimestamp, boolean 
expectedLockHeld) {
+    assertNotNull(auditRecord, "Audit record should not be null");
+    assertEquals(6, auditRecord.size(), "Audit record should contain exactly 6 
fields");
+    
+    // Validate all required fields are present
+    assertNotNull(auditRecord.get("ownerId"), "ownerId should be present");
+    assertNotNull(auditRecord.get("transactionStartTime"), 
"transactionStartTime should be present");
+    assertNotNull(auditRecord.get("timestamp"), "timestamp should be present");
+    assertNotNull(auditRecord.get("state"), "state should be present");
+    assertNotNull(auditRecord.get("lockExpiration"), "lockExpiration should be 
present");
+    assertNotNull(auditRecord.get("lockHeld"), "lockHeld should be present");
+    
+    // Validate field values
+    assertEquals(OWNER_ID, auditRecord.get("ownerId"));
+    assertEquals(TRANSACTION_START_TIME, ((Number) 
auditRecord.get("transactionStartTime")).longValue());
+    assertEquals(expectedTimestamp, ((Number) 
auditRecord.get("timestamp")).longValue());
+    assertEquals(expectedState.name(), auditRecord.get("state"));
+    assertEquals(LOCK_EXPIRATION, ((Number) 
auditRecord.get("lockExpiration")).longValue());
+    assertEquals(expectedLockHeld, auditRecord.get("lockHeld"));
+  }
+
+  /**
+   * Helper method to verify expected file path generation.
+   * 
+   * @param actualPath The actual file path captured
+   * @param ownerId The owner ID used
+   * @param transactionStartTime The transaction start time used
+   */
+  private void validateFilePath(String actualPath, String ownerId, long 
transactionStartTime) {
+    String expectedPath = 
String.format("%s%s.hoodie%s.locks%saudit%s%d_%s.jsonl",
+        BASE_PATH,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        StoragePath.SEPARATOR,
+        transactionStartTime,
+        ownerId);
+    assertEquals(expectedPath, actualPath);
+  }
+
+  /**
+   * Comprehensive test for complete audit lifecycle: START -> RENEW -> END.
+   * This consolidates individual operation tests into a single comprehensive 
test
+   * that validates the entire audit flow.
+   */
+  @Test
+  void testCompleteAuditLifecycle() throws Exception {
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    long startTime = System.currentTimeMillis();
+    long renewTime = startTime + 1000;
+    long endTime = startTime + 2000;
+
+    // Configure lock held status for each operation
+    when(lockHeldSupplier.get()).thenReturn(true, true, false);
+
+    // Execute complete lifecycle
+    auditService.recordOperation(AuditOperationState.START, startTime);
+    auditService.recordOperation(AuditOperationState.RENEW, renewTime);
+    auditService.recordOperation(AuditOperationState.END, endTime);
+
+    // Capture all write calls
+    ArgumentCaptor<String> pathCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(3)).writeObject(pathCaptor.capture(), 
contentCaptor.capture());
+
+    // Validate file path (should be consistent across all operations)
+    validateFilePath(pathCaptor.getValue(), OWNER_ID, TRANSACTION_START_TIME);
+
+    // Parse final content - should contain all three records
+    String finalContent = contentCaptor.getValue();
+    String[] lines = finalContent.trim().split("\n");
+    assertEquals(3, lines.length, "Should have three JSON lines");
+
+    // Validate each audit record
+    @SuppressWarnings("unchecked")
+    Map<String, Object> startRecord = OBJECT_MAPPER.readValue(lines[0], 
Map.class);
+    @SuppressWarnings("unchecked")
+    Map<String, Object> renewRecord = OBJECT_MAPPER.readValue(lines[1], 
Map.class);
+    @SuppressWarnings("unchecked")
+    Map<String, Object> endRecord = OBJECT_MAPPER.readValue(lines[2], 
Map.class);
+
+    validateAuditRecord(startRecord, AuditOperationState.START, startTime, 
true);
+    validateAuditRecord(renewRecord, AuditOperationState.RENEW, renewTime, 
true);
+    validateAuditRecord(endRecord, AuditOperationState.END, endTime, false);
+  }
+
+  /**
+   * Test data provider for different owner ID formats.
+   * 
+   * @return Stream of owner ID test cases
+   */
+  static java.util.stream.Stream<Arguments> ownerIdTestCases() {
+    return java.util.stream.Stream.of(
+        Arguments.of("12345678-9abc-def0-1234-567890abcdef", "Full UUID 
format"),
+        Arguments.of("abc123", "Short owner ID"),
+        Arguments.of("regular-owner-id-without-uuid", "Regular owner ID"),
+        Arguments.of("writer-12345678-9abc-def0-1234-567890abcdef", "Writer 
with UUID")
+    );
+  }
+
+  /**
+   * Test audit file naming with different owner ID formats.
+   * Consolidated test that verifies file path generation for various owner ID 
formats.
+   * 
+   * @param ownerId The owner ID to test
+   * @param description Description of the test case
+   */
+  @ParameterizedTest(name = "{1}: {0}")
+  @MethodSource("ownerIdTestCases")
+  void testFileNameWithDifferentOwnerIds(String ownerId, String description) 
throws Exception {
+    long txnStartTime = System.currentTimeMillis();
+    StorageLockProviderAuditService service = new 
StorageLockProviderAuditService(
+        BASE_PATH,
+        ownerId,
+        txnStartTime,
+        storageLockClient,
+        timestamp -> LOCK_EXPIRATION,
+        lockHeldSupplier);
+
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    service.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    ArgumentCaptor<String> pathCaptor = ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(1)).writeObject(pathCaptor.capture(), 
anyString());
+    
+    validateFilePath(pathCaptor.getValue(), ownerId, txnStartTime);
+  }
+
+  /**
+   * Test data provider for write failure scenarios.
+   * 
+   * @return Stream of write failure test cases
+   */
+  static Stream<Arguments> writeFailureTestCases() {
+    return Stream.of(
+        Arguments.of(false, "Write returns false"),
+        Arguments.of(new RuntimeException("Storage error"), "Write throws 
exception")
+    );
+  }
+
+  /**
+   * Test handling of write failures using parameterized test.
+   * Consolidates multiple write failure scenarios into a single parameterized 
test.
+   * 
+   * @param failureCondition The failure condition (Boolean false or Exception)
+   * @param description Description of the test case
+   */
+  @ParameterizedTest(name = "{1}")
+  @MethodSource("writeFailureTestCases")
+  void testWriteFailureHandling(Object failureCondition, String description) 
throws Exception {
+    long timestamp = System.currentTimeMillis();
+    
+    if (failureCondition instanceof Boolean) {
+      when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn((Boolean) failureCondition);
+    } else if (failureCondition instanceof Exception) {
+      when(storageLockClient.writeObject(anyString(), 
anyString())).thenThrow((Exception) failureCondition);
+    }
+
+    // Should not throw exception - audit failures should be handled gracefully
+    auditService.recordOperation(AuditOperationState.START, timestamp);
+
+    verify(storageLockClient, times(1)).writeObject(anyString(), anyString());
+  }
+
+  @Test
+  void testDynamicFunctionValues() throws Exception {
+    long timestamp = System.currentTimeMillis();
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    // Create a service with dynamic expiration calculation
+    StorageLockProviderAuditService dynamicService = new 
StorageLockProviderAuditService(
+        BASE_PATH,
+        OWNER_ID,
+        TRANSACTION_START_TIME,
+        storageLockClient,
+        ts -> ts + 1000L, // Adds 1000ms to timestamp
+        lockHeldSupplier);
+
+    when(lockHeldSupplier.get()).thenReturn(true, false);
+
+    dynamicService.recordOperation(AuditOperationState.START, timestamp);
+    dynamicService.recordOperation(AuditOperationState.END, timestamp + 1000);
+
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(2)).writeObject(anyString(), 
contentCaptor.capture());
+
+    // Validate dynamic expiration calculation
+    String[] firstLines = 
contentCaptor.getAllValues().get(0).trim().split("\n");
+    @SuppressWarnings("unchecked")
+    Map<String, Object> firstRecord = OBJECT_MAPPER.readValue(firstLines[0], 
Map.class);
+
+    String[] secondLines = 
contentCaptor.getAllValues().get(1).trim().split("\n");
+    @SuppressWarnings("unchecked")
+    Map<String, Object> secondRecord = OBJECT_MAPPER.readValue(secondLines[1], 
Map.class);
+
+    assertTrue((Boolean) firstRecord.get("lockHeld"));
+    assertEquals(timestamp + 1000L, ((Number) 
firstRecord.get("lockExpiration")).longValue());
+
+    assertFalse((Boolean) secondRecord.get("lockHeld"));
+    assertEquals(timestamp + 1000L + 1000L, ((Number) 
secondRecord.get("lockExpiration")).longValue());
+  }
+
+  @Test
+  void testFilePathGeneration() throws Exception {
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    auditService.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    String expectedPath = String.format("%s/.hoodie/.locks/audit/%d_%s.jsonl", 
+        BASE_PATH, TRANSACTION_START_TIME, OWNER_ID);
+
+    verify(storageLockClient).writeObject(eq(expectedPath), anyString());
+  }
+
+  @Test
+  void testCloseMethodWithBufferedData() throws Exception {
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+
+    // Record an operation to buffer data
+    auditService.recordOperation(AuditOperationState.START, 
System.currentTimeMillis());
+
+    // Close should not write additional data (data is written immediately 
after each operation)
+    auditService.close();
+
+    // Verify write was called only once for the START operation
+    verify(storageLockClient, times(1)).writeObject(anyString(), anyString());
+  }
+
+  @Test
+  void testConcurrentOperations() throws Exception {
+    when(storageLockClient.writeObject(anyString(), 
anyString())).thenReturn(true);
+    when(lockHeldSupplier.get()).thenReturn(true, false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch completionLatch = new CountDownLatch(2);
+
+    long baseTime = System.currentTimeMillis();
+
+    // Concurrent RENEW and END operations
+    executor.submit(() -> {
+      try {
+        startLatch.await();
+        auditService.recordOperation(AuditOperationState.RENEW, baseTime + 
1000);
+      } catch (Exception e) {
+        // Handle gracefully
+      } finally {
+        completionLatch.countDown();
+      }
+    });
+
+    executor.submit(() -> {
+      try {
+        startLatch.await();
+        auditService.recordOperation(AuditOperationState.END, baseTime + 1001);
+      } catch (Exception e) {
+        // Handle gracefully
+      } finally {
+        completionLatch.countDown();
+      }
+    });
+
+    startLatch.countDown();
+    assertTrue(completionLatch.await(5, TimeUnit.SECONDS));
+
+    // Verify both operations were recorded
+    ArgumentCaptor<String> contentCaptor = 
ArgumentCaptor.forClass(String.class);
+    verify(storageLockClient, times(2)).writeObject(anyString(), 
contentCaptor.capture());
+
+    String finalContent = contentCaptor.getValue();
+    String[] lines = finalContent.trim().split("\\n");
+    assertEquals(2, lines.length);
+
+    // Verify both RENEW and END states are present
+    @SuppressWarnings("unchecked")
+    Map<String, Object> record1 = OBJECT_MAPPER.readValue(lines[0], Map.class);
+    @SuppressWarnings("unchecked")
+    Map<String, Object> record2 = OBJECT_MAPPER.readValue(lines[1], Map.class);
+
+    String state1 = (String) record1.get("state");
+    String state2 = (String) record2.get("state");
+    assertTrue((state1.equals("RENEW") && state2.equals("END"))
+        || (state1.equals("END") && state2.equals("RENEW")));
+
+    executor.shutdown();
+    assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ce86c409baf6..ffde5337397d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -100,6 +100,8 @@ object HoodieProcedures {
       ,(ShowCleansProcedure.NAME, ShowCleansProcedure.builder)
       ,(ShowCleansPartitionMetadataProcedure.NAME, 
ShowCleansPartitionMetadataProcedure.builder)
       ,(ShowCleansPlanProcedure.NAME, ShowCleansPlanProcedure.builder)
+      ,(SetAuditLockProcedure.NAME, SetAuditLockProcedure.builder)
+      ,(ShowAuditLockStatusProcedure.NAME, 
ShowAuditLockStatusProcedure.builder)
     )
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/SetAuditLockProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/SetAuditLockProcedure.scala
new file mode 100644
index 000000000000..826572ad97fc
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/SetAuditLockProcedure.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.storage.StoragePath
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.util.{Failure, Success, Try}
+
+
/**
 * Spark SQL procedure for enabling or disabling lock audit logging for Hudi tables.
 *
 * When enabled, storage lock operations generate audit logs in JSONL format that
 * track lock lifecycle events. The procedure creates or updates an audit
 * configuration file at `{table_path}/.hoodie/.locks/audit_enabled.json`.
 *
 * Usage:
 * {{{
 * CALL set_audit_lock(table => 'my_table', state => 'enabled')
 * CALL set_audit_lock(path => '/path/to/table', state => 'disabled')
 * }}}
 */
class SetAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType),
    ProcedureParameter.optional(1, "path", DataTypes.StringType),
    ProcedureParameter.required(2, "state", DataTypes.StringType)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("table", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("audit_state", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("message", DataTypes.StringType, nullable = false, Metadata.empty)
  ))

  private val OBJECT_MAPPER = new ObjectMapper()

  /**
   * Returns the procedure parameters definition.
   *
   * @return Array of parameters: table (optional String), path (optional String), and state (required String)
   */
  def parameters: Array[ProcedureParameter] = PARAMETERS

  /**
   * Returns the output schema for the procedure result.
   *
   * @return StructType containing table, audit_state, and message columns
   */
  def outputType: StructType = OUTPUT_TYPE

  /**
   * Executes the audit lock set procedure.
   *
   * @param args Procedure arguments containing table name or path and desired state
   * @return Sequence containing a single Row with execution results
   * @throws IllegalArgumentException if state parameter is not 'enabled' or 'disabled'
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
    val state = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String].toLowerCase

    // Fail fast on an invalid state before touching storage.
    if (state != "enabled" && state != "disabled") {
      throw new IllegalArgumentException("State parameter must be 'enabled' or 'disabled'")
    }

    // getBasePath validates that exactly one of table/path resolves to a base path.
    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = createMetaClient(jsc, basePath)

    // Prefer the table name for display; otherwise fall back to the raw path argument.
    val displayName = tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])

    try {
      val auditEnabled = state == "enabled"
      setAuditState(metaClient, basePath, auditEnabled)

      // The validated state string ("enabled"/"disabled") is exactly the result state.
      Seq(Row(displayName, state, s"Lock audit logging successfully $state"))
    } catch {
      case e: Exception =>
        // Report the failure as a result row instead of failing the whole query.
        Seq(Row(displayName, "error", s"Failed to set audit state: ${e.getMessage}"))
    }
  }

  /**
   * Sets the audit state by creating or updating the audit configuration file.
   *
   * @param metaClient Hudi table meta client for storage operations
   * @param basePath Base path of the Hudi table
   * @param enabled Whether audit logging should be enabled
   * @throws RuntimeException if unable to write the audit configuration
   */
  private def setAuditState(metaClient: HoodieTableMetaClient, basePath: String, enabled: Boolean): Unit = {
    val storage = metaClient.getStorage
    val lockFolderPath = StorageLockClient.getLockFolderPath(basePath)
    val auditConfigPath = new StoragePath(StorageLockProviderAuditService.getAuditConfigPath(basePath))

    // Ensure the locks folder exists before writing into it.
    if (!storage.exists(new StoragePath(lockFolderPath))) {
      storage.createDirectory(new StoragePath(lockFolderPath))
    }

    val jsonContent = createAuditConfig(enabled)

    try {
      // Overwrite any existing configuration file.
      val outputStream = storage.create(auditConfigPath, true)
      try {
        outputStream.write(jsonContent.getBytes("UTF-8"))
      } finally {
        outputStream.close()
      }
    } catch {
      case e: Exception =>
        throw new RuntimeException(s"Failed to write audit configuration to ${auditConfigPath.toString}", e)
    }
  }

  /**
   * Creates the JSON configuration content for audit settings.
   *
   * @param enabled Whether audit logging should be enabled
   * @return JSON string representation of the audit configuration
   */
  private def createAuditConfig(enabled: Boolean): String = {
    val rootNode: ObjectNode = OBJECT_MAPPER.createObjectNode()
    rootNode.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD, enabled)
    OBJECT_MAPPER.writeValueAsString(rootNode)
  }

  override def build: Procedure = new SetAuditLockProcedure()
}
+
/**
 * Companion object for SetAuditLockProcedure containing constants and factory methods.
 */
object SetAuditLockProcedure {
  /** The name used to register and invoke this procedure. */
  val NAME = "set_audit_lock"

  /**
   * Factory method to create procedure builder instances.
   *
   * @return Supplier that creates new SetAuditLockProcedure instances
   */
  def builder: Supplier[ProcedureBuilder] = () => new SetAuditLockProcedure()
}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowAuditLockStatusProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowAuditLockStatusProcedure.scala
new file mode 100644
index 000000000000..97cce27d4586
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowAuditLockStatusProcedure.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.common.util.FileIOUtils
+import org.apache.hudi.storage.StoragePath
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.util.{Failure, Success, Try}
+
/**
 * Spark SQL procedure for showing the current audit logging status for Hudi tables.
 *
 * Reads the audit configuration file at
 * `{table_path}/.hoodie/.locks/audit_enabled.json` and reports whether audit
 * logging for storage lock operations is currently enabled, along with the
 * relevant configuration and audit folder paths.
 *
 * Usage:
 * {{{
 * CALL show_audit_lock_status(table => 'my_table')
 * CALL show_audit_lock_status(path => '/path/to/table')
 * }}}
 */
class ShowAuditLockStatusProcedure extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType),
    ProcedureParameter.optional(1, "path", DataTypes.StringType)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("table", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("audit_enabled", DataTypes.BooleanType, nullable = false, Metadata.empty),
    StructField("config_path", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("audit_folder_path", DataTypes.StringType, nullable = false, Metadata.empty)
  ))

  private val OBJECT_MAPPER = new ObjectMapper()

  /**
   * Returns the procedure parameters definition.
   *
   * @return Array of parameters: table (optional String) and path (optional String)
   */
  def parameters: Array[ProcedureParameter] = PARAMETERS

  /**
   * Returns the output schema for the procedure result.
   *
   * @return StructType containing table, audit_enabled, config_path, and audit_folder_path columns
   */
  def outputType: StructType = OUTPUT_TYPE

  /**
   * Executes the show audit lock status procedure.
   *
   * @param args Procedure arguments containing table name or path
   * @return Sequence containing a single Row with status information
   * @throws IllegalArgumentException if neither table nor path is provided, or both are provided
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))

    // getBasePath validates that exactly one of table/path resolves to a base path.
    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = createMetaClient(jsc, basePath)

    // Prefer the table name for display; otherwise fall back to the raw path argument.
    val displayName = tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])

    // Both paths are reported whether or not the status read succeeds,
    // so compute them once up front instead of duplicating in try and catch.
    val configPath = StorageLockProviderAuditService.getAuditConfigPath(basePath)
    val auditFolderPath = StorageLockProviderAuditService.getAuditFolderPath(basePath)

    val auditEnabled = try {
      checkAuditStatus(metaClient, basePath)
    } catch {
      // If the config cannot be read, report audit as disabled rather than failing.
      case _: Exception => false
    }

    Seq(Row(displayName, auditEnabled, configPath, auditFolderPath))
  }

  /**
   * Checks the current audit status by reading the audit configuration file.
   *
   * @param metaClient Hudi table meta client for storage operations
   * @param basePath Base path of the Hudi table
   * @return true if audit logging is enabled, false otherwise (missing or unreadable config)
   */
  private def checkAuditStatus(metaClient: org.apache.hudi.common.table.HoodieTableMetaClient, basePath: String): Boolean = {
    val storage = metaClient.getStorage
    val auditConfigPath = new StoragePath(StorageLockProviderAuditService.getAuditConfigPath(basePath))

    // A missing config file means audit logging has never been enabled.
    if (!storage.exists(auditConfigPath)) {
      return false
    }

    Try {
      val inputStream = storage.open(auditConfigPath)
      try {
        val configContent = new String(FileIOUtils.readAsByteArray(inputStream))
        val rootNode: JsonNode = OBJECT_MAPPER.readTree(configContent)
        val enabledNode = rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD)
        // A missing or non-boolean "enabled" field is treated as disabled.
        Option(enabledNode).exists(_.asBoolean(false))
      } finally {
        inputStream.close()
      }
    } match {
      case Success(enabled) => enabled
      case Failure(_) => false
    }
  }

  /**
   * Builds a new instance of the ShowAuditLockStatusProcedure.
   *
   * @return New ShowAuditLockStatusProcedure instance
   */
  override def build: Procedure = new ShowAuditLockStatusProcedure()
}
+
/**
 * Companion object for ShowAuditLockStatusProcedure containing constants and factory methods.
 */
object ShowAuditLockStatusProcedure {
  /** The name used to register and invoke this procedure */
  val NAME = "show_audit_lock_status"

  /**
   * Factory method to create procedure builder instances.
   *
   * @return Supplier that creates new ShowAuditLockStatusProcedure instances
   */
  def builder: Supplier[ProcedureBuilder] = () => new ShowAuditLockStatusProcedure()
}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSetAuditLockProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSetAuditLockProcedure.scala
new file mode 100644
index 000000000000..d8c2c79d8a57
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSetAuditLockProcedure.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import java.io.File
+
/**
 * Test suite for the SetAuditLockProcedure Spark SQL procedure.
 *
 * Verifies enabling/disabling audit logging through the set_audit_lock
 * procedure with both table-name and path parameters, parameter validation,
 * and error handling scenarios.
 */
class TestSetAuditLockProcedure extends HoodieSparkProcedureTestBase {

  override def generateTableName: String = {
    super.generateTableName.split("\\.").last
  }

  /**
   * Helper method to create a test table and return its path.
   */
  private def createTestTable(tmp: File, tableName: String): String = {
    spark.sql(
      s"""
         |create table $tableName (
         |  id int,
         |  name string,
         |  price double,
         |  ts long
         |) using hudi
         | location '${tmp.getCanonicalPath}/$tableName'
         | tblproperties (
         |  primaryKey = 'id',
         |  orderingFields = 'ts'
         | )
       """.stripMargin)
    // Insert data to initialize the Hudi metadata structure
    spark.sql(s"insert into $tableName select 1, 'test', 10.0, 1000")
    s"${tmp.getCanonicalPath}/$tableName"
  }

  /**
   * Test enabling audit logging using table name parameter.
   */
  test("Test Call set_audit_lock Procedure - Enable Audit with Table Name") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // Returned path is not needed when addressing the table by name.
      createTestTable(tmp, tableName)

      val result = spark.sql(s"""call set_audit_lock(table => '$tableName', state => 'enabled')""").collect()

      assertResult(1)(result.length)
      assertResult(tableName)(result.head.get(0))
      assertResult("enabled")(result.head.get(1))
      assert(result.head.get(2).toString.contains("successfully enabled"))
    }
  }

  /**
   * Test enabling audit logging using path parameter.
   */
  test("Test Call set_audit_lock Procedure - Enable Audit with Path") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      val result = spark.sql(s"""call set_audit_lock(path => '$tablePath', state => 'enabled')""").collect()

      assertResult(1)(result.length)
      assertResult(tablePath)(result.head.get(0))
      assertResult("enabled")(result.head.get(1))
      assert(result.head.get(2).toString.contains("successfully enabled"))
    }
  }

  /**
   * Test disabling audit logging using table name parameter.
   */
  test("Test Call set_audit_lock Procedure - Disable Audit with Table Name") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // Returned path is not needed when addressing the table by name.
      createTestTable(tmp, tableName)

      val result = spark.sql(s"""call set_audit_lock(table => '$tableName', state => 'disabled')""").collect()

      assertResult(1)(result.length)
      assertResult(tableName)(result.head.get(0))
      assertResult("disabled")(result.head.get(1))
      assert(result.head.get(2).toString.contains("successfully disabled"))
    }
  }

  /**
   * Test disabling audit logging using path parameter.
   */
  test("Test Call set_audit_lock Procedure - Disable Audit with Path") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      val result = spark.sql(s"""call set_audit_lock(path => '$tablePath', state => 'disabled')""").collect()

      assertResult(1)(result.length)
      assertResult(tablePath)(result.head.get(0))
      assertResult("disabled")(result.head.get(1))
      assert(result.head.get(2).toString.contains("successfully disabled"))
    }
  }

  /**
   * Test parameter validation by providing an invalid state parameter.
   * Verifies that the procedure rejects invalid state values and provides
   * an appropriate error message.
   */
  test("Test Call set_audit_lock Procedure - Invalid State Parameter") {
    withTempDir { tmp =>
      val tableName = generateTableName
      createTestTable(tmp, tableName)

      // Test invalid state parameter
      checkExceptionContain(s"""call set_audit_lock(table => '$tableName', state => 'invalid')""")(
        "State parameter must be 'enabled' or 'disabled'")
    }
  }

  /**
   * Test parameter validation by omitting required arguments.
   * Verifies that the procedure properly validates required parameters
   * and provides appropriate error messages for missing arguments.
   */
  test("Test Call set_audit_lock Procedure - Missing Required Arguments") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      // Test missing both table and path parameters
      checkExceptionContain(s"""call set_audit_lock(state => 'enabled')""")(
        "Table name or table path must be given one")

      // Test missing state parameter
      checkExceptionContain(s"""call set_audit_lock(table => '$tableName')""")(
        "Argument: state is required")

      // Test providing both table and path parameters - should work fine (uses table parameter)
      val result = spark.sql(s"""call set_audit_lock(table => '$tableName', path => '$tablePath', state => 'enabled')""").collect()
      assertResult(1)(result.length)
      assertResult(tableName)(result.head.getString(0))
    }
  }
}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowAuditLockStatusProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowAuditLockStatusProcedure.scala
new file mode 100644
index 000000000000..87cf04967694
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowAuditLockStatusProcedure.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import java.io.File
+
/**
 * Test suite for the ShowAuditLockStatusProcedure Spark SQL procedure.
 *
 * Exercises the show_audit_lock_status procedure with both table-name and
 * path parameters, across enabled/disabled audit states, plus argument
 * validation and output-schema checks.
 */
class TestShowAuditLockStatusProcedure extends HoodieSparkProcedureTestBase {

  override def generateTableName: String = {
    super.generateTableName.split("\\.").last
  }

  /**
   * Creates a minimal Hudi table under the given temp dir and returns its path.
   */
  private def createTestTable(tmp: File, tableName: String): String = {
    spark.sql(
      s"""
         |create table $tableName (
         |  id int,
         |  name string,
         |  price double,
         |  ts long
         |) using hudi
         | location '${tmp.getCanonicalPath}/$tableName'
         | tblproperties (
         |  primaryKey = 'id',
         |  orderingFields = 'ts'
         | )
       """.stripMargin)
    // Insert data to initialize the Hudi metadata structure
    spark.sql(s"insert into $tableName select 1, 'test', 10.0, 1000")
    s"${tmp.getCanonicalPath}/$tableName"
  }

  /**
   * Default state: audit is disabled when no config file exists (table name variant).
   */
  test("Test Show Audit Status - Disabled with Table Name") {
    withTempDir { tmp =>
      val tableName = generateTableName
      createTestTable(tmp, tableName)

      val rows = spark.sql(s"""call show_audit_lock_status(table => '$tableName')""").collect()

      assertResult(1)(rows.length)
      val row = rows.head
      assertResult(tableName)(row.get(0)) // table name
      assertResult(false)(row.get(1)) // audit_enabled defaults to false
      assert(row.get(2).toString.contains("audit_enabled.json")) // config_path
      assert(row.get(3).toString.contains("audit")) // audit_folder_path
    }
  }

  /**
   * Default state: audit is disabled when no config file exists (path variant).
   */
  test("Test Show Audit Status - Disabled with Path") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      val rows = spark.sql(s"""call show_audit_lock_status(path => '$tablePath')""").collect()

      assertResult(1)(rows.length)
      val row = rows.head
      assertResult(tablePath)(row.get(0)) // path echoed back
      assertResult(false)(row.get(1)) // audit_enabled defaults to false
      assert(row.get(2).toString.contains("audit_enabled.json")) // config_path
      assert(row.get(3).toString.contains("audit")) // audit_folder_path
    }
  }

  /**
   * After set_audit_lock enables auditing, the status reflects it (table name variant).
   */
  test("Test Show Audit Status - Enabled with Table Name") {
    withTempDir { tmp =>
      val tableName = generateTableName
      createTestTable(tmp, tableName)

      // Enable audit logging first, then query the status.
      spark.sql(s"""call set_audit_lock(table => '$tableName', state => 'enabled')""")

      val rows = spark.sql(s"""call show_audit_lock_status(table => '$tableName')""").collect()

      assertResult(1)(rows.length)
      val row = rows.head
      assertResult(tableName)(row.get(0)) // table name
      assertResult(true)(row.get(1)) // audit_enabled
      assert(row.get(2).toString.contains("audit_enabled.json")) // config_path
      assert(row.get(3).toString.contains("audit")) // audit_folder_path
    }
  }

  /**
   * After set_audit_lock enables auditing, the status reflects it (path variant).
   */
  test("Test Show Audit Status - Enabled with Path") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      // Enable audit logging first, then query the status.
      spark.sql(s"""call set_audit_lock(path => '$tablePath', state => 'enabled')""")

      val rows = spark.sql(s"""call show_audit_lock_status(path => '$tablePath')""").collect()

      assertResult(1)(rows.length)
      val row = rows.head
      assertResult(tablePath)(row.get(0)) // path echoed back
      assertResult(true)(row.get(1)) // audit_enabled
      assert(row.get(2).toString.contains("audit_enabled.json")) // config_path
      assert(row.get(3).toString.contains("audit")) // audit_folder_path
    }
  }

  /**
   * Toggling audit on and then off is reflected by consecutive status calls.
   */
  test("Test Show Audit Status After Enable and Disable") {
    withTempDir { tmp =>
      val tableName = generateTableName
      createTestTable(tmp, tableName)

      // Enable, then confirm the status reads true.
      spark.sql(s"""call set_audit_lock(table => '$tableName', state => 'enabled')""")
      val afterEnable = spark.sql(s"""call show_audit_lock_status(table => '$tableName')""").collect()
      assertResult(true)(afterEnable.head.get(1))

      // Disable, then confirm the status reads false.
      spark.sql(s"""call set_audit_lock(table => '$tableName', state => 'disabled')""")
      val afterDisable = spark.sql(s"""call show_audit_lock_status(table => '$tableName')""").collect()
      assertResult(false)(afterDisable.head.get(1))
    }
  }

  /**
   * Argument validation: at least one of table/path is required; giving both
   * is accepted and resolves via the table parameter.
   */
  test("Test Show Audit Status - Missing Required Arguments") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      // Neither table nor path supplied -> rejected.
      checkExceptionContain(s"""call show_audit_lock_status()""")(
        "Table name or table path must be given one")

      // Both supplied -> works, table parameter wins for the display column.
      val rows = spark.sql(s"""call show_audit_lock_status(table => '$tableName', path => '$tablePath')""").collect()
      assertResult(1)(rows.length)
      assertResult(tableName)(rows.head.getString(0))
    }
  }

  /**
   * A non-existent table cannot be resolved and the call fails.
   */
  test("Test Show Audit Status - Non-existent Table") {
    val missingTable = "non_existent_table_" + System.currentTimeMillis()
    assertThrows[Exception] {
      spark.sql(s"""call show_audit_lock_status(table => '$missingTable')""").collect()
    }
  }

  /**
   * The procedure's output schema has the documented four columns with the
   * expected names and types.
   */
  test("Test Show Audit Status - Output Schema") {
    withTempDir { tmp =>
      val tableName = generateTableName
      createTestTable(tmp, tableName)

      val df = spark.sql(s"""call show_audit_lock_status(table => '$tableName')""")
      val fields = df.schema.fields

      // Column count, names, and types.
      assertResult(4)(fields.length)
      assertResult(Seq("table", "audit_enabled", "config_path", "audit_folder_path"))(fields.map(_.name).toSeq)
      assertResult(Seq("string", "boolean", "string", "string"))(fields.map(_.dataType.typeName).toSeq)
    }
  }

}


Reply via email to