This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 41c5765908b0 [HUDI-9782] Add validation and cleanup apis for storage 
LP audit validation (#13886)
41c5765908b0 is described below

commit 41c5765908b050d5b36b279c97cf5ae65fec78ab
Author: Alex R <[email protected]>
AuthorDate: Tue Oct 7 10:43:10 2025 -0700

    [HUDI-9782] Add validation and cleanup apis for storage LP audit validation 
(#13886)
---
 .../hudi/cli/commands/LockAuditingCommand.java     | 509 +++++++++++++++-
 .../hudi/cli/commands/TestLockAuditingCommand.java | 660 ++++++++++++++++++++-
 .../transaction/lock/StorageBasedLockProvider.java |   5 +-
 .../client/transaction/lock/StorageLockClient.java |   2 +-
 .../audit/StorageLockProviderAuditService.java     |   9 +-
 .../lock/models/StorageLockClientFileTest.java     |  58 ++
 .../procedures/CleanupAuditLockProcedure.scala     | 190 ++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   2 +
 .../procedures/ValidateAuditLockProcedure.scala    | 314 ++++++++++
 .../procedure/TestCleanupAuditLockProcedure.scala  | 293 +++++++++
 .../procedure/TestValidateAuditLockProcedure.scala | 454 ++++++++++++++
 11 files changed, 2469 insertions(+), 27 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
index 51e2dbe7fe6c..18d3862aff63 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
@@ -21,8 +21,12 @@ package org.apache.hudi.cli.commands;
 import org.apache.hudi.cli.HoodieCLI;
 import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
 import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -32,8 +36,14 @@ import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
 import org.springframework.shell.standard.ShellOption;
 
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
 
 /**
  * CLI commands for managing Hudi table lock auditing functionality.
@@ -44,6 +54,132 @@ public class LockAuditingCommand {
   private static final Logger LOG = 
LoggerFactory.getLogger(LockAuditingCommand.class);
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+  /**
+   * Represents a single audit log entry in the JSONL file.
+   * Maps to the structure written by StorageLockProviderAuditService.
+   */
+  public static class AuditRecord {
+    private final String ownerId;
+    private final long transactionStartTime;
+    private final long timestamp;
+    private final String state;
+    private final long lockExpiration;
+    private final boolean lockHeld;
+
+    @JsonCreator
+    public AuditRecord(
+        @JsonProperty("ownerId") String ownerId,
+        @JsonProperty("transactionStartTime") long transactionStartTime,
+        @JsonProperty("timestamp") long timestamp,
+        @JsonProperty("state") String state,
+        @JsonProperty("lockExpiration") long lockExpiration,
+        @JsonProperty("lockHeld") boolean lockHeld) {
+      this.ownerId = ownerId;
+      this.transactionStartTime = transactionStartTime;
+      this.timestamp = timestamp;
+      this.state = state;
+      this.lockExpiration = lockExpiration;
+      this.lockHeld = lockHeld;
+    }
+
+    public String getOwnerId() {
+      return ownerId;
+    }
+
+    public long getTransactionStartTime() {
+      return transactionStartTime;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public String getState() {
+      return state;
+    }
+
+    public long getLockExpiration() {
+      return lockExpiration;
+    }
+
+    public boolean isLockHeld() {
+      return lockHeld;
+    }
+  }
+
+  /**
+   * Represents a transaction window with start time, end time, and metadata.
+   */
+  static class TransactionWindow {
+    final String ownerId;
+    final long transactionCreationTime;
+    final long lockAcquisitionTime;
+    final Option<Long> lockReleaseTime;
+    final Option<Long> lockExpirationTime;
+    final String filename;
+
+    TransactionWindow(String ownerId, long transactionCreationTime, long 
lockAcquisitionTime,
+                      Option<Long> lockReleaseTime, Option<Long> 
lockExpirationTime, String filename) {
+      this.ownerId = ownerId;
+      this.transactionCreationTime = transactionCreationTime;
+      this.lockAcquisitionTime = lockAcquisitionTime;
+      this.lockReleaseTime = lockReleaseTime;
+      this.lockExpirationTime = lockExpirationTime;
+      this.filename = filename;
+    }
+
+    long getEffectiveEndTime() {
+      return 
lockReleaseTime.orElse(lockExpirationTime.orElse(lockAcquisitionTime));
+    }
+  }
+
+  /**
+   * Holds validation results.
+   */
+  static class ValidationResults {
+    final List<String> errors;
+    final List<String> warnings;
+
+    ValidationResults(List<String> errors, List<String> warnings) {
+      this.errors = errors;
+      this.warnings = warnings;
+    }
+  }
+
+  /**
+   * Holds cleanup operation results.
+   */
+  static class CleanupResult {
+    final boolean success;
+    final int deletedCount;
+    final int failedCount;
+    final int totalFiles;
+    final String message;
+    final boolean isDryRun;
+
+    CleanupResult(boolean success, int deletedCount, int failedCount, int 
totalFiles, String message, boolean isDryRun) {
+      this.success = success;
+      this.deletedCount = deletedCount;
+      this.failedCount = failedCount;
+      this.totalFiles = totalFiles;
+      this.message = message;
+      this.isDryRun = isDryRun;
+    }
+
+    boolean hasErrors() {
+      return failedCount > 0;
+    }
+
+    boolean hasFiles() {
+      return totalFiles > 0;
+    }
+
+    @Override
+    public String toString() {
+      return message;
+    }
+  }
+
   /**
    * Enables lock audit logging for the currently connected Hudi table.
    * This command creates or updates the audit configuration file to enable
@@ -101,18 +237,20 @@ public class LockAuditingCommand {
     try {
       // Create the audit config file path
       String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
-      
-      // Check if config file exists
       StoragePath configPath = new StoragePath(auditConfigPath);
-      if (!HoodieCLI.storage.exists(configPath)) {
+
+      // Check if config file exists by attempting to get its info
+      try {
+        HoodieCLI.storage.getPathInfo(configPath);
+      } catch (FileNotFoundException e) {
         return "Lock audit is already disabled (no configuration file found).";
       }
-      
+
       // Create the JSON content with audit disabled
       ObjectNode configJson = OBJECT_MAPPER.createObjectNode();
       
configJson.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
 false);
       String jsonContent = OBJECT_MAPPER.writeValueAsString(configJson);
-      
+
       // Write the config file
       try (OutputStream outputStream = HoodieCLI.storage.create(configPath, 
true)) {
         outputStream.write(jsonContent.getBytes());
@@ -123,8 +261,15 @@ public class LockAuditingCommand {
       if (keepAuditFiles) {
         message += String.format("\nExisting audit files preserved at: %s", 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
       } else {
-        // Todo: write then call the api method to prune the old files
-        message += String.format("\nAudit files cleaned up at: %s", 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+        // Call the cleanup method to prune all old files
+        CleanupResult cleanupResult = performAuditCleanup(false, 0);
+        if (cleanupResult.success && cleanupResult.deletedCount > 0) {
+          message += String.format("\nAudit files cleaned up at: %s", 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+        } else if (cleanupResult.success && cleanupResult.deletedCount == 0) {
+          message += String.format("\nNo audit files to clean up at: %s", 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+        } else {
+          message += String.format("\nWarning: Some audit files may not have 
been cleaned up: %s", cleanupResult.message);
+        }
       }
       
       return message;
@@ -152,21 +297,18 @@ public class LockAuditingCommand {
     try {
       // Create the audit config file path
       String auditConfigPath = 
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
-      
-      // Check if config file exists
       StoragePath configPath = new StoragePath(auditConfigPath);
-      if (!HoodieCLI.storage.exists(configPath)) {
-        return String.format("Lock Audit Status: DISABLED\n"
-            + "Table: %s\n"
-            + "Config file: %s (not found)\n"
-            + "Use 'locks audit enable' to enable audit logging.", 
-            HoodieCLI.basePath, auditConfigPath);
-      }
-      
+
       // Read and parse the configuration
       String configContent;
       try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
         configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+      } catch (FileNotFoundException e) {
+        return String.format("Lock Audit Status: DISABLED\n"
+            + "Table: %s\n"
+            + "Config file: %s (not found)\n"
+            + "Use 'locks audit enable' to enable audit logging.",
+            HoodieCLI.basePath, auditConfigPath);
       }
       JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
       JsonNode enabledNode = 
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
@@ -185,4 +327,337 @@ public class LockAuditingCommand {
       return String.format("Failed to check lock audit status: %s", 
e.getMessage());
     }
   }
+
+  /**
+   * Validates the audit lock files for consistency and integrity.
+   * This command checks for issues such as unparsable audit files, 
+transactions that did not end gracefully, and overlapping lock windows.
+   * 
+   * @return Validation results including any issues found
+   */
+  @ShellMethod(key = "locks audit validate", value = "Validate audit lock 
files for consistency and integrity")
+  public String validateAuditLocks() {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    try {
+      String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+      StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+      // Get all audit files
+      List<StoragePathInfo> allFiles;
+      try {
+        allFiles = HoodieCLI.storage.listDirectEntries(auditFolder);
+      } catch (FileNotFoundException e) {
+        return "Validation Result: PASSED\n"
+            + "Transactions Validated: 0\n"
+            + "Issues Found: 0\n"
+            + "Details: No audit folder found - nothing to validate";
+      }
+      List<StoragePathInfo> auditFiles = new ArrayList<>();
+      for (StoragePathInfo pathInfo : allFiles) {
+        if (pathInfo.isFile() && 
pathInfo.getPath().getName().endsWith(".jsonl")) {
+          auditFiles.add(pathInfo);
+        }
+      }
+      
+      if (auditFiles.isEmpty()) {
+        return "Validation Result: PASSED\n"
+            + "Transactions Validated: 0\n"
+            + "Issues Found: 0\n"
+            + "Details: No audit files found - nothing to validate";
+      }
+      
+      // Parse all audit files into transaction windows
+      List<TransactionWindow> windows = new ArrayList<>();
+      List<String> parseErrors = new ArrayList<>();
+      for (StoragePathInfo pathInfo : auditFiles) {
+        Option<TransactionWindow> window = parseAuditFile(pathInfo);
+        if (window.isPresent()) {
+          windows.add(window.get());
+        } else {
+          parseErrors.add(String.format("[ERROR] Failed to parse audit file: 
%s", pathInfo.getPath().getName()));
+        }
+      }
+
+      if (windows.isEmpty()) {
+        return String.format("Validation Result: FAILED\n"
+            + "Transactions Validated: 0\n"
+            + "Issues Found: %d\n"
+            + "Details: Failed to parse any audit files", auditFiles.size());
+      }
+
+      // Validate transactions
+      ValidationResults validationResults = 
validateTransactionWindows(windows);
+
+      // Add parse errors to the validation results
+      validationResults.errors.addAll(parseErrors);
+      
+      // Generate result
+      int totalIssues = validationResults.errors.size() + 
validationResults.warnings.size();
+      String result;
+      String details;
+      
+      if (totalIssues == 0) {
+        result = "PASSED";
+        details = "All audit lock transactions validated successfully";
+      } else {
+        result = validationResults.errors.isEmpty() ? "WARNING" : "FAILED";
+        List<String> allIssues = new ArrayList<>();
+        allIssues.addAll(validationResults.errors);
+        allIssues.addAll(validationResults.warnings);
+        details = String.join(", ", allIssues);
+      }
+      
+      return String.format("Validation Result: %s\n"
+          + "Audit Files: %d total, %d parsed successfully, %d failed to 
parse\n"
+          + "Transactions Validated: %d\n"
+          + "Issues Found: %d\n"
+          + "Details: %s", result, auditFiles.size(), windows.size(), 
parseErrors.size(), windows.size(), totalIssues, details);
+      
+    } catch (Exception e) {
+      LOG.error("Error validating audit locks", e);
+      return String.format("Validation Result: ERROR\n"
+          + "Transactions Validated: 0\n"
+          + "Issues Found: -1\n"
+          + "Details: Validation failed: %s", e.getMessage());
+    }
+  }
+
+  /**
+   * Cleans up old audit lock files based on age threshold.
+   * This command removes audit files that are older than the specified number 
of days.
+   * 
+   * @param dryRun Whether to perform a dry run (preview changes without 
deletion)
+   * @param ageDays Number of days to keep audit files (default 7)
+   * @return Status message indicating files cleaned or to be cleaned
+   */
+  @ShellMethod(key = "locks audit cleanup", value = "Clean up old audit lock 
files")
+  public String cleanupAuditLocks(
+      @ShellOption(value = {"--dryRun"}, defaultValue = "false",
+          help = "Preview changes without actually deleting files") final 
boolean dryRun,
+      @ShellOption(value = {"--ageDays"}, defaultValue = "7",
+          help = "Delete audit files older than this many days") final String 
ageDaysStr) {
+
+    try {
+      if (HoodieCLI.basePath == null) {
+        return "No Hudi table loaded. Please connect to a table first.";
+      }
+
+      // Parse ageDays manually to handle validation properly
+      int ageDays;
+      try {
+        ageDays = Integer.parseInt(ageDaysStr);
+      } catch (NumberFormatException e) {
+        return "Error: ageDays must be a valid non-negative integer.";
+      }
+
+      if (ageDays < 0) {
+        return "Error: ageDays must be non-negative (>= 0).";
+      }
+
+      return performAuditCleanup(dryRun, ageDays).toString();
+    } catch (Exception e) {
+      LOG.error("Error during audit cleanup", e);
+      return String.format("Error during cleanup: %s", e.getMessage() != null 
? e.getMessage() : e.getClass().getSimpleName());
+    }
+  }
+
+  /**
+   * Internal method to perform audit cleanup. Used by both the CLI command 
and disable method.
+   *
+   * @param dryRun Whether to perform a dry run (preview changes without 
deletion)
+   * @param ageDays Number of days to keep audit files (0 means delete all, 
must be non-negative)
+   * @return CleanupResult containing cleanup operation details
+   */
+  private CleanupResult performAuditCleanup(boolean dryRun, int ageDays) {
+    try {
+      if (HoodieCLI.storage == null) {
+        String message = "Storage not initialized.";
+        return new CleanupResult(false, 0, 0, 0, message, dryRun);
+      }
+
+      String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+      StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+      // Calculate cutoff timestamp (ageDays ago)
+      long cutoffTime = System.currentTimeMillis() - (ageDays * 24L * 60L * 
60L * 1000L);
+
+      // List all files in audit folder and filter by modification time
+      List<StoragePathInfo> allFiles;
+      try {
+        allFiles = HoodieCLI.storage.listDirectEntries(auditFolder);
+      } catch (FileNotFoundException e) {
+        String message = "No audit folder found - nothing to cleanup.";
+        return new CleanupResult(true, 0, 0, 0, message, dryRun);
+      }
+      List<StoragePathInfo> auditFiles = new ArrayList<>();
+      List<StoragePathInfo> oldFiles = new ArrayList<>();
+      
+      // Filter to get only .jsonl files
+      for (StoragePathInfo pathInfo : allFiles) {
+        if (pathInfo.isFile() && 
pathInfo.getPath().getName().endsWith(".jsonl")) {
+          auditFiles.add(pathInfo);
+          if (pathInfo.getModificationTime() < cutoffTime) {
+            oldFiles.add(pathInfo);
+          }
+        }
+      }
+
+      if (oldFiles.isEmpty()) {
+        String message = String.format("No audit files older than %d days 
found.", ageDays);
+        return new CleanupResult(true, 0, 0, auditFiles.size(), message, 
dryRun);
+      }
+
+      int fileCount = oldFiles.size();
+
+      if (dryRun) {
+        String message = String.format("Dry run: Would delete %d audit files 
older than %d days.", fileCount, ageDays);
+        return new CleanupResult(true, 0, 0, fileCount, message, dryRun);
+      } else {
+        // Actually delete the files
+        int deletedCount = 0;
+        int failedCount = 0;
+
+        for (StoragePathInfo pathInfo : oldFiles) {
+          try {
+            HoodieCLI.storage.deleteFile(pathInfo.getPath());
+            deletedCount++;
+          } catch (Exception e) {
+            failedCount++;
+            LOG.warn("Failed to delete audit file: " + pathInfo.getPath(), e);
+          }
+        }
+        
+        if (failedCount == 0) {
+          String message = String.format("Successfully deleted %d audit files 
older than %d days.", deletedCount, ageDays);
+          return new CleanupResult(true, deletedCount, failedCount, fileCount, 
message, dryRun);
+        } else {
+          String message = String.format("Deleted %d audit files, failed to 
delete %d files.", deletedCount, failedCount);
+          return new CleanupResult(false, deletedCount, failedCount, 
fileCount, message, dryRun);
+        }
+      }
+      
+    } catch (Exception e) {
+      LOG.error("Error cleaning up audit locks", e);
+      String message = String.format("Failed to cleanup audit locks: %s", 
e.getMessage());
+      return new CleanupResult(false, 0, 0, 0, message, dryRun);
+    }
+  }
+
+  /**
+   * Parses an audit file and extracts transaction window information.
+   */
+  private Option<TransactionWindow> parseAuditFile(StoragePathInfo pathInfo) {
+    String filename = pathInfo.getPath().getName();
+
+    try {
+      // Read and parse JSONL content
+      List<AuditRecord> entries = new ArrayList<>();
+      try (InputStream inputStream = 
HoodieCLI.storage.open(pathInfo.getPath());
+           BufferedReader reader = new BufferedReader(new 
InputStreamReader(inputStream))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          if (line.trim().isEmpty()) {
+            continue;
+          }
+          try {
+            AuditRecord entry = OBJECT_MAPPER.readValue(line, 
AuditRecord.class);
+            entries.add(entry);
+          } catch (Exception e) {
+            LOG.warn("Failed to parse JSON line in file " + filename + ": " + 
line, e);
+          }
+        }
+      }
+
+      if (entries.isEmpty()) {
+        return Option.empty();
+      }
+
+      // Extract transaction metadata from first entry
+      AuditRecord firstEntry = entries.get(0);
+      String ownerId = firstEntry.getOwnerId();
+      long transactionCreationTime = firstEntry.getTransactionStartTime();
+
+      // Find first START timestamp
+      long lockAcquisitionTime = transactionCreationTime; // default to 
transaction creation time
+      for (AuditRecord entry : entries) {
+        if ("START".equals(entry.getState())) {
+          lockAcquisitionTime = entry.getTimestamp();
+          break;
+        }
+      }
+
+      // Find last END timestamp
+      Option<Long> lockReleaseTime = Option.empty();
+      for (int i = entries.size() - 1; i >= 0; i--) {
+        AuditRecord entry = entries.get(i);
+        if ("END".equals(entry.getState())) {
+          lockReleaseTime = Option.of(entry.getTimestamp());
+          break;
+        }
+      }
+
+      // Find last expiration time as fallback
+      Option<Long> lockExpirationTime = Option.empty();
+      if (!entries.isEmpty()) {
+        AuditRecord lastEntry = entries.get(entries.size() - 1);
+        lockExpirationTime = Option.of(lastEntry.getLockExpiration());
+      }
+
+      return Option.of(new TransactionWindow(
+          ownerId,
+          transactionCreationTime,
+          lockAcquisitionTime,
+          lockReleaseTime,
+          lockExpirationTime,
+          filename
+      ));
+    } catch (Exception e) {
+      LOG.warn("Failed to parse audit file: " + filename, e);
+      return Option.empty();
+    }
+  }
+
+  /**
+   * Validates transaction windows for overlaps and proper closure.
+   */
+  private ValidationResults validateTransactionWindows(List<TransactionWindow> 
windows) {
+    List<String> errors = new ArrayList<>();
+    List<String> warnings = new ArrayList<>();
+    
+    // Check for transactions without proper END
+    for (TransactionWindow window : windows) {
+      if (!window.lockReleaseTime.isPresent()) {
+        warnings.add(String.format("[WARNING] %s => transaction did not end 
gracefully. This could be due to driver OOM or non-graceful shutdown.",
+            window.filename));
+      }
+    }
+
+    // Sort windows by start time for overlap detection
+    List<TransactionWindow> sortedWindows = new ArrayList<>(windows);
+    sortedWindows.sort(Comparator.comparingLong(w -> w.lockAcquisitionTime));
+
+    // Check for overlaps
+    for (int i = 0; i < sortedWindows.size(); i++) {
+      TransactionWindow currentWindow = sortedWindows.get(i);
+      long currentEnd = currentWindow.getEffectiveEndTime();
+
+      // Check all subsequent windows for overlaps
+      for (int j = i + 1; j < sortedWindows.size(); j++) {
+        TransactionWindow otherWindow = sortedWindows.get(j);
+        long otherStart = otherWindow.lockAcquisitionTime;
+
+        // Check if windows overlap
+        if (otherStart < currentEnd) {
+          errors.add(String.format("[ERROR] %s => overlaps with %s",
+              currentWindow.filename, otherWindow.filename));
+        }
+      }
+    }
+    
+    return new ValidationResults(errors, warnings);
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
index da346110e124..266a7f4058a7 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
@@ -26,12 +26,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.jupiter.api.BeforeEach;
 
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -56,6 +61,55 @@ public class TestLockAuditingCommand extends 
CLIFunctionalTestHarness {
 
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+  /**
+   * Helper method to create AuditRecord for tests with lockHeld defaulting to 
true.
+   */
+  private static LockAuditingCommand.AuditRecord createAuditRecord(
+      String ownerId, long transactionStartTime, long timestamp, String state, 
long lockExpiration) {
+    return new LockAuditingCommand.AuditRecord(ownerId, transactionStartTime, 
timestamp, state, lockExpiration, true);
+  }
+
+  /**
+   * Represents a transaction scenario with its audit records
+   */
+  static class TransactionScenario {
+    final String filename; // e.g., "1234567890_owner1.jsonl"
+    final List<LockAuditingCommand.AuditRecord> records;
+
+    TransactionScenario(String filename, List<LockAuditingCommand.AuditRecord> 
records) {
+      this.filename = filename;
+      this.records = records;
+    }
+  }
+
+  /**
+   * Helper method to create audit files from scenarios
+   */
+  private void createAuditFiles(List<TransactionScenario> scenarios) throws 
IOException {
+    String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+    StoragePath auditDir = new StoragePath(auditFolderPath);
+
+    // Create audit directory if it doesn't exist
+    if (!HoodieCLI.storage.exists(auditDir)) {
+      HoodieCLI.storage.createDirectory(auditDir);
+    }
+
+    for (TransactionScenario scenario : scenarios) {
+      StoragePath filePath = new StoragePath(auditDir, scenario.filename);
+      StringBuilder jsonLines = new StringBuilder();
+      for (LockAuditingCommand.AuditRecord record : scenario.records) {
+        if (jsonLines.length() > 0) {
+          jsonLines.append("\n");
+        }
+        jsonLines.append(OBJECT_MAPPER.writeValueAsString(record));
+      }
+
+      try (OutputStream outputStream = HoodieCLI.storage.create(filePath, 
true)) {
+        outputStream.write(jsonLines.toString().getBytes());
+      }
+    }
+  }
+
   @BeforeEach
   public void setUp() throws Exception {
     HoodieCLI.conf = storageConf();
@@ -243,6 +297,14 @@ public class TestLockAuditingCommand extends 
CLIFunctionalTestHarness {
     // First enable audit
     shell.evaluate(() -> "locks audit enable");
 
+    // Create some audit files to be cleaned up
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+    records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+    records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+    scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+    createAuditFiles(scenarios);
+
     // Disable with keepAuditFiles=false
     Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles 
false");
     
@@ -250,7 +312,63 @@ public class TestLockAuditingCommand extends 
CLIFunctionalTestHarness {
         () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
         () -> assertNotNull(result.toString()),
         () -> assertTrue(result.toString().contains("Lock audit disabled 
successfully")),
-        () -> assertTrue(result.toString().contains("Audit files cleaned up 
at:")));
+        () -> assertTrue(result.toString().contains("Audit files cleaned up 
at:")
+                        || result.toString().contains("Some audit files may 
not have been cleaned up")));
+  }
+
+  /**
+   * Test that disable with keepAuditFiles=false actually cleans up files.
+   */
+  @Test
+  public void testDisableLockAuditCleansUpFiles() throws Exception {
+    // First enable audit
+    shell.evaluate(() -> "locks audit enable");
+
+    // Create some audit files
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+    records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+    records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+    scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+    createAuditFiles(scenarios);
+
+    // Verify files exist before disable
+    String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+    StoragePath auditFolder = new StoragePath(auditFolderPath);
+    List<StoragePathInfo> filesBefore = 
HoodieCLI.storage.listDirectEntries(auditFolder);
+    long jsonlFilesBefore = filesBefore.stream()
+        .filter(pathInfo -> pathInfo.isFile() && 
pathInfo.getPath().getName().endsWith(".jsonl"))
+        .count();
+    assertTrue(jsonlFilesBefore > 0, "Should have audit files before disable");
+
+    // Disable with keepAuditFiles=false - this should trigger cleanup
+    Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles 
false");
+    
+    assertAll("Cleanup triggered by disable",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Lock audit disabled 
successfully")));
+  }
+
+  /**
+   * Test disable with keepAuditFiles=false when no audit files exist.
+   */
+  @Test
+  public void testDisableLockAuditWithoutKeepingFilesNoFiles() {
+    // First enable audit but don't create any audit files
+    shell.evaluate(() -> "locks audit enable");
+
+    // Disable with keepAuditFiles=false when no files exist
+    Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles 
false");
+    
+    assertAll("Disable with cleanup when no files exist",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Lock audit disabled 
successfully")),
+        // Should handle the case where no files exist to clean up
+        () -> assertTrue(result.toString().contains("Audit files cleaned up 
at:")
+                        || result.toString().contains("No audit files")
+                        || result.toString().contains("nothing to cleanup")));
   }
 
   /**
@@ -282,4 +400,544 @@ public class TestLockAuditingCommand extends 
CLIFunctionalTestHarness {
     
     assertTrue(enabledNode.asBoolean(), "Audit should still be enabled");
   }
+
+  // ==================== Validation Tests ====================
+
+  /**
+   * Test validation when no audit folder exists.
+   */
+  @Test
+  public void testValidateAuditLocksNoAuditFolder() {
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Validation handles missing audit folder",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
PASSED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
0")),
+        () -> assertTrue(result.toString().contains("Issues Found: 0")),
+        () -> assertTrue(result.toString().contains("No audit folder found")));
+  }
+
+  /**
+   * Test validation when audit folder exists but no audit files.
+   */
+  @Test
+  public void testValidateAuditLocksNoAuditFiles() throws IOException {
+    // Create audit folder but no files
+    String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+    StoragePath auditDir = new StoragePath(auditFolderPath);
+    HoodieCLI.storage.createDirectory(auditDir);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Validation handles no audit files",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
PASSED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
0")),
+        () -> assertTrue(result.toString().contains("Issues Found: 0")),
+        () -> assertTrue(result.toString().contains("No audit files found")));
+  }
+
+  /**
+   * Test validation - No Issues (PASSED)
+   */
+  @Test
+  public void testValidateAuditLocksNoIssues() throws IOException {
+    long baseTime = System.currentTimeMillis();
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // Transaction 1: Complete transaction
+    List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+    records1.add(createAuditRecord("owner1", baseTime, baseTime + 100, 
"START", baseTime + 60000));
+    records1.add(createAuditRecord("owner1", baseTime, baseTime + 200, 
"RENEW", baseTime + 60000));
+    records1.add(createAuditRecord("owner1", baseTime, baseTime + 300, "END", 
baseTime + 60000));
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records1));
+    
+    // Transaction 2: Complete transaction starting after first one ends
+    List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+    records2.add(createAuditRecord("owner2", baseTime + 500, baseTime + 600, 
"START", baseTime + 60000));
+    records2.add(createAuditRecord("owner2", baseTime + 500, baseTime + 700, 
"END", baseTime + 60000));
+    scenarios.add(new TransactionScenario((baseTime + 500) + "_owner2.jsonl", 
records2));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Validation passes with no issues",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
PASSED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
2")),
+        () -> assertTrue(result.toString().contains("Issues Found: 0")),
+        () -> assertTrue(result.toString().contains("successfully")));
+  }
+
+  /**
+   * Test validation - Single Unclosed Transaction (WARNING)
+   */
+  @Test
+  public void testValidateAuditLocksSingleUnclosedTransaction() throws 
IOException {
+    long baseTime = 1000000L;
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // Single audit file without END record
+    List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+    records1.add(createAuditRecord("owner1", baseTime, baseTime + 100, 
"START", baseTime + 200));
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records1));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Single unclosed transaction shows warning",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
WARNING")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
1")),
+        () -> assertTrue(result.toString().contains("Issues Found: 1")));
+  }
+
+  /**
+   * Test validation - Unclosed Transactions (WARNING)
+   */
+  @Test
+  public void testValidateAuditLocksUnclosedTransactions() throws IOException {
+    long baseTime = 1000000L;
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // Transaction 1: Unclosed (effective end at expiration = baseTime + 200)
+    List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+    records1.add(createAuditRecord("owner1", baseTime, baseTime + 100, 
"START", baseTime + 200));
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records1));
+    
+    // Transaction 2: Complete, starts after owner1's expiration
+    List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+    records2.add(createAuditRecord("owner2", baseTime + 300, baseTime + 400, 
"START", baseTime + 60000));
+    records2.add(createAuditRecord("owner2", baseTime + 300, baseTime + 500, 
"END", baseTime + 60000));
+    scenarios.add(new TransactionScenario((baseTime + 300) + "_owner2.jsonl", 
records2));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Unclosed transactions show warning",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
WARNING")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
2")),
+        () -> assertTrue(result.toString().contains("Issues Found: 1")),
+        () -> assertTrue(result.toString().contains("[WARNING]")),
+        () -> assertTrue(result.toString().contains("owner1.jsonl")),
+        () -> assertTrue(result.toString().contains("did not end 
gracefully")));
+  }
+
+  /**
+   * Test validation - Overlapping Transactions (FAILED)
+   */
+  @Test
+  public void testValidateAuditLocksOverlappingTransactions() throws 
IOException {
+    long baseTime = System.currentTimeMillis();
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // Transaction 1: Ends after owner2 starts (overlapping)
+    List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+    records1.add(createAuditRecord("owner1", baseTime, baseTime + 100, 
"START", baseTime + 60000));
+    records1.add(createAuditRecord("owner1", baseTime, baseTime + 500, "END", 
baseTime + 60000)); // Ends after owner2 starts
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records1));
+    
+    // Transaction 2: Starts before owner1 ends (overlapping)
+    List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+    records2.add(createAuditRecord("owner2", baseTime + 200, baseTime + 300, 
"START", baseTime + 60000)); // Starts before owner1 ends
+    records2.add(createAuditRecord("owner2", baseTime + 200, baseTime + 400, 
"END", baseTime + 60000));
+    scenarios.add(new TransactionScenario((baseTime + 200) + "_owner2.jsonl", 
records2));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Overlapping transactions show failure",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
FAILED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
2")),
+        () -> assertTrue(result.toString().contains("Issues Found: 1")),
+        () -> assertTrue(result.toString().contains("[ERROR]")),
+        () -> assertTrue(result.toString().contains("owner1.jsonl")),
+        () -> assertTrue(result.toString().contains("overlaps with")),
+        () -> assertTrue(result.toString().contains("owner2.jsonl")));
+  }
+
+  /**
+   * Test validation - Mixed Issues (FAILED)
+   */
+  @Test
+  public void testValidateAuditLocksMixedIssues() throws IOException {
+    long baseTime = System.currentTimeMillis();
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // Transaction 1: Unclosed transaction
+    List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+    records1.add(createAuditRecord("owner1", baseTime, baseTime + 100, 
"START", baseTime + 60000));
+    // No END - unclosed
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records1));
+    
+    // Transaction 2: Overlaps with owner1
+    List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+    records2.add(createAuditRecord("owner2", baseTime + 50, baseTime + 150, 
"START", baseTime + 60000)); // Overlaps with owner1
+    records2.add(createAuditRecord("owner2", baseTime + 50, baseTime + 250, 
"END", baseTime + 60000));
+    scenarios.add(new TransactionScenario((baseTime + 50) + "_owner2.jsonl", 
records2));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Mixed issues show failure",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
FAILED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
2")),
+        () -> assertTrue(result.toString().contains("Issues Found: 2")),
+        () -> assertTrue(result.toString().contains("[ERROR]")),
+        () -> assertTrue(result.toString().contains("[WARNING]")),
+        () -> assertTrue(result.toString().contains("overlaps with")),
+        () -> assertTrue(result.toString().contains("did not end 
gracefully")));
+  }
+
+  /**
+   * Test validation - Out of Order Filenames but Valid Transactions (PASSED)
+   */
+  @Test
+  public void testValidateAuditLocksOutOfOrderFilenames() throws IOException {
+    long baseTime = System.currentTimeMillis();
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // File with later timestamp in name but contains earlier transaction
+    List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+    records1.add(createAuditRecord("owner2", baseTime + 100, baseTime + 200, 
"START", baseTime + 60000)); // Actually starts first
+    records1.add(createAuditRecord("owner2", baseTime + 100, baseTime + 300, 
"END", baseTime + 60000));
+    scenarios.add(new TransactionScenario((baseTime + 2000) + "_owner2.jsonl", 
records1)); // Filename suggests later time
+    
+    // File with earlier timestamp in name but contains later transaction
+    List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+    records2.add(createAuditRecord("owner1", baseTime + 500, baseTime + 600, 
"START", baseTime + 60000)); // Actually starts second
+    records2.add(createAuditRecord("owner1", baseTime + 500, baseTime + 700, 
"END", baseTime + 60000));
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records2)); // Filename suggests earlier time
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Out of order filenames but valid transactions pass",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
PASSED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
2")),
+        () -> assertTrue(result.toString().contains("Issues Found: 0")),
+        () -> assertTrue(result.toString().contains("successfully")));
+  }
+
+  /**
+   * Test validation when no table is loaded.
+   */
+  @Test
+  public void testValidateAuditLocksNoTable() {
+    // Clear the base path to simulate no table loaded
+    String originalBasePath = HoodieCLI.basePath;
+    HoodieCLI.basePath = null;
+
+    try {
+      Object result = shell.evaluate(() -> "locks audit validate");
+      assertAll("Validation handles no table loaded",
+          () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+          () -> assertNotNull(result.toString()),
+          () -> assertEquals("No Hudi table loaded. Please connect to a table 
first.", result.toString()));
+    } finally {
+      HoodieCLI.basePath = originalBasePath;
+    }
+  }
+
+  /**
+   * Test validation with malformed audit files.
+   */
+  @Test
+  public void testValidateAuditLocksMalformedFiles() throws IOException {
+    // Create audit directory
+    String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+    StoragePath auditDir = new StoragePath(auditFolderPath);
+    HoodieCLI.storage.createDirectory(auditDir);
+    
+    // Create a malformed audit file
+    StoragePath filePath = new StoragePath(auditDir, "malformed_file.jsonl");
+    try (OutputStream outputStream = HoodieCLI.storage.create(filePath, true)) 
{
+      outputStream.write("invalid json content".getBytes());
+    }
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Validation handles malformed files",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
FAILED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
0")),
+        () -> assertTrue(result.toString().contains("Failed to parse any audit 
files")));
+  }
+
+  // ==================== Cleanup Tests ====================
+
+  /**
+   * Test cleanup when no audit folder exists.
+   */
+  @Test
+  public void testCleanupAuditLocksNoAuditFolder() {
+    Object result = shell.evaluate(() -> "locks audit cleanup");
+    
+    assertAll("Cleanup handles missing audit folder",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertEquals("No audit folder found - nothing to cleanup.", 
result.toString()));
+  }
+
+  /**
+   * Test cleanup when audit folder exists but no audit files.
+   */
+  @Test
+  public void testCleanupAuditLocksNoAuditFiles() throws IOException {
+    // Create audit folder but no files
+    String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+    StoragePath auditDir = new StoragePath(auditFolderPath);
+    HoodieCLI.storage.createDirectory(auditDir);
+    
+    Object result = shell.evaluate(() -> "locks audit cleanup");
+    
+    assertAll("Cleanup handles no audit files",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("No audit files older than 
7 days found")));
+  }
+
+  /**
+   * Test cleanup with dry run - test basic functionality.
+   */
+  @Test
+  public void testCleanupAuditLocksDryRun() throws IOException {
+    // Create some audit files (they will have current modification time)
+    long oldTime = System.currentTimeMillis() - (10 * 24 * 60 * 60 * 1000L); 
// 10 days ago
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+    records.add(createAuditRecord("owner1", oldTime, oldTime + 100, "START", 
oldTime + 60000));
+    records.add(createAuditRecord("owner1", oldTime, oldTime + 200, "END", 
oldTime + 60000));
+    scenarios.add(new TransactionScenario(oldTime + "_owner1.jsonl", records));
+    
+    createAuditFiles(scenarios);
+    
+    // Test default cleanup with dry run
+    Object result = shell.evaluate(() -> "locks audit cleanup --dryRun true");
+    
+    assertAll("Dry run executes successfully",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        // Since files are created with current time, they should be too 
recent to delete
+        () -> assertTrue(result.toString().contains("No audit files older 
than") 
+                        || result.toString().contains("Dry run: Would 
delete")));
+    
+    // Verify files still exist
+    String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+    StoragePath auditFolder = new StoragePath(auditFolderPath);
+    List<StoragePathInfo> filesAfter = 
HoodieCLI.storage.listDirectEntries(auditFolder);
+    long jsonlFiles = filesAfter.stream()
+        .filter(pathInfo -> pathInfo.isFile() && 
pathInfo.getPath().getName().endsWith(".jsonl"))
+        .count();
+    assertEquals(1, jsonlFiles, "Files should still exist after dry run");
+  }
+
+  /**
+   * Test cleanup with custom age threshold.
+   */
+  @Test
+  public void testCleanupAuditLocksCustomAge() throws IOException {
+    // Create some audit files
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+    records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+    records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+    scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+    
+    createAuditFiles(scenarios);
+    
+    // Test with custom age threshold - files will be recent so won't be 
deleted
+    Object result = shell.evaluate(() -> "locks audit cleanup --ageDays 30 
--dryRun true");
+    
+    assertAll("Custom age threshold works",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("No audit files older than 
30 days found")));
+  }
+
+  /**
+   * Test actual cleanup (not dry run).
+   */
+  @Test
+  public void testCleanupAuditLocksActualDelete() throws IOException {
+    // Create some audit files
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+    records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+    records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+    scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+    
+    createAuditFiles(scenarios);
+    
+    // Test actual cleanup - files will be recent so shouldn't be deleted
+    Object result = shell.evaluate(() -> "locks audit cleanup --dryRun false");
+    
+    assertAll("Actual cleanup executes successfully",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        // Recent files shouldn't be deleted
+        () -> assertTrue(result.toString().contains("No audit files older than 
7 days found")));
+  }
+
+  /**
+   * Test cleanup with recent files (should not delete).
+   */
+  @Test
+  public void testCleanupAuditLocksRecentFiles() throws IOException {
+    // Create recent audit files
+    long recentTime = System.currentTimeMillis() - (2 * 24 * 60 * 60 * 1000L); 
// 2 days ago
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+    records.add(createAuditRecord("owner1", recentTime, recentTime + 100, 
"START", recentTime + 60000));
+    records.add(createAuditRecord("owner1", recentTime, recentTime + 200, 
"END", recentTime + 60000));
+    scenarios.add(new TransactionScenario(recentTime + "_owner1.jsonl", 
records));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit cleanup");
+    
+    assertAll("Recent files are not deleted",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("No audit files older than 
7 days found")));
+  }
+
+  /**
+   * Test cleanup parameter validation - invalid age days.
+   */
+  @Test
+  public void testCleanupAuditLocksInvalidAgeDays() {
+    // Test with negative ageDays (Spring Shell treats this as invalid integer 
format)
+    Object resultNegative = shell.evaluate(() -> "locks audit cleanup 
--ageDays -1");
+
+    assertAll("Negative ageDays should be rejected",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(resultNegative)),
+        () -> assertNotNull(resultNegative.toString()),
+        () -> assertEquals("Error: ageDays must be a value greater than 0.", 
resultNegative.toString()));
+
+    // Test with invalid string to verify our parsing validation
+    Object resultInvalid = shell.evaluate(() -> "locks audit cleanup --ageDays 
abc");
+
+    assertAll("Invalid string ageDays should be rejected",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(resultInvalid)),
+        () -> assertNotNull(resultInvalid.toString()),
+        () -> assertEquals("Error: ageDays must be a value greater than 0.", 
resultInvalid.toString()));
+  }
+
+  /**
+   * Test cleanup when no table is loaded.
+   */
+  @Test
+  public void testCleanupAuditLocksNoTable() {
+    // Clear the base path to simulate no table loaded
+    String originalBasePath = HoodieCLI.basePath;
+    HoodieCLI.basePath = null;
+
+    Object result = shell.evaluate(() -> "locks audit cleanup");
+    assertAll("Cleanup handles no table loaded",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertEquals("No Hudi table loaded. Please connect to a table 
first.", result.toString()));
+    HoodieCLI.basePath = originalBasePath;
+  }
+
+  /**
+   * Test cleanup with multiple files.
+   */
+  @Test
+  public void testCleanupAuditLocksMixedFiles() throws IOException {
+    // Create multiple audit files
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // File 1
+    List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+    records1.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+    records1.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+    scenarios.add(new TransactionScenario("1000_owner1.jsonl", records1));
+    
+    // File 2
+    List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+    records2.add(createAuditRecord("owner2", 2000L, 2100L, "START", 62000L));
+    records2.add(createAuditRecord("owner2", 2000L, 2200L, "END", 62000L));
+    scenarios.add(new TransactionScenario("2000_owner2.jsonl", records2));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit cleanup --dryRun true");
+    System.out.println("DEBUG - Mixed files result: " + result.toString());
+    
+    assertAll("Multiple files handled correctly",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("No audit files older 
than")
+                        || result.toString().contains("cleanup")
+                        || result.toString().contains("found")));
+  }
+
+  /**
+   * Test that disable method can call performAuditCleanup with ageDays=0 to 
delete all files.
+   * This validates that the internal validation allows ageDays >= 0 (not just 
> 0).
+   */
+  @Test
+  public void testDisableLockAuditWithAgeDaysZero() throws Exception {
+    // First enable audit
+    shell.evaluate(() -> "locks audit enable");
+
+    // Create some audit files to be cleaned up
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+    records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+    records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+    scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+    createAuditFiles(scenarios);
+
+    // Verify files exist before disable
+    String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+    StoragePath auditFolder = new StoragePath(auditFolderPath);
+    List<StoragePathInfo> filesBefore = 
HoodieCLI.storage.listDirectEntries(auditFolder);
+    long jsonlFilesBefore = filesBefore.stream()
+        .filter(pathInfo -> pathInfo.isFile() && 
pathInfo.getPath().getName().endsWith(".jsonl"))
+        .count();
+    assertTrue(jsonlFilesBefore > 0, "Should have audit files before disable");
+
+    // Disable with keepAuditFiles=false, which internally calls 
performAuditCleanup(false, 0)
+    Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles 
false");
+
+    assertAll("Disable with ageDays=0 deletes all files",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Lock audit disabled 
successfully")),
+        () -> assertTrue(result.toString().contains("cleaned up") || 
result.toString().contains("No audit files to clean up")));
+
+    // Verify files were cleaned up (ageDays=0 should delete all files)
+    List<StoragePathInfo> filesAfter = 
HoodieCLI.storage.listDirectEntries(auditFolder);
+    long jsonlFilesAfter = filesAfter.stream()
+        .filter(pathInfo -> pathInfo.isFile() && 
pathInfo.getPath().getName().endsWith(".jsonl"))
+        .count();
+    assertEquals(0, jsonlFilesAfter, "All audit files should be deleted when 
ageDays=0");
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
index 56317c92075a..ad6577aed172 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -187,7 +187,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
     this.lockValiditySecs = config.getValiditySeconds();
     this.basePath = config.getHudiTableBasePath();
     String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
-    this.lockFilePath = String.format("%s%s%s", lockFolderPath, 
StoragePath.SEPARATOR, DEFAULT_TABLE_LOCK_FILE_NAME);
+    this.lockFilePath = new StoragePath(lockFolderPath, 
DEFAULT_TABLE_LOCK_FILE_NAME).toString();
     this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, 
TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
     this.storageLockClient = storageLockClientLoader.apply(ownerId, 
lockFilePath, properties);
     this.ownerId = ownerId;
@@ -479,6 +479,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
     // Upload metadata that will unlock this lock.
     StorageLockData expiredLockData = new StorageLockData(true, 
this.getLock().getValidUntilMs(), ownerId);
     Pair<LockUpsertResult, Option<StorageLockFile>> result;
+    long lockExpirationTimeMs = System.currentTimeMillis();
     result = this.storageLockClient.tryUpsertLockFile(expiredLockData, 
Option.of(this.getLock()));
     switch (result.getLeft()) {
       case UNKNOWN_ERROR:
@@ -488,7 +489,7 @@ public class StorageBasedLockProvider implements 
LockProvider<StorageLockFile> {
         return false;
       case SUCCESS:
         logInfoLockState(RELEASED);
-        recordAuditOperation(AuditOperationState.END, 
System.currentTimeMillis());
+        recordAuditOperation(AuditOperationState.END, lockExpirationTimeMs);
         setLock(null);
         return true;
       case ACQUIRED_BY_OTHERS:
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
index fa5c55171983..0ae66a58dbb8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -89,7 +89,7 @@ public interface StorageLockClient extends AutoCloseable {
    * @return The lock folder path (e.g., "s3://bucket/table/.hoodie/.locks")
    */
   static String getLockFolderPath(String basePath) {
-    return String.format("%s%s%s", basePath, StoragePath.SEPARATOR, 
LOCKS_FOLDER_NAME);
+    return new StoragePath(basePath, LOCKS_FOLDER_NAME).toString();
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
index ba353fd8eacb..cc93f9f7388a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
@@ -52,7 +52,7 @@ public class StorageLockProviderAuditService implements 
AuditService {
    */
   public static String getAuditConfigPath(String basePath) {
     String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
-    return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR, 
AUDIT_CONFIG_FILE_NAME);
+    return new StoragePath(lockFolderPath, AUDIT_CONFIG_FILE_NAME).toString();
   }
   
   /**
@@ -63,7 +63,7 @@ public class StorageLockProviderAuditService implements 
AuditService {
    */
   public static String getAuditFolderPath(String basePath) {
     String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
-    return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR, 
AUDIT_FOLDER_NAME);
+    return new StoragePath(lockFolderPath, AUDIT_FOLDER_NAME).toString();
   }
   
   private final String ownerId;
@@ -100,10 +100,7 @@ public class StorageLockProviderAuditService implements 
AuditService {
     
     // Generate audit file path: <txn-start>_<full-owner-id>.jsonl
     String filename = String.format("%d_%s.jsonl", transactionStartTime, 
ownerId);
-    this.auditFilePath = String.format("%s%s%s",
-        getAuditFolderPath(basePath),
-        StoragePath.SEPARATOR,
-        filename);
+    this.auditFilePath = new StoragePath(getAuditFolderPath(basePath), 
filename).toString();
     
     LOG.debug("Initialized audit service for transaction starting at {} with 
file: {}", 
         transactionStartTime, auditFilePath);
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
index 33d3c9bf44ff..9a415850289a 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.client.transaction.lock.models;
 
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -162,4 +164,60 @@ public class StorageLockClientFileTest {
     StorageLockFile file = new StorageLockFile(data, VERSION_ID);
     assertEquals(VERSION_ID, file.getVersionId());
   }
+
+  @Test
+  void testGetLockFolderPathWithoutTrailingSlash() {
+    String basePath = "s3://bucket/table";
+    String expected = "s3://bucket/table/.hoodie/.locks";
+    String actual = StorageLockClient.getLockFolderPath(basePath);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  void testGetLockFolderPathWithTrailingSlash() {
+    String basePath = "s3://bucket/table/";
+    String expected = "s3://bucket/table/.hoodie/.locks";
+    String actual = StorageLockClient.getLockFolderPath(basePath);
+    assertEquals(expected, actual, "Path with trailing slash should be 
normalized correctly");
+  }
+
+  @Test
+  void testGetLockFolderPathWithMultipleTrailingSlashes() {
+    String basePath = "s3://bucket/table///";
+    String expected = "s3://bucket/table/.hoodie/.locks";
+    String actual = StorageLockClient.getLockFolderPath(basePath);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  void testGetLockFolderPathLocalFileSystem() {
+    String basePath = "/tmp/hudi/table";
+    String expected = "/tmp/hudi/table/.hoodie/.locks";
+    String actual = StorageLockClient.getLockFolderPath(basePath);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  void testGetLockFolderPathLocalFileSystemWithTrailingSlash() {
+    String basePath = "/tmp/hudi/table/";
+    String expected = "/tmp/hudi/table/.hoodie/.locks";
+    String actual = StorageLockClient.getLockFolderPath(basePath);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  void testGetAuditConfigPathWithTrailingSlash() {
+    String basePath = "s3://bucket/table/";
+    String expected = "s3://bucket/table/.hoodie/.locks/audit_enabled.json";
+    String actual = 
StorageLockProviderAuditService.getAuditConfigPath(basePath);
+    assertEquals(expected, actual, "Audit config path with trailing slash 
should be normalized correctly");
+  }
+
+  @Test
+  void testGetAuditFolderPathWithTrailingSlash() {
+    String basePath = "s3://bucket/table/";
+    String expected = "s3://bucket/table/.hoodie/.locks/audit";
+    String actual = 
StorageLockProviderAuditService.getAuditFolderPath(basePath);
+    assertEquals(expected, actual, "Audit folder path with trailing slash 
should be normalized correctly");
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupAuditLockProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupAuditLockProcedure.scala
new file mode 100644
index 000000000000..64c8aaacf505
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupAuditLockProcedure.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.StoragePath
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
/**
 * Spark SQL procedure that removes old audit lock files for a Hudi table.
 *
 * Audit files under `{table_path}/.hoodie/.locks/audit/` older than a
 * configurable number of days are deleted. A dry-run mode reports which
 * files would be removed without actually deleting anything.
 *
 * Usage:
 * {{{
 * CALL cleanup_audit_lock(table => 'my_table')
 * CALL cleanup_audit_lock(path => '/path/to/table', dry_run => true, age_days => 30)
 * }}}
 */
class CleanupAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType),
    ProcedureParameter.optional(1, "path", DataTypes.StringType),
    ProcedureParameter.optional(2, "dry_run", DataTypes.BooleanType, false),
    ProcedureParameter.optional(3, "age_days", DataTypes.IntegerType, 7)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("table", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("files_cleaned", DataTypes.IntegerType, nullable = false, Metadata.empty),
    StructField("dry_run", DataTypes.BooleanType, nullable = false, Metadata.empty),
    StructField("age_days", DataTypes.IntegerType, nullable = false, Metadata.empty),
    StructField("message", DataTypes.StringType, nullable = false, Metadata.empty)
  ))

  // Long constant so `ageDays * MILLIS_PER_DAY` widens to Long before any
  // multiplication; the previous `ageDays * 24 * 60 * 60 * 1000L` evaluated
  // `ageDays * 24 * 60 * 60` in 32-bit Int arithmetic and could overflow.
  private val MILLIS_PER_DAY: Long = 24L * 60 * 60 * 1000

  /**
   * Returns the procedure parameters definition.
   *
   * @return Array of parameters: table (optional String), path (optional String),
   *         dry_run (optional Boolean, default false), and age_days (optional Integer, default 7)
   */
  def parameters: Array[ProcedureParameter] = PARAMETERS

  /**
   * Returns the output schema for the procedure result.
   *
   * @return StructType containing table, files_cleaned, dry_run, age_days, and message columns
   */
  def outputType: StructType = OUTPUT_TYPE

  /**
   * Executes the audit lock cleanup procedure.
   *
   * @param args Procedure arguments containing table name or path, dry_run flag, and age threshold
   * @return Sequence containing a single Row with cleanup results
   * @throws IllegalArgumentException if age_days is not positive, or if neither
   *                                  table nor path is provided, or both are provided
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
    val dryRun = getArgValueOrDefault(args, PARAMETERS(2)).getOrElse(false).asInstanceOf[Boolean]
    val ageDays = getArgValueOrDefault(args, PARAMETERS(3)).getOrElse(7).asInstanceOf[Int]

    if (ageDays <= 0) {
      throw new IllegalArgumentException("age_days must be a positive integer")
    }

    // getBasePath enforces that exactly one of table/path is supplied.
    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = createMetaClient(jsc, basePath)

    // Use the table name if provided, otherwise fall back to the raw path.
    val displayName = tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])

    try {
      val auditFolderPath = new StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
      val storage = metaClient.getStorage

      if (!storage.exists(auditFolderPath)) {
        // A missing audit folder is not an error: auditing may never have run.
        Seq(Row(displayName, 0, dryRun, ageDays, "No audit folder found - nothing to cleanup"))
      } else {
        // Files modified before this instant are eligible for deletion.
        val cutoffTime = System.currentTimeMillis() - ageDays * MILLIS_PER_DAY

        val oldFiles = storage.listDirectEntries(auditFolderPath).asScala
          .filter(info => info.isFile && info.getPath.getName.endsWith(".jsonl"))
          .filter(_.getModificationTime < cutoffTime)

        if (oldFiles.isEmpty) {
          Seq(Row(displayName, 0, dryRun, ageDays, s"No audit files older than $ageDays days found"))
        } else if (dryRun) {
          val fileCount = oldFiles.size
          Seq(Row(displayName, fileCount, dryRun, ageDays,
            s"Dry run: Would delete $fileCount audit files older than $ageDays days"))
        } else {
          var deletedCount = 0
          var failedCount = 0

          oldFiles.foreach { info =>
            try {
              storage.deleteFile(info.getPath)
              deletedCount += 1
            } catch {
              case _: Exception =>
                // Best-effort cleanup: one undeletable file must not abort
                // the rest; failures are surfaced via the summary message.
                failedCount += 1
            }
          }

          val message = if (failedCount == 0) {
            s"Successfully deleted $deletedCount audit files older than $ageDays days"
          } else {
            s"Deleted $deletedCount audit files, failed to delete $failedCount files"
          }

          Seq(Row(displayName, deletedCount, dryRun, ageDays, message))
        }
      }
    } catch {
      case e: Exception =>
        // Report the failure as a result row instead of propagating, so SQL
        // callers always get a row back.
        Seq(Row(displayName, 0, dryRun, ageDays, s"Cleanup failed: ${e.getMessage}"))
    }
  }

  /**
   * Builds a new instance of the CleanupAuditLockProcedure.
   *
   * @return New CleanupAuditLockProcedure instance
   */
  override def build: Procedure = new CleanupAuditLockProcedure()
}
+
+
/**
 * Companion object holding the registered name and builder factory for
 * [[CleanupAuditLockProcedure]].
 */
object CleanupAuditLockProcedure {
  /** Name under which this procedure is registered and invoked. */
  val NAME = "cleanup_audit_lock"

  /**
   * Factory supplying fresh builder instances for procedure registration.
   *
   * @return Supplier that creates new CleanupAuditLockProcedure instances
   */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new CleanupAuditLockProcedure()
  }
}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ffde5337397d..20c0e8c31340 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -102,6 +102,8 @@ object HoodieProcedures {
       ,(ShowCleansPlanProcedure.NAME, ShowCleansPlanProcedure.builder)
       ,(SetAuditLockProcedure.NAME, SetAuditLockProcedure.builder)
       ,(ShowAuditLockStatusProcedure.NAME, 
ShowAuditLockStatusProcedure.builder)
+      ,(ValidateAuditLockProcedure.NAME, ValidateAuditLockProcedure.builder)
+      ,(CleanupAuditLockProcedure.NAME, CleanupAuditLockProcedure.builder)
     )
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala
new file mode 100644
index 000000000000..22f9b777a5eb
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.io.FileNotFoundException
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
/**
 * Spark SQL procedure that validates the audit lock files of a Hudi table.
 *
 * Audit files written by the storage lock audit service are parsed into
 * per-transaction lock windows and checked for two classes of problems:
 *   - transactions that never recorded an END entry (reported as warnings), and
 *   - transactions whose lock-held windows overlap in time (reported as errors).
 *
 * Usage:
 * {{{
 * CALL validate_audit_lock(table => 'my_table')
 * CALL validate_audit_lock(path => '/path/to/table')
 * }}}
 *
 * Audit files are read from `{table_path}/.hoodie/.locks/audit/`.
 */
class ValidateAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType),
    ProcedureParameter.optional(1, "path", DataTypes.StringType)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("table", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("validation_result", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("transactions_validated", DataTypes.IntegerType, nullable = false, Metadata.empty),
    StructField("issues_found", DataTypes.IntegerType, nullable = false, Metadata.empty),
    StructField("details", DataTypes.StringType, nullable = false, Metadata.empty)
  ))

  // ObjectMapper is thread-safe once configured; share a single instance.
  private val OBJECT_MAPPER = new ObjectMapper()

  /**
   * One lock transaction reconstructed from a single audit file.
   *
   * @param ownerId                 owner id recorded in the audit entries
   * @param transactionCreationTime transaction start time from the first entry
   * @param lockAcquisitionTime     timestamp of the first START entry (falls back
   *                                to the transaction creation time)
   * @param lockReleaseTime         timestamp of the last END entry, if any
   * @param lockExpirationTime      expiration recorded on the last audit entry
   * @param filename                source audit file name, used in messages
   */
  case class TransactionWindow(
    ownerId: String,
    transactionCreationTime: Long,
    lockAcquisitionTime: Long,
    lockReleaseTime: Option[Long],
    lockExpirationTime: Option[Long],
    filename: String
  ) {
    // Prefer the explicit release time; otherwise assume the lock lapsed at its
    // recorded expiration; a window with neither collapses to its start.
    def effectiveEndTime: Long = lockReleaseTime.orElse(lockExpirationTime).getOrElse(lockAcquisitionTime)
  }

  /**
   * Returns the procedure parameters definition.
   *
   * @return Array of parameters: table (optional String) and path (optional String)
   */
  def parameters: Array[ProcedureParameter] = PARAMETERS

  /**
   * Returns the output schema for the procedure result.
   *
   * @return StructType containing table, validation_result, transactions_validated,
   *         issues_found, and details columns
   */
  def outputType: StructType = OUTPUT_TYPE

  /**
   * Executes the audit lock validation procedure.
   *
   * @param args Procedure arguments containing table name or path
   * @return Sequence containing a single Row with validation results
   * @throws IllegalArgumentException if neither table nor path is provided, or both are provided
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))

    // getBasePath enforces that exactly one of table/path is supplied.
    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = createMetaClient(jsc, basePath)

    // Use the table name if provided, otherwise fall back to the raw path.
    val displayName = tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])

    try {
      val auditFolderPath = new StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
      val storage = metaClient.getStorage

      // A missing audit folder is not an error: auditing may never have run.
      val allFilesResult = try {
        Some(storage.listDirectEntries(auditFolderPath).asScala)
      } catch {
        case _: FileNotFoundException => None
      }

      allFilesResult match {
        case None =>
          Seq(Row(displayName, "PASSED", 0, 0, "No audit folder found - nothing to validate"))
        case Some(allFiles) =>
          val auditFiles = allFiles.filter(info => info.isFile && info.getPath.getName.endsWith(".jsonl"))

          if (auditFiles.isEmpty) {
            Seq(Row(displayName, "PASSED", 0, 0, "No audit files found - nothing to validate"))
          } else {
            // Parse every audit file into a transaction window.
            val parseResults = auditFiles.map(info => (info, parseAuditFile(info, storage)))
            val windows = parseResults.flatMap(_._2).toSeq
            // NOTE(review): an empty-but-readable audit file also yields None
            // and is reported as a parse failure here — confirm whether empty
            // files should instead be silently skipped.
            val parseErrors = parseResults.filter(_._2.isEmpty).map(p =>
              s"[ERROR] Failed to parse audit file: ${p._1.getPath.getName}")

            if (windows.isEmpty) {
              Seq(Row(displayName, "FAILED", 0, auditFiles.size, "Failed to parse any audit files"))
            } else {
              val validationResults = validateTransactionWindows(windows)

              // Parse failures are appended after window-level errors.
              val resultsWithParseErrors =
                validationResults.copy(errors = (validationResults.errors ++ parseErrors).toList)

              val (result, issuesFound, details) =
                formatValidationResults(resultsWithParseErrors, auditFiles.size, windows.size, parseErrors.size)

              Seq(Row(displayName, result, windows.size, issuesFound, details))
            }
          }
      }
    } catch {
      case e: Exception =>
        // Report the failure as a result row instead of propagating, so SQL
        // callers always get a row back.
        Seq(Row(displayName, "ERROR", 0, -1, s"Validation failed: ${e.getMessage}"))
    }
  }

  /**
   * Parses a single JSONL audit file into a [[TransactionWindow]].
   *
   * @return Some(window) on success; None for empty, unreadable, or corrupted files
   */
  private def parseAuditFile(pathInfo: StoragePathInfo, storage: org.apache.hudi.storage.HoodieStorage): Option[TransactionWindow] = {
    val filename = pathInfo.getPath.getName

    Try {
      // Read the whole file and parse one JSON document per non-blank line.
      val inputStream = storage.open(pathInfo.getPath)
      val jsonNodes = try {
        scala.io.Source.fromInputStream(inputStream).mkString
          .split('\n')
          .filter(_.trim.nonEmpty)
          .map(line => OBJECT_MAPPER.readTree(line))
          .toSeq
      } finally {
        inputStream.close()
      }

      if (jsonNodes.isEmpty) {
        None
      } else {
        // Transaction metadata is taken from the first entry.
        val firstNode = jsonNodes.head
        val transactionCreationTime = firstNode.get("transactionStartTime").asLong()

        val startNodes = jsonNodes.filter(_.get("state").asText() == "START")
        val endNodes = jsonNodes.filter(_.get("state").asText() == "END")

        Some(TransactionWindow(
          ownerId = firstNode.get("ownerId").asText(),
          transactionCreationTime = transactionCreationTime,
          // First START marks acquisition; fall back to transaction creation time.
          lockAcquisitionTime = startNodes.headOption.map(_.get("timestamp").asLong())
            .getOrElse(transactionCreationTime),
          // Last END marks a graceful release, if one was recorded.
          lockReleaseTime = endNodes.lastOption.map(_.get("timestamp").asLong()),
          // Expiration on the last entry is the fallback end of the window.
          // (jsonNodes is non-empty in this branch, so no emptiness guard needed.)
          lockExpirationTime = Some(jsonNodes.last.get("lockExpiration").asLong()),
          filename = filename
        ))
      }
    } match {
      case Success(window) => window
      case Failure(_) => None // Skip corrupted files
    }
  }

  /**
   * Validates transaction windows for overlaps and proper closure.
   *
   * Warnings flag transactions with no END entry; errors flag pairs of
   * windows whose lock-held intervals overlap.
   */
  private def validateTransactionWindows(windows: Seq[TransactionWindow]): ValidationResults = {
    val errors = mutable.ListBuffer[String]()
    val warnings = mutable.ListBuffer[String]()

    // Transactions that never recorded an END did not shut down gracefully.
    windows.filter(_.lockReleaseTime.isEmpty).foreach { window =>
      warnings += s"[WARNING] ${window.filename} => transaction did not end gracefully. This could be due to driver OOM or non-graceful shutdown."
    }

    // Sort by acquisition time so overlap detection can stop scanning at the
    // first window that starts at or after the current window's end.
    val sortedWindows = windows.sortBy(_.lockAcquisitionTime)

    for (i <- sortedWindows.indices) {
      val currentWindow = sortedWindows(i)
      val currentEnd = currentWindow.effectiveEndTime

      sortedWindows.iterator.drop(i + 1)
        .takeWhile(_.lockAcquisitionTime < currentEnd)
        .foreach { otherWindow =>
          errors += s"[ERROR] ${currentWindow.filename} => overlaps with ${otherWindow.filename}"
        }
    }

    ValidationResults(errors.toList, warnings.toList)
  }

  /**
   * Formats validation results into (result, issues_found, details).
   *
   * Result is PASSED when there are no issues, FAILED when any error exists,
   * and WARNING when only warnings exist.
   */
  private def formatValidationResults(results: ValidationResults, totalFiles: Int, parsedFiles: Int, failedFiles: Int): (String, Int, String) = {
    val totalIssues = results.errors.size + results.warnings.size

    if (totalIssues == 0) {
      ("PASSED", 0, s"All audit lock transactions validated successfully. Audit Files: $totalFiles total, $parsedFiles parsed successfully, $failedFiles failed to parse")
    } else {
      val resultType = if (results.errors.nonEmpty) "FAILED" else "WARNING"
      val fileInfo = s"Audit Files: $totalFiles total, $parsedFiles parsed successfully, $failedFiles failed to parse. "
      (resultType, totalIssues, fileInfo + (results.errors ++ results.warnings).mkString(", "))
    }
  }

  /** Holds accumulated validation errors and warnings. */
  private case class ValidationResults(errors: List[String], warnings: List[String])

  /**
   * Builds a new instance of the ValidateAuditLockProcedure.
   *
   * @return New ValidateAuditLockProcedure instance
   */
  override def build: Procedure = new ValidateAuditLockProcedure()
}
+
+
/**
 * Companion object holding the registered name and builder factory for
 * [[ValidateAuditLockProcedure]].
 */
object ValidateAuditLockProcedure {
  /** Name under which this procedure is registered and invoked. */
  val NAME = "validate_audit_lock"

  /**
   * Factory supplying fresh builder instances for procedure registration.
   *
   * @return Supplier that creates new ValidateAuditLockProcedure instances
   */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new ValidateAuditLockProcedure()
  }
}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupAuditLockProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupAuditLockProcedure.scala
new file mode 100644
index 000000000000..b0be9dd4208e
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupAuditLockProcedure.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.testutils.HoodieClientTestUtils
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+/**
+ * Test suite for the CleanupAuditLockProcedure Spark SQL procedure.
+ *
+ * This class contains comprehensive tests to verify the functionality of
+ * the cleanup_audit_lock procedure, including dry run mode, age-based cleanup,
+ * parameter validation, and error handling scenarios.
+ *
+ * @author Apache Hudi
+ * @since 1.1.0
+ */
+class TestCleanupAuditLockProcedure extends HoodieSparkProcedureTestBase {
+
+  override def generateTableName: String = {
+    super.generateTableName.split("\\.").last
+  }
+
+  /**
+   * Helper method to create a test table and return its path.
+   */
+  private def createTestTable(tmp: File, tableName: String): String = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         |) using hudi
+         | location '${tmp.getCanonicalPath}/$tableName'
+         | tblproperties (
+         |  primaryKey = 'id',
+         |  orderingFields = 'ts'
+         | )
+       """.stripMargin)
+    // Insert data to initialize the Hudi metadata structure
+    spark.sql(s"insert into $tableName select 1, 'test', 10.0, 1000")
+    s"${tmp.getCanonicalPath}/$tableName"
+  }
+
+  /**
+   * Helper method to create test audit files with specified ages.
+   *
+   * @param tablePath The base path of the table
+   * @param filenames List of filenames to create
+   * @param ageDaysAgo How many days ago to set the modification time
+   */
+  private def createTestAuditFiles(tablePath: String, filenames: List[String], 
ageDaysAgo: Int): Unit = {
+    val auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(tablePath)
+    val auditDir = Paths.get(auditFolderPath)
+
+    // Create audit directory if it doesn't exist
+    if (!Files.exists(auditDir)) {
+      Files.createDirectories(auditDir)
+    }
+
+    // Create test audit files with specified modification time
+    val targetTime = System.currentTimeMillis() - (ageDaysAgo * 24L * 60L * 
60L * 1000L)
+
+    filenames.foreach { filename =>
+      val filePath = auditDir.resolve(filename)
+      val content = 
s"""{"ownerId":"test","transactionStartTime":${System.currentTimeMillis()},"timestamp":${System.currentTimeMillis()},"state":"START","lockExpiration":${System.currentTimeMillis()
 + 60000},"lockHeld":true}"""
+      Files.write(filePath, content.getBytes())
+      // Set the modification time to simulate old files
+      Files.setLastModifiedTime(filePath, 
java.nio.file.attribute.FileTime.fromMillis(targetTime))
+    }
+  }
+
+  /**
+   * Helper method to count audit files in the audit folder.
+   */
+  private def countAuditFiles(tablePath: String): Int = {
+    val auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(tablePath)
+    val auditDir = Paths.get(auditFolderPath)
+
+    if (Files.exists(auditDir)) {
+      Files.list(auditDir)
+        .filter(_.toString.endsWith(".jsonl"))
+        .count()
+        .toInt
+    } else {
+      0
+    }
+  }
+
+  /**
+   * Test cleanup with table name parameter - dry run mode.
+   */
+  test("Test Call cleanup_audit_lock Procedure - Dry Run with Table Name") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      // Create some old audit files (10 days old)
+      createTestAuditFiles(tablePath, List("old1.jsonl", "old2.jsonl"), 
ageDaysAgo = 10)
+      // Create some recent audit files (2 days old)
+      createTestAuditFiles(tablePath, List("recent1.jsonl"), ageDaysAgo = 2)
+
+      val result = spark.sql(s"""call cleanup_audit_lock(table => 
'$tableName', dry_run => true, age_days => 7)""").collect()
+
+      assertResult(1)(result.length)
+      assertResult(tableName)(result.head.get(0))
+      assertResult(2)(result.head.get(1)) // Should find 2 old files
+      assertResult(true)(result.head.get(2)) // dry_run = true
+      assertResult(7)(result.head.get(3)) // age_days = 7
+      assert(result.head.get(4).toString.contains("Dry run"))
+
+      // Verify files still exist (dry run shouldn't delete)
+      assertResult(3)(countAuditFiles(tablePath))
+    }
+  }
+
+  /**
+   * Test cleanup with path parameter - actual deletion.
+   */
+  test("Test Call cleanup_audit_lock Procedure - Actual Cleanup with Path") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      // Create some old audit files (10 days old)
+      createTestAuditFiles(tablePath, List("old1.jsonl", "old2.jsonl"), 
ageDaysAgo = 10)
+      // Create some recent audit files (2 days old)
+      createTestAuditFiles(tablePath, List("recent1.jsonl"), ageDaysAgo = 2)
+
+      val result = spark.sql(s"""call cleanup_audit_lock(path => '$tablePath', 
dry_run => false, age_days => 7)""").collect()
+
+      assertResult(1)(result.length)
+      assertResult(tablePath)(result.head.get(0))
+      assertResult(2)(result.head.get(1)) // Should delete 2 old files
+      assertResult(false)(result.head.get(2)) // dry_run = false
+      assertResult(7)(result.head.get(3)) // age_days = 7
+      assert(result.head.get(4).toString.contains("Successfully deleted"))
+
+      // Verify only recent file remains
+      assertResult(1)(countAuditFiles(tablePath))
+    }
+  }
+
+  /**
+   * Test cleanup with default parameters.
+   */
+  test("Test Call cleanup_audit_lock Procedure - Default Parameters") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      // Create some old audit files (10 days old) - should be deleted with 
default 7 days
+      createTestAuditFiles(tablePath, List("old1.jsonl"), ageDaysAgo = 10)
+      // Create some recent audit files (3 days old) - should be kept
+      createTestAuditFiles(tablePath, List("recent1.jsonl"), ageDaysAgo = 3)
+
+      val result = spark.sql(s"""call cleanup_audit_lock(table => 
'$tableName')""").collect()
+
+      assertResult(1)(result.length)
+      assertResult(tableName)(result.head.get(0))
+      assertResult(1)(result.head.get(1)) // Should delete 1 old file
+      assertResult(false)(result.head.get(2)) // dry_run defaults to false
+      assertResult(7)(result.head.get(3)) // age_days defaults to 7
+
+      // Verify only recent file remains
+      assertResult(1)(countAuditFiles(tablePath))
+    }
+  }
+
+  /**
+   * Test cleanup when no audit folder exists.
+   */
+  test("Test Call cleanup_audit_lock Procedure - No Audit Folder") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+      // Don't create any audit files
+
+      val result = spark.sql(s"""call cleanup_audit_lock(table => 
'$tableName')""").collect()
+
+      assertResult(1)(result.length)
+      assertResult(tableName)(result.head.get(0))
+      assertResult(0)(result.head.get(1)) // No files to delete
+      assert(result.head.get(4).toString.contains("No audit folder found"))
+    }
+  }
+
+  /**
+   * Test cleanup when no old files exist.
+   */
+  test("Test Call cleanup_audit_lock Procedure - No Old Files") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      // Create only recent audit files (2 days old)
+      createTestAuditFiles(tablePath, List("recent1.jsonl", "recent2.jsonl"), 
ageDaysAgo = 2)
+
+      val result = spark.sql(s"""call cleanup_audit_lock(table => 
'$tableName', age_days => 7)""").collect()
+
+      assertResult(1)(result.length)
+      assertResult(tableName)(result.head.get(0))
+      assertResult(0)(result.head.get(1)) // No old files to delete
+      assert(result.head.get(4).toString.contains("No audit files older than 7 
days found"))
+
+      // Verify all files remain
+      assertResult(2)(countAuditFiles(tablePath))
+    }
+  }
+
+  /**
+   * Test parameter validation - negative age_days.
+   */
+  test("Test Call cleanup_audit_lock Procedure - Invalid Age Days") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      createTestTable(tmp, tableName)
+
+      checkExceptionContain(s"""call cleanup_audit_lock(table => '$tableName', 
age_days => -1)""")(
+        "age_days must be a positive integer")
+
+      checkExceptionContain(s"""call cleanup_audit_lock(table => '$tableName', 
age_days => 0)""")(
+        "age_days must be a positive integer")
+    }
+  }
+
+  /**
+   * Test parameter validation - both table and path provided.
+   * Since both are provided, the procedure will use the table parameter 
(consistent with other procedures).
+   */
+  test("Test Call cleanup_audit_lock Procedure - Both Table and Path 
Parameters") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      // Should work fine - will use table parameter when both are provided
+      val result = spark.sql(s"""call cleanup_audit_lock(table => 
'$tableName', path => '$tablePath')""").collect()
+      assertResult(1)(result.length)
+      assertResult(tableName)(result.head.getString(0))
+    }
+  }
+
+  /**
+   * Test parameter validation - missing required arguments.
+   */
+  test("Test Call cleanup_audit_lock Procedure - Missing Required Arguments") {
+    checkExceptionContain(s"""call cleanup_audit_lock(dry_run => true)""")(
+      "Table name or table path must be given one")
+  }
+
+  /**
+   * Test cleanup with non-existent table.
+   */
+  test("Test Call cleanup_audit_lock Procedure - Non-existent Table") {
+    val nonExistentTable = "non_existent_table"
+
+    intercept[Exception] {
+      spark.sql(s"""call cleanup_audit_lock(table => '$nonExistentTable')""")
+    }
+  }
+
+  /**
+   * Test cleanup with invalid path.
+   */
+  test("Test Call cleanup_audit_lock Procedure - Invalid Path") {
+    val invalidPath = "/non/existent/path"
+
+    intercept[Exception] {
+      spark.sql(s"""call cleanup_audit_lock(path => '$invalidPath')""")
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestValidateAuditLockProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestValidateAuditLockProcedure.scala
new file mode 100644
index 000000000000..48fdd3ee66d7
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestValidateAuditLockProcedure.scala
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+
+import org.apache.spark.sql.Row
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
/**
 * Test suite for the validate_audit_lock Spark SQL procedure.
 *
 * Exercises the procedure through parameterized scenarios covering
 * passing, warning, and failing storage-lock audit histories, plus
 * parameter-validation edge cases.
 *
 * @since 1.1.0
 */
+class TestValidateAuditLockProcedure extends HoodieSparkProcedureTestBase {
+
+  override def generateTableName: String = {
+    super.generateTableName.split("\\.").last
+  }
+
+  /**
+   * Helper method to create a test table and return its path.
+   */
+  private def createTestTable(tmp: File, tableName: String): String = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         |) using hudi
+         | location '${tmp.getCanonicalPath}/$tableName'
+         | tblproperties (
+         |  primaryKey = 'id',
+         |  orderingFields = 'ts'
+         | )
+       """.stripMargin)
+    // Insert data to initialize the Hudi metadata structure
+    spark.sql(s"insert into $tableName select 1, 'test', 10.0, 1000")
+    s"${tmp.getCanonicalPath}/$tableName"
+  }
+
+  /**
+   * Represents a single audit record (JSON line in a .jsonl file)
+   */
+  case class AuditRecord(
+    ownerId: String,
+    transactionStartTime: Long,
+    timestamp: Long,
+    state: String, // START, RENEW, or END
+    lockExpiration: Long,
+    lockHeld: Boolean = true
+  )
+
+  /**
+   * Represents a transaction scenario with its audit records
+   */
+  case class TransactionScenario(
+    filename: String, // e.g., "1234567890_owner1.jsonl"
+    records: List[AuditRecord]
+  )
+
+  /**
+   * Represents expected validation results
+   */
+  case class ExpectedResult(
+    validationResult: String, // PASSED, WARNING, FAILED, ERROR
+    transactionsValidated: Int,
+    issuesFound: Int,
+    detailsContains: List[String] = List() // Strings that should be in details
+  )
+
+  /**
+   * Test scenario definition with input and expected output
+   */
+  case class ValidationTestScenario(
+    name: String,
+    scenarioGenerator: () => List[TransactionScenario],
+    expectedResultGenerator: () => ExpectedResult
+  )
+
+  /**
+   * Helper method to create audit files from scenarios
+   */
+  private def createAuditFiles(tablePath: String, scenarios: 
List[TransactionScenario]): Unit = {
+    val auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(tablePath)
+    val auditDir = Paths.get(auditFolderPath)
+
+    // Create audit directory if it doesn't exist
+    if (!Files.exists(auditDir)) {
+      Files.createDirectories(auditDir)
+    }
+
+    scenarios.foreach { scenario =>
+      val filePath = auditDir.resolve(scenario.filename)
+      val jsonLines = scenario.records.map { record =>
+        
s"""{"ownerId":"${record.ownerId}","transactionStartTime":${record.transactionStartTime},"timestamp":${record.timestamp},"state":"${record.state}","lockExpiration":${record.lockExpiration},"lockHeld":${record.lockHeld}}"""
+      }.mkString("\n")
+
+      Files.write(filePath, jsonLines.getBytes())
+    }
+  }
+
+  /**
+   * Helper method to run a parameterized validation test scenario
+   */
+  private def runValidationScenario(scenario: ValidationTestScenario): Unit = {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      // Generate test data
+      val transactionScenarios = scenario.scenarioGenerator()
+      val expectedResult = scenario.expectedResultGenerator()
+
+      // Create audit files
+      createAuditFiles(tablePath, transactionScenarios)
+
+      // Run validation
+      val result = spark.sql(s"""call validate_audit_lock(table => 
'$tableName')""").collect()
+
+      // Verify results
+      assertResult(1)(result.length)
+
+      val row = result.head
+      assertResult(tableName)(row.getString(0))
+      assertResult(expectedResult.validationResult)(row.getString(1))
+      assertResult(expectedResult.transactionsValidated)(row.getInt(2))
+      assertResult(expectedResult.issuesFound)(row.getInt(3))
+
+      val details = row.getString(4)
+      expectedResult.detailsContains.foreach { expectedSubstring =>
+        assert(details.contains(expectedSubstring),
+          s"Details '$details' should contain '$expectedSubstring'")
+      }
+    }
+  }
+
+  // ==================== Scenario-Based Validation Tests ====================
+
+  /**
+   * Test scenario: No overlapping transactions, all transactions closed 
properly
+   */
+  test("Test Validation - No Issues (PASSED)") {
+    val scenario = ValidationTestScenario(
+      name = "No Issues - All Transactions Completed Properly",
+      scenarioGenerator = () => {
+        val baseTime = System.currentTimeMillis()
+        List(
+          TransactionScenario(
+            filename = s"${baseTime}_owner1.jsonl",
+            records = List(
+              AuditRecord("owner1", baseTime, baseTime + 100, "START", 
baseTime + 60000),
+              AuditRecord("owner1", baseTime, baseTime + 200, "RENEW", 
baseTime + 60000),
+              AuditRecord("owner1", baseTime, baseTime + 300, "END", baseTime 
+ 60000)
+            )
+          ),
+          TransactionScenario(
+            filename = s"${baseTime + 500}_owner2.jsonl",
+            records = List(
+              AuditRecord("owner2", baseTime + 500, baseTime + 600, "START", 
baseTime + 60000),
+              AuditRecord("owner2", baseTime + 500, baseTime + 700, "END", 
baseTime + 60000)
+            )
+          )
+        )
+      },
+      expectedResultGenerator = () => ExpectedResult(
+        validationResult = "PASSED",
+        transactionsValidated = 2,
+        issuesFound = 0,
+        detailsContains = List("successfully")
+      )
+    )
+
+    runValidationScenario(scenario)
+  }
+
+  /**
+   * Test scenario: Single unclosed transaction (should be WARNING only)
+   */
+  test("Test Validation - Single Unclosed Transaction (WARNING)") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      val baseTime = 1000000L
+
+      // Create single audit file without END record
+      val scenarios = List(
+        TransactionScenario(
+          filename = s"${baseTime}_owner1.jsonl",
+          records = List(
+            AuditRecord("owner1", baseTime, baseTime + 100, "START", baseTime 
+ 200)
+          )
+        )
+      )
+
+      createAuditFiles(tablePath, scenarios)
+
+      val result = spark.sql(s"""call validate_audit_lock(table => 
'$tableName')""").collect()
+      val row = result.head
+
+      // This should be WARNING only since there's no overlap possible with 
just one transaction
+      assertResult("WARNING")(row.getString(1))
+      assertResult(1)(row.getInt(2)) // transactions_validated
+      assertResult(1)(row.getInt(3)) // issues_found
+    }
+  }
+
+  /**
+   * Test scenario: Transactions without proper END, with proper separation 
(WARNING)
+   */
+  test("Test Validation - Unclosed Transactions (WARNING)") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      val baseTime = 1000000L
+
+      // Create two transactions: one unclosed, one complete, no overlap
+      val scenarios = List(
+        TransactionScenario(
+          filename = s"${baseTime}_owner1.jsonl",
+          records = List(
+            AuditRecord("owner1", baseTime, baseTime + 100, "START", baseTime 
+ 200)
+            // No END record - effective end at expiration (baseTime + 200)
+          )
+        ),
+        TransactionScenario(
+          filename = s"${baseTime + 300}_owner2.jsonl", // Starts after 
owner1's expiration
+          records = List(
+            AuditRecord("owner2", baseTime + 300, baseTime + 400, "START", 
baseTime + 60000),
+            AuditRecord("owner2", baseTime + 300, baseTime + 500, "END", 
baseTime + 60000)
+          )
+        )
+      )
+
+      createAuditFiles(tablePath, scenarios)
+
+      val result = spark.sql(s"""call validate_audit_lock(table => 
'$tableName')""").collect()
+      val row = result.head
+
+      println(s"DEBUG - Result: ${row.getString(1)}, Issues: ${row.getInt(3)}, 
Details: ${row.getString(4)}")
+
+      assertResult("WARNING")(row.getString(1))
+      assertResult(2)(row.getInt(2)) // transactions_validated
+      assertResult(1)(row.getInt(3)) // issues_found - only the unclosed 
transaction
+      assert(row.getString(4).contains("[WARNING]"))
+      assert(row.getString(4).contains("owner1.jsonl"))
+      assert(row.getString(4).contains("did not end gracefully"))
+    }
+  }
+
+  /**
+   * Test scenario: Overlapping transactions with improper closure (FAILED)
+   */
+  test("Test Validation - Overlapping Transactions (FAILED)") {
+    val scenario = ValidationTestScenario(
+      name = "Overlapping Transactions",
+      scenarioGenerator = () => {
+        val baseTime = System.currentTimeMillis()
+        List(
+          TransactionScenario(
+            filename = s"${baseTime}_owner1.jsonl",
+            records = List(
+              AuditRecord("owner1", baseTime, baseTime + 100, "START", 
baseTime + 60000),
+              AuditRecord("owner1", baseTime, baseTime + 500, "END", baseTime 
+ 60000) // Ends after owner2 starts
+            )
+          ),
+          TransactionScenario(
+            filename = s"${baseTime + 200}_owner2.jsonl", // Starts before 
owner1 ends
+            records = List(
+              AuditRecord("owner2", baseTime + 200, baseTime + 300, "START", 
baseTime + 60000),
+              AuditRecord("owner2", baseTime + 200, baseTime + 400, "END", 
baseTime + 60000)
+            )
+          )
+        )
+      },
+      expectedResultGenerator = () => ExpectedResult(
+        validationResult = "FAILED",
+        transactionsValidated = 2,
+        issuesFound = 1,
+        detailsContains = List("[ERROR]", "owner1.jsonl", "overlaps with", 
"owner2.jsonl")
+      )
+    )
+
+    runValidationScenario(scenario)
+  }
+
+  /**
+   * Test scenario: Mixed issues - overlaps and unclosed transactions
+   */
+  test("Test Validation - Mixed Issues (FAILED)") {
+    val scenario = ValidationTestScenario(
+      name = "Mixed Issues",
+      scenarioGenerator = () => {
+        val baseTime = System.currentTimeMillis()
+        List(
+          TransactionScenario(
+            filename = s"${baseTime}_owner1.jsonl",
+            records = List(
+              AuditRecord("owner1", baseTime, baseTime + 100, "START", 
baseTime + 60000)
+              // No END - unclosed
+            )
+          ),
+          TransactionScenario(
+            filename = s"${baseTime + 50}_owner2.jsonl", // Overlaps with 
owner1
+            records = List(
+              AuditRecord("owner2", baseTime + 50, baseTime + 150, "START", 
baseTime + 60000),
+              AuditRecord("owner2", baseTime + 50, baseTime + 250, "END", 
baseTime + 60000)
+            )
+          )
+        )
+      },
+      expectedResultGenerator = () => ExpectedResult(
+        validationResult = "FAILED",
+        transactionsValidated = 2,
+        issuesFound = 2,
+        detailsContains = List("[ERROR]", "[WARNING]", "overlaps with", "did 
not end gracefully")
+      )
+    )
+
+    runValidationScenario(scenario)
+  }
+
+  /**
+   * Test scenario: Out-of-order filenames but valid non-overlapping 
transactions (PASSED)
+   */
+  test("Test Validation - Out of Order Filenames but Valid Transactions 
(PASSED)") {
+    val scenario = ValidationTestScenario(
+      name = "Out of Order Filenames - Valid Transactions",
+      scenarioGenerator = () => {
+        val baseTime = System.currentTimeMillis()
+        List(
+          // File with later timestamp in name but contains earlier transaction
+          TransactionScenario(
+            filename = s"${baseTime + 2000}_owner2.jsonl", // Filename 
suggests later time
+            records = List(
+              AuditRecord("owner2", baseTime + 100, baseTime + 200, "START", 
baseTime + 60000), // Actually starts first
+              AuditRecord("owner2", baseTime + 100, baseTime + 300, "END", 
baseTime + 60000)
+            )
+          ),
+          // File with earlier timestamp in name but contains later transaction
+          TransactionScenario(
+            filename = s"${baseTime}_owner1.jsonl", // Filename suggests 
earlier time
+            records = List(
+              AuditRecord("owner1", baseTime + 500, baseTime + 600, "START", 
baseTime + 60000), // Actually starts second
+              AuditRecord("owner1", baseTime + 500, baseTime + 700, "END", 
baseTime + 60000)
+            )
+          )
+        )
+      },
+      expectedResultGenerator = () => ExpectedResult(
+        validationResult = "PASSED",
+        transactionsValidated = 2,
+        issuesFound = 0,
+        detailsContains = List("successfully")
+      )
+    )
+
+    runValidationScenario(scenario)
+  }
+
+  // ==================== Parameter Validation Tests ====================
+
+  /**
+   * Test parameter validation by providing both table and path parameters.
+   * Since both are provided, the procedure will use the table parameter 
(consistent with other procedures).
+   */
+  test("Test Parameter Validation - Both Table and Path Parameters") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+
+      // Should work fine - will use table parameter when both are provided
+      val result = spark.sql(s"""call validate_audit_lock(table => 
'$tableName', path => '$tablePath')""").collect()
+      assertResult(1)(result.length)
+      assertResult(tableName)(result.head.getString(0))
+    }
+  }
+
+  /**
+   * Test parameter validation by omitting both required arguments.
+   */
+  test("Test Parameter Validation - Missing Required Arguments") {
+    checkExceptionContain(s"""call validate_audit_lock()""")(
+      "Table name or table path must be given one")
+  }
+
+  /**
+   * Test validation with non-existent table.
+   */
+  test("Test Parameter Validation - Non-existent Table") {
+    val nonExistentTable = "non_existent_table"
+
+    intercept[Exception] {
+      spark.sql(s"""call validate_audit_lock(table => '$nonExistentTable')""")
+    }
+  }
+
+  /**
+   * Test validation with invalid path.
+   */
+  test("Test Parameter Validation - Invalid Path") {
+    val invalidPath = "/non/existent/path"
+
+    intercept[Exception] {
+      spark.sql(s"""call validate_audit_lock(path => '$invalidPath')""")
+    }
+  }
+
+  /**
+   * Test validation with no audit folder.
+   */
+  test("Test Validation - No Audit Folder (PASSED)") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = createTestTable(tmp, tableName)
+      // Don't create any audit files
+
+      val result = spark.sql(s"""call validate_audit_lock(table => 
'$tableName')""").collect()
+
+      assertResult(1)(result.length)
+      val row = result.head
+      assertResult(tableName)(row.getString(0))
+      assertResult("PASSED")(row.getString(1))
+      assertResult(0)(row.getInt(2)) // transactions_validated
+      assertResult(0)(row.getInt(3)) // issues_found
+      assert(row.getString(4).contains("No audit folder found"))
+    }
+  }
+}

Reply via email to