This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 41c5765908b0 [HUDI-9782] Add validation and cleanup apis for storage
LP audit validation (#13886)
41c5765908b0 is described below
commit 41c5765908b050d5b36b279c97cf5ae65fec78ab
Author: Alex R <[email protected]>
AuthorDate: Tue Oct 7 10:43:10 2025 -0700
[HUDI-9782] Add validation and cleanup apis for storage LP audit validation
(#13886)
---
.../hudi/cli/commands/LockAuditingCommand.java | 509 +++++++++++++++-
.../hudi/cli/commands/TestLockAuditingCommand.java | 660 ++++++++++++++++++++-
.../transaction/lock/StorageBasedLockProvider.java | 5 +-
.../client/transaction/lock/StorageLockClient.java | 2 +-
.../audit/StorageLockProviderAuditService.java | 9 +-
.../lock/models/StorageLockClientFileTest.java | 58 ++
.../procedures/CleanupAuditLockProcedure.scala | 190 ++++++
.../hudi/command/procedures/HoodieProcedures.scala | 2 +
.../procedures/ValidateAuditLockProcedure.scala | 314 ++++++++++
.../procedure/TestCleanupAuditLockProcedure.scala | 293 +++++++++
.../procedure/TestValidateAuditLockProcedure.scala | 454 ++++++++++++++
11 files changed, 2469 insertions(+), 27 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
index 51e2dbe7fe6c..18d3862aff63 100644
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
@@ -21,8 +21,12 @@ package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.HoodieCLI;
import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -32,8 +36,14 @@ import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
/**
* CLI commands for managing Hudi table lock auditing functionality.
@@ -44,6 +54,132 @@ public class LockAuditingCommand {
private static final Logger LOG =
LoggerFactory.getLogger(LockAuditingCommand.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ /**
+ * Represents a single audit log entry in the JSONL file.
+ * Maps to the structure written by StorageLockProviderAuditService.
+ */
+ public static class AuditRecord {
+ private final String ownerId;
+ private final long transactionStartTime;
+ private final long timestamp;
+ private final String state;
+ private final long lockExpiration;
+ private final boolean lockHeld;
+
+ @JsonCreator
+ public AuditRecord(
+ @JsonProperty("ownerId") String ownerId,
+ @JsonProperty("transactionStartTime") long transactionStartTime,
+ @JsonProperty("timestamp") long timestamp,
+ @JsonProperty("state") String state,
+ @JsonProperty("lockExpiration") long lockExpiration,
+ @JsonProperty("lockHeld") boolean lockHeld) {
+ this.ownerId = ownerId;
+ this.transactionStartTime = transactionStartTime;
+ this.timestamp = timestamp;
+ this.state = state;
+ this.lockExpiration = lockExpiration;
+ this.lockHeld = lockHeld;
+ }
+
+ public String getOwnerId() {
+ return ownerId;
+ }
+
+ public long getTransactionStartTime() {
+ return transactionStartTime;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public long getLockExpiration() {
+ return lockExpiration;
+ }
+
+ public boolean isLockHeld() {
+ return lockHeld;
+ }
+ }
+
+ /**
+ * Represents a transaction window with start time, end time, and metadata.
+ */
+ static class TransactionWindow {
+ final String ownerId;
+ final long transactionCreationTime;
+ final long lockAcquisitionTime;
+ final Option<Long> lockReleaseTime;
+ final Option<Long> lockExpirationTime;
+ final String filename;
+
+ TransactionWindow(String ownerId, long transactionCreationTime, long
lockAcquisitionTime,
+ Option<Long> lockReleaseTime, Option<Long>
lockExpirationTime, String filename) {
+ this.ownerId = ownerId;
+ this.transactionCreationTime = transactionCreationTime;
+ this.lockAcquisitionTime = lockAcquisitionTime;
+ this.lockReleaseTime = lockReleaseTime;
+ this.lockExpirationTime = lockExpirationTime;
+ this.filename = filename;
+ }
+
+ long getEffectiveEndTime() {
+ return
lockReleaseTime.orElse(lockExpirationTime.orElse(lockAcquisitionTime));
+ }
+ }
+
+ /**
+ * Holds validation results.
+ */
+ static class ValidationResults {
+ final List<String> errors;
+ final List<String> warnings;
+
+ ValidationResults(List<String> errors, List<String> warnings) {
+ this.errors = errors;
+ this.warnings = warnings;
+ }
+ }
+
+ /**
+ * Holds cleanup operation results.
+ */
+ static class CleanupResult {
+ final boolean success;
+ final int deletedCount;
+ final int failedCount;
+ final int totalFiles;
+ final String message;
+ final boolean isDryRun;
+
+ CleanupResult(boolean success, int deletedCount, int failedCount, int
totalFiles, String message, boolean isDryRun) {
+ this.success = success;
+ this.deletedCount = deletedCount;
+ this.failedCount = failedCount;
+ this.totalFiles = totalFiles;
+ this.message = message;
+ this.isDryRun = isDryRun;
+ }
+
+ boolean hasErrors() {
+ return failedCount > 0;
+ }
+
+ boolean hasFiles() {
+ return totalFiles > 0;
+ }
+
+ @Override
+ public String toString() {
+ return message;
+ }
+ }
+
/**
* Enables lock audit logging for the currently connected Hudi table.
* This command creates or updates the audit configuration file to enable
@@ -101,18 +237,20 @@ public class LockAuditingCommand {
try {
// Create the audit config file path
String auditConfigPath =
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
-
- // Check if config file exists
StoragePath configPath = new StoragePath(auditConfigPath);
- if (!HoodieCLI.storage.exists(configPath)) {
+
+ // Check if config file exists by attempting to get its info
+ try {
+ HoodieCLI.storage.getPathInfo(configPath);
+ } catch (FileNotFoundException e) {
return "Lock audit is already disabled (no configuration file found).";
}
-
+
// Create the JSON content with audit disabled
ObjectNode configJson = OBJECT_MAPPER.createObjectNode();
configJson.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
false);
String jsonContent = OBJECT_MAPPER.writeValueAsString(configJson);
-
+
// Write the config file
try (OutputStream outputStream = HoodieCLI.storage.create(configPath,
true)) {
outputStream.write(jsonContent.getBytes());
@@ -123,8 +261,15 @@ public class LockAuditingCommand {
if (keepAuditFiles) {
message += String.format("\nExisting audit files preserved at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
} else {
- // Todo: write then call the api method to prune the old files
- message += String.format("\nAudit files cleaned up at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+ // Call the cleanup method to prune all old files
+ CleanupResult cleanupResult = performAuditCleanup(false, 0);
+ if (cleanupResult.success && cleanupResult.deletedCount > 0) {
+ message += String.format("\nAudit files cleaned up at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+ } else if (cleanupResult.success && cleanupResult.deletedCount == 0) {
+ message += String.format("\nNo audit files to clean up at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+ } else {
+ message += String.format("\nWarning: Some audit files may not have
been cleaned up: %s", cleanupResult.message);
+ }
}
return message;
@@ -152,21 +297,18 @@ public class LockAuditingCommand {
try {
// Create the audit config file path
String auditConfigPath =
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
-
- // Check if config file exists
StoragePath configPath = new StoragePath(auditConfigPath);
- if (!HoodieCLI.storage.exists(configPath)) {
- return String.format("Lock Audit Status: DISABLED\n"
- + "Table: %s\n"
- + "Config file: %s (not found)\n"
- + "Use 'locks audit enable' to enable audit logging.",
- HoodieCLI.basePath, auditConfigPath);
- }
-
+
// Read and parse the configuration
String configContent;
try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+ } catch (FileNotFoundException e) {
+ return String.format("Lock Audit Status: DISABLED\n"
+ + "Table: %s\n"
+ + "Config file: %s (not found)\n"
+ + "Use 'locks audit enable' to enable audit logging.",
+ HoodieCLI.basePath, auditConfigPath);
}
JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
JsonNode enabledNode =
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
@@ -185,4 +327,337 @@ public class LockAuditingCommand {
return String.format("Failed to check lock audit status: %s",
e.getMessage());
}
}
+
+ /**
+ * Validates the audit lock files for consistency and integrity.
+ * This command checks for issues such as corrupted files, invalid format,
+ * incorrect state transitions, and orphaned locks.
+ *
+ * @return Validation results including any issues found
+ */
+ @ShellMethod(key = "locks audit validate", value = "Validate audit lock
files for consistency and integrity")
+ public String validateAuditLocks() {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ try {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Get all audit files
+ List<StoragePathInfo> allFiles;
+ try {
+ allFiles = HoodieCLI.storage.listDirectEntries(auditFolder);
+ } catch (FileNotFoundException e) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit folder found - nothing to validate";
+ }
+ List<StoragePathInfo> auditFiles = new ArrayList<>();
+ for (StoragePathInfo pathInfo : allFiles) {
+ if (pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl")) {
+ auditFiles.add(pathInfo);
+ }
+ }
+
+ if (auditFiles.isEmpty()) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit files found - nothing to validate";
+ }
+
+ // Parse all audit files into transaction windows
+ List<TransactionWindow> windows = new ArrayList<>();
+ List<String> parseErrors = new ArrayList<>();
+ for (StoragePathInfo pathInfo : auditFiles) {
+ Option<TransactionWindow> window = parseAuditFile(pathInfo);
+ if (window.isPresent()) {
+ windows.add(window.get());
+ } else {
+ parseErrors.add(String.format("[ERROR] Failed to parse audit file:
%s", pathInfo.getPath().getName()));
+ }
+ }
+
+ if (windows.isEmpty()) {
+ return String.format("Validation Result: FAILED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: %d\n"
+ + "Details: Failed to parse any audit files", auditFiles.size());
+ }
+
+ // Validate transactions
+ ValidationResults validationResults =
validateTransactionWindows(windows);
+
+ // Add parse errors to the validation results
+ validationResults.errors.addAll(parseErrors);
+
+ // Generate result
+ int totalIssues = validationResults.errors.size() +
validationResults.warnings.size();
+ String result;
+ String details;
+
+ if (totalIssues == 0) {
+ result = "PASSED";
+ details = "All audit lock transactions validated successfully";
+ } else {
+ result = validationResults.errors.isEmpty() ? "WARNING" : "FAILED";
+ List<String> allIssues = new ArrayList<>();
+ allIssues.addAll(validationResults.errors);
+ allIssues.addAll(validationResults.warnings);
+ details = String.join(", ", allIssues);
+ }
+
+ return String.format("Validation Result: %s\n"
+ + "Audit Files: %d total, %d parsed successfully, %d failed to
parse\n"
+ + "Transactions Validated: %d\n"
+ + "Issues Found: %d\n"
+ + "Details: %s", result, auditFiles.size(), windows.size(),
parseErrors.size(), windows.size(), totalIssues, details);
+
+ } catch (Exception e) {
+ LOG.error("Error validating audit locks", e);
+ return String.format("Validation Result: ERROR\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: -1\n"
+ + "Details: Validation failed: %s", e.getMessage());
+ }
+ }
+
+ /**
+ * Cleans up old audit lock files based on age threshold.
+ * This command removes audit files that are older than the specified number
of days.
+ *
+ * @param dryRun Whether to perform a dry run (preview changes without
deletion)
+ * @param ageDaysStr Number of days to keep audit files, given as a string
(default "7"); must parse to a non-negative integer
+ * @return Status message indicating files cleaned or to be cleaned
+ */
+ @ShellMethod(key = "locks audit cleanup", value = "Clean up old audit lock
files")
+ public String cleanupAuditLocks(
+ @ShellOption(value = {"--dryRun"}, defaultValue = "false",
+ help = "Preview changes without actually deleting files") final
boolean dryRun,
+ @ShellOption(value = {"--ageDays"}, defaultValue = "7",
+ help = "Delete audit files older than this many days") final String
ageDaysStr) {
+
+ try {
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ // Parse ageDays manually to handle validation properly
+ int ageDays;
+ try {
+ ageDays = Integer.parseInt(ageDaysStr);
+ } catch (NumberFormatException e) {
+ return "Error: ageDays must be a value greater than 0.";
+ }
+
+ if (ageDays < 0) {
+ return "Error: ageDays must be non-negative (>= 0).";
+ }
+
+ return performAuditCleanup(dryRun, ageDays).toString();
+ } catch (Exception e) {
+ LOG.error("Error during audit cleanup", e);
+ return String.format("Error during cleanup: %s", e.getMessage() != null
? e.getMessage() : e.getClass().getSimpleName());
+ }
+ }
+
+ /**
+ * Internal method to perform audit cleanup. Used by both the CLI command
and disable method.
+ *
+ * @param dryRun Whether to perform a dry run (preview changes without
deletion)
+ * @param ageDays Number of days to keep audit files (0 means delete all,
must be non-negative)
+ * @return CleanupResult containing cleanup operation details
+ */
+ private CleanupResult performAuditCleanup(boolean dryRun, int ageDays) {
+ try {
+ if (HoodieCLI.storage == null) {
+ String message = "Storage not initialized.";
+ return new CleanupResult(false, 0, 0, 0, message, dryRun);
+ }
+
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Calculate cutoff timestamp (ageDays ago)
+ long cutoffTime = System.currentTimeMillis() - (ageDays * 24L * 60L *
60L * 1000L);
+
+ // List all files in audit folder and filter by modification time
+ List<StoragePathInfo> allFiles;
+ try {
+ allFiles = HoodieCLI.storage.listDirectEntries(auditFolder);
+ } catch (FileNotFoundException e) {
+ String message = "No audit folder found - nothing to cleanup.";
+ return new CleanupResult(true, 0, 0, 0, message, dryRun);
+ }
+ List<StoragePathInfo> auditFiles = new ArrayList<>();
+ List<StoragePathInfo> oldFiles = new ArrayList<>();
+
+ // Filter to get only .jsonl files
+ for (StoragePathInfo pathInfo : allFiles) {
+ if (pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl")) {
+ auditFiles.add(pathInfo);
+ if (pathInfo.getModificationTime() < cutoffTime) {
+ oldFiles.add(pathInfo);
+ }
+ }
+ }
+
+ if (oldFiles.isEmpty()) {
+ String message = String.format("No audit files older than %d days
found.", ageDays);
+ return new CleanupResult(true, 0, 0, auditFiles.size(), message,
dryRun);
+ }
+
+ int fileCount = oldFiles.size();
+
+ if (dryRun) {
+ String message = String.format("Dry run: Would delete %d audit files
older than %d days.", fileCount, ageDays);
+ return new CleanupResult(true, 0, 0, fileCount, message, dryRun);
+ } else {
+ // Actually delete the files
+ int deletedCount = 0;
+ int failedCount = 0;
+
+ for (StoragePathInfo pathInfo : oldFiles) {
+ try {
+ HoodieCLI.storage.deleteFile(pathInfo.getPath());
+ deletedCount++;
+ } catch (Exception e) {
+ failedCount++;
+ LOG.warn("Failed to delete audit file: " + pathInfo.getPath(), e);
+ }
+ }
+
+ if (failedCount == 0) {
+ String message = String.format("Successfully deleted %d audit files
older than %d days.", deletedCount, ageDays);
+ return new CleanupResult(true, deletedCount, failedCount, fileCount,
message, dryRun);
+ } else {
+ String message = String.format("Deleted %d audit files, failed to
delete %d files.", deletedCount, failedCount);
+ return new CleanupResult(false, deletedCount, failedCount,
fileCount, message, dryRun);
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Error cleaning up audit locks", e);
+ String message = String.format("Failed to cleanup audit locks: %s",
e.getMessage());
+ return new CleanupResult(false, 0, 0, 0, message, dryRun);
+ }
+ }
+
+ /**
+ * Parses an audit file and extracts transaction window information.
+ */
+ private Option<TransactionWindow> parseAuditFile(StoragePathInfo pathInfo) {
+ String filename = pathInfo.getPath().getName();
+
+ try {
+ // Read and parse JSONL content
+ List<AuditRecord> entries = new ArrayList<>();
+ try (InputStream inputStream =
HoodieCLI.storage.open(pathInfo.getPath());
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (line.trim().isEmpty()) {
+ continue;
+ }
+ try {
+ AuditRecord entry = OBJECT_MAPPER.readValue(line,
AuditRecord.class);
+ entries.add(entry);
+ } catch (Exception e) {
+ LOG.warn("Failed to parse JSON line in file " + filename + ": " +
line, e);
+ }
+ }
+ }
+
+ if (entries.isEmpty()) {
+ return Option.empty();
+ }
+
+ // Extract transaction metadata from first entry
+ AuditRecord firstEntry = entries.get(0);
+ String ownerId = firstEntry.getOwnerId();
+ long transactionCreationTime = firstEntry.getTransactionStartTime();
+
+ // Find first START timestamp
+ long lockAcquisitionTime = transactionCreationTime; // default to
transaction creation time
+ for (AuditRecord entry : entries) {
+ if ("START".equals(entry.getState())) {
+ lockAcquisitionTime = entry.getTimestamp();
+ break;
+ }
+ }
+
+ // Find last END timestamp
+ Option<Long> lockReleaseTime = Option.empty();
+ for (int i = entries.size() - 1; i >= 0; i--) {
+ AuditRecord entry = entries.get(i);
+ if ("END".equals(entry.getState())) {
+ lockReleaseTime = Option.of(entry.getTimestamp());
+ break;
+ }
+ }
+
+ // Find last expiration time as fallback
+ Option<Long> lockExpirationTime = Option.empty();
+ if (!entries.isEmpty()) {
+ AuditRecord lastEntry = entries.get(entries.size() - 1);
+ lockExpirationTime = Option.of(lastEntry.getLockExpiration());
+ }
+
+ return Option.of(new TransactionWindow(
+ ownerId,
+ transactionCreationTime,
+ lockAcquisitionTime,
+ lockReleaseTime,
+ lockExpirationTime,
+ filename
+ ));
+ } catch (Exception e) {
+ LOG.warn("Failed to parse audit file: " + filename, e);
+ return Option.empty();
+ }
+ }
+
+ /**
+ * Validates transaction windows for overlaps and proper closure.
+ */
+ private ValidationResults validateTransactionWindows(List<TransactionWindow>
windows) {
+ List<String> errors = new ArrayList<>();
+ List<String> warnings = new ArrayList<>();
+
+ // Check for transactions without proper END
+ for (TransactionWindow window : windows) {
+ if (!window.lockReleaseTime.isPresent()) {
+ warnings.add(String.format("[WARNING] %s => transaction did not end
gracefully. This could be due to driver OOM or non-graceful shutdown.",
+ window.filename));
+ }
+ }
+
+ // Sort windows by start time for overlap detection
+ List<TransactionWindow> sortedWindows = new ArrayList<>(windows);
+ sortedWindows.sort(Comparator.comparingLong(w -> w.lockAcquisitionTime));
+
+ // Check for overlaps
+ for (int i = 0; i < sortedWindows.size(); i++) {
+ TransactionWindow currentWindow = sortedWindows.get(i);
+ long currentEnd = currentWindow.getEffectiveEndTime();
+
+ // Check all subsequent windows for overlaps
+ for (int j = i + 1; j < sortedWindows.size(); j++) {
+ TransactionWindow otherWindow = sortedWindows.get(j);
+ long otherStart = otherWindow.lockAcquisitionTime;
+
+ // Check if windows overlap
+ if (otherStart < currentEnd) {
+ errors.add(String.format("[ERROR] %s => overlaps with %s",
+ currentWindow.filename, otherWindow.filename));
+ }
+ }
+ }
+
+ return new ValidationResults(errors, warnings);
+ }
}
\ No newline at end of file
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
index da346110e124..266a7f4058a7 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
@@ -26,12 +26,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -56,6 +61,55 @@ public class TestLockAuditingCommand extends
CLIFunctionalTestHarness {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ /**
+ * Helper method to create AuditRecord for tests with lockHeld defaulting to
true.
+ */
+ private static LockAuditingCommand.AuditRecord createAuditRecord(
+ String ownerId, long transactionStartTime, long timestamp, String state,
long lockExpiration) {
+ return new LockAuditingCommand.AuditRecord(ownerId, transactionStartTime,
timestamp, state, lockExpiration, true);
+ }
+
+ /**
+ * Represents a transaction scenario with its audit records
+ */
+ static class TransactionScenario {
+ final String filename; // e.g., "1234567890_owner1.jsonl"
+ final List<LockAuditingCommand.AuditRecord> records;
+
+ TransactionScenario(String filename, List<LockAuditingCommand.AuditRecord>
records) {
+ this.filename = filename;
+ this.records = records;
+ }
+ }
+
+ /**
+ * Helper method to create audit files from scenarios
+ */
+ private void createAuditFiles(List<TransactionScenario> scenarios) throws
IOException {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditDir = new StoragePath(auditFolderPath);
+
+ // Create audit directory if it doesn't exist
+ if (!HoodieCLI.storage.exists(auditDir)) {
+ HoodieCLI.storage.createDirectory(auditDir);
+ }
+
+ for (TransactionScenario scenario : scenarios) {
+ StoragePath filePath = new StoragePath(auditDir, scenario.filename);
+ StringBuilder jsonLines = new StringBuilder();
+ for (LockAuditingCommand.AuditRecord record : scenario.records) {
+ if (jsonLines.length() > 0) {
+ jsonLines.append("\n");
+ }
+ jsonLines.append(OBJECT_MAPPER.writeValueAsString(record));
+ }
+
+ try (OutputStream outputStream = HoodieCLI.storage.create(filePath,
true)) {
+ outputStream.write(jsonLines.toString().getBytes());
+ }
+ }
+ }
+
@BeforeEach
public void setUp() throws Exception {
HoodieCLI.conf = storageConf();
@@ -243,6 +297,14 @@ public class TestLockAuditingCommand extends
CLIFunctionalTestHarness {
// First enable audit
shell.evaluate(() -> "locks audit enable");
+ // Create some audit files to be cleaned up
+ List<TransactionScenario> scenarios = new ArrayList<>();
+ List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+ records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+ records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+ scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+ createAuditFiles(scenarios);
+
// Disable with keepAuditFiles=false
Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles
false");
@@ -250,7 +312,63 @@ public class TestLockAuditingCommand extends
CLIFunctionalTestHarness {
() -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
() -> assertNotNull(result.toString()),
() -> assertTrue(result.toString().contains("Lock audit disabled
successfully")),
- () -> assertTrue(result.toString().contains("Audit files cleaned up
at:")));
+ () -> assertTrue(result.toString().contains("Audit files cleaned up
at:")
+ || result.toString().contains("Some audit files may
not have been cleaned up")));
+ }
+
+ /**
+ * Test that disable with keepAuditFiles=false actually cleans up files.
+ */
+ @Test
+ public void testDisableLockAuditCleansUpFiles() throws Exception {
+ // First enable audit
+ shell.evaluate(() -> "locks audit enable");
+
+ // Create some audit files
+ List<TransactionScenario> scenarios = new ArrayList<>();
+ List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+ records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+ records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+ scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+ createAuditFiles(scenarios);
+
+ // Verify files exist before disable
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+ List<StoragePathInfo> filesBefore =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ long jsonlFilesBefore = filesBefore.stream()
+ .filter(pathInfo -> pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl"))
+ .count();
+ assertTrue(jsonlFilesBefore > 0, "Should have audit files before disable");
+
+ // Disable with keepAuditFiles=false - this should trigger cleanup
+ Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles
false");
+
+ assertAll("Cleanup triggered by disable",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Lock audit disabled
successfully")));
+ }
+
+ /**
+ * Test disable with keepAuditFiles=false when no audit files exist.
+ */
+ @Test
+ public void testDisableLockAuditWithoutKeepingFilesNoFiles() {
+ // First enable audit but don't create any audit files
+ shell.evaluate(() -> "locks audit enable");
+
+ // Disable with keepAuditFiles=false when no files exist
+ Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles
false");
+
+ assertAll("Disable with cleanup when no files exist",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Lock audit disabled
successfully")),
+ // Should handle the case where no files exist to clean up
+ () -> assertTrue(result.toString().contains("Audit files cleaned up
at:")
+ || result.toString().contains("No audit files")
+ || result.toString().contains("nothing to cleanup")));
}
/**
@@ -282,4 +400,544 @@ public class TestLockAuditingCommand extends
CLIFunctionalTestHarness {
assertTrue(enabledNode.asBoolean(), "Audit should still be enabled");
}
+
+ // ==================== Validation Tests ====================
+
+ /**
+ * Test validation when no audit folder exists.
+ */
+ @Test
+ public void testValidateAuditLocksNoAuditFolder() {
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Validation handles missing audit folder",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
PASSED")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
0")),
+ () -> assertTrue(result.toString().contains("Issues Found: 0")),
+ () -> assertTrue(result.toString().contains("No audit folder found")));
+ }
+
+ /**
+ * Test validation when audit folder exists but no audit files.
+ */
+ @Test
+ public void testValidateAuditLocksNoAuditFiles() throws IOException {
+ // Create audit folder but no files
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditDir = new StoragePath(auditFolderPath);
+ HoodieCLI.storage.createDirectory(auditDir);
+
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Validation handles no audit files",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
PASSED")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
0")),
+ () -> assertTrue(result.toString().contains("Issues Found: 0")),
+ () -> assertTrue(result.toString().contains("No audit files found")));
+ }
+
+ /**
+ * Test validation - No Issues (PASSED)
+ */
+ @Test
+ public void testValidateAuditLocksNoIssues() throws IOException {
+ long baseTime = System.currentTimeMillis();
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ // Transaction 1: Complete transaction
+ List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+ records1.add(createAuditRecord("owner1", baseTime, baseTime + 100,
"START", baseTime + 60000));
+ records1.add(createAuditRecord("owner1", baseTime, baseTime + 200,
"RENEW", baseTime + 60000));
+ records1.add(createAuditRecord("owner1", baseTime, baseTime + 300, "END",
baseTime + 60000));
+ scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl",
records1));
+
+ // Transaction 2: Complete transaction starting after first one ends
+ List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+ records2.add(createAuditRecord("owner2", baseTime + 500, baseTime + 600,
"START", baseTime + 60000));
+ records2.add(createAuditRecord("owner2", baseTime + 500, baseTime + 700,
"END", baseTime + 60000));
+ scenarios.add(new TransactionScenario((baseTime + 500) + "_owner2.jsonl",
records2));
+
+ createAuditFiles(scenarios);
+
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Validation passes with no issues",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
PASSED")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
2")),
+ () -> assertTrue(result.toString().contains("Issues Found: 0")),
+ () -> assertTrue(result.toString().contains("successfully")));
+ }
+
+ /**
+ * Test validation - Single Unclosed Transaction (WARNING)
+ */
+ @Test
+ public void testValidateAuditLocksSingleUnclosedTransaction() throws
IOException {
+ long baseTime = 1000000L;
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ // Single audit file without END record
+ List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+ records1.add(createAuditRecord("owner1", baseTime, baseTime + 100,
"START", baseTime + 200));
+ scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl",
records1));
+
+ createAuditFiles(scenarios);
+
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Single unclosed transaction shows warning",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
WARNING")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
1")),
+ () -> assertTrue(result.toString().contains("Issues Found: 1")));
+ }
+
+ /**
+ * Test validation - Unclosed Transactions (WARNING)
+ */
+ @Test
+ public void testValidateAuditLocksUnclosedTransactions() throws IOException {
+ long baseTime = 1000000L;
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ // Transaction 1: Unclosed (effective end at expiration = baseTime + 200)
+ List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+ records1.add(createAuditRecord("owner1", baseTime, baseTime + 100,
"START", baseTime + 200));
+ scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl",
records1));
+
+ // Transaction 2: Complete, starts after owner1's expiration
+ List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+ records2.add(createAuditRecord("owner2", baseTime + 300, baseTime + 400,
"START", baseTime + 60000));
+ records2.add(createAuditRecord("owner2", baseTime + 300, baseTime + 500,
"END", baseTime + 60000));
+ scenarios.add(new TransactionScenario((baseTime + 300) + "_owner2.jsonl",
records2));
+
+ createAuditFiles(scenarios);
+
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Unclosed transactions show warning",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
WARNING")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
2")),
+ () -> assertTrue(result.toString().contains("Issues Found: 1")),
+ () -> assertTrue(result.toString().contains("[WARNING]")),
+ () -> assertTrue(result.toString().contains("owner1.jsonl")),
+ () -> assertTrue(result.toString().contains("did not end
gracefully")));
+ }
+
+ /**
+ * Test validation - Overlapping Transactions (FAILED)
+ */
+ @Test
+ public void testValidateAuditLocksOverlappingTransactions() throws
IOException {
+ long baseTime = System.currentTimeMillis();
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ // Transaction 1: Ends after owner2 starts (overlapping)
+ List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+ records1.add(createAuditRecord("owner1", baseTime, baseTime + 100,
"START", baseTime + 60000));
+ records1.add(createAuditRecord("owner1", baseTime, baseTime + 500, "END",
baseTime + 60000)); // Ends after owner2 starts
+ scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl",
records1));
+
+ // Transaction 2: Starts before owner1 ends (overlapping)
+ List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+ records2.add(createAuditRecord("owner2", baseTime + 200, baseTime + 300,
"START", baseTime + 60000)); // Starts before owner1 ends
+ records2.add(createAuditRecord("owner2", baseTime + 200, baseTime + 400,
"END", baseTime + 60000));
+ scenarios.add(new TransactionScenario((baseTime + 200) + "_owner2.jsonl",
records2));
+
+ createAuditFiles(scenarios);
+
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Overlapping transactions show failure",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
FAILED")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
2")),
+ () -> assertTrue(result.toString().contains("Issues Found: 1")),
+ () -> assertTrue(result.toString().contains("[ERROR]")),
+ () -> assertTrue(result.toString().contains("owner1.jsonl")),
+ () -> assertTrue(result.toString().contains("overlaps with")),
+ () -> assertTrue(result.toString().contains("owner2.jsonl")));
+ }
+
+ /**
+ * Test validation - Mixed Issues (FAILED)
+ */
+ @Test
+ public void testValidateAuditLocksMixedIssues() throws IOException {
+ long baseTime = System.currentTimeMillis();
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ // Transaction 1: Unclosed transaction
+ List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+ records1.add(createAuditRecord("owner1", baseTime, baseTime + 100,
"START", baseTime + 60000));
+ // No END - unclosed
+ scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl",
records1));
+
+ // Transaction 2: Overlaps with owner1
+ List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+ records2.add(createAuditRecord("owner2", baseTime + 50, baseTime + 150,
"START", baseTime + 60000)); // Overlaps with owner1
+ records2.add(createAuditRecord("owner2", baseTime + 50, baseTime + 250,
"END", baseTime + 60000));
+ scenarios.add(new TransactionScenario((baseTime + 50) + "_owner2.jsonl",
records2));
+
+ createAuditFiles(scenarios);
+
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Mixed issues show failure",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
FAILED")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
2")),
+ () -> assertTrue(result.toString().contains("Issues Found: 2")),
+ () -> assertTrue(result.toString().contains("[ERROR]")),
+ () -> assertTrue(result.toString().contains("[WARNING]")),
+ () -> assertTrue(result.toString().contains("overlaps with")),
+ () -> assertTrue(result.toString().contains("did not end
gracefully")));
+ }
+
+ /**
+ * Test validation - Out of Order Filenames but Valid Transactions (PASSED)
+ */
+ @Test
+ public void testValidateAuditLocksOutOfOrderFilenames() throws IOException {
+ long baseTime = System.currentTimeMillis();
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ // File with later timestamp in name but contains earlier transaction
+ List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+ records1.add(createAuditRecord("owner2", baseTime + 100, baseTime + 200,
"START", baseTime + 60000)); // Actually starts first
+ records1.add(createAuditRecord("owner2", baseTime + 100, baseTime + 300,
"END", baseTime + 60000));
+ scenarios.add(new TransactionScenario((baseTime + 2000) + "_owner2.jsonl",
records1)); // Filename suggests later time
+
+ // File with earlier timestamp in name but contains later transaction
+ List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+ records2.add(createAuditRecord("owner1", baseTime + 500, baseTime + 600,
"START", baseTime + 60000)); // Actually starts second
+ records2.add(createAuditRecord("owner1", baseTime + 500, baseTime + 700,
"END", baseTime + 60000));
+ scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl",
records2)); // Filename suggests earlier time
+
+ createAuditFiles(scenarios);
+
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Out of order filenames but valid transactions pass",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
PASSED")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
2")),
+ () -> assertTrue(result.toString().contains("Issues Found: 0")),
+ () -> assertTrue(result.toString().contains("successfully")));
+ }
+
+ /**
+ * Test validation when no table is loaded.
+ */
+ @Test
+ public void testValidateAuditLocksNoTable() {
+ // Clear the base path to simulate no table loaded
+ String originalBasePath = HoodieCLI.basePath;
+ HoodieCLI.basePath = null;
+
+ try {
+ Object result = shell.evaluate(() -> "locks audit validate");
+ assertAll("Validation handles no table loaded",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertEquals("No Hudi table loaded. Please connect to a table
first.", result.toString()));
+ } finally {
+ HoodieCLI.basePath = originalBasePath;
+ }
+ }
+
+ /**
+ * Test validation with malformed audit files.
+ */
+ @Test
+ public void testValidateAuditLocksMalformedFiles() throws IOException {
+ // Create audit directory
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditDir = new StoragePath(auditFolderPath);
+ HoodieCLI.storage.createDirectory(auditDir);
+
+ // Create a malformed audit file
+ StoragePath filePath = new StoragePath(auditDir, "malformed_file.jsonl");
+ try (OutputStream outputStream = HoodieCLI.storage.create(filePath, true))
{
+ outputStream.write("invalid json content".getBytes());
+ }
+
+ Object result = shell.evaluate(() -> "locks audit validate");
+
+ assertAll("Validation handles malformed files",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Validation Result:
FAILED")),
+ () -> assertTrue(result.toString().contains("Transactions Validated:
0")),
+ () -> assertTrue(result.toString().contains("Failed to parse any audit
files")));
+ }
+
+ // ==================== Cleanup Tests ====================
+
+ /**
+ * Test cleanup when no audit folder exists.
+ */
+ @Test
+ public void testCleanupAuditLocksNoAuditFolder() {
+ Object result = shell.evaluate(() -> "locks audit cleanup");
+
+ assertAll("Cleanup handles missing audit folder",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertEquals("No audit folder found - nothing to cleanup.",
result.toString()));
+ }
+
+ /**
+ * Test cleanup when audit folder exists but no audit files.
+ */
+ @Test
+ public void testCleanupAuditLocksNoAuditFiles() throws IOException {
+ // Create audit folder but no files
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditDir = new StoragePath(auditFolderPath);
+ HoodieCLI.storage.createDirectory(auditDir);
+
+ Object result = shell.evaluate(() -> "locks audit cleanup");
+
+ assertAll("Cleanup handles no audit files",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("No audit files older than
7 days found")));
+ }
+
+ /**
+ * Test cleanup with dry run - test basic functionality.
+ */
+ @Test
+ public void testCleanupAuditLocksDryRun() throws IOException {
+ // Create some audit files (they will have current modification time)
+ long oldTime = System.currentTimeMillis() - (10 * 24 * 60 * 60 * 1000L);
// 10 days ago
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+ records.add(createAuditRecord("owner1", oldTime, oldTime + 100, "START",
oldTime + 60000));
+ records.add(createAuditRecord("owner1", oldTime, oldTime + 200, "END",
oldTime + 60000));
+ scenarios.add(new TransactionScenario(oldTime + "_owner1.jsonl", records));
+
+ createAuditFiles(scenarios);
+
+ // Test default cleanup with dry run
+ Object result = shell.evaluate(() -> "locks audit cleanup --dryRun true");
+
+ assertAll("Dry run executes successfully",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ // Since files are created with current time, they should be too
recent to delete
+ () -> assertTrue(result.toString().contains("No audit files older
than")
+ || result.toString().contains("Dry run: Would
delete")));
+
+ // Verify files still exist
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+ List<StoragePathInfo> filesAfter =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ long jsonlFiles = filesAfter.stream()
+ .filter(pathInfo -> pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl"))
+ .count();
+ assertEquals(1, jsonlFiles, "Files should still exist after dry run");
+ }
+
+ /**
+ * Test cleanup with custom age threshold.
+ */
+ @Test
+ public void testCleanupAuditLocksCustomAge() throws IOException {
+ // Create some audit files
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+ records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+ records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+ scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+
+ createAuditFiles(scenarios);
+
+ // Test with custom age threshold - files will be recent so won't be
deleted
+ Object result = shell.evaluate(() -> "locks audit cleanup --ageDays 30
--dryRun true");
+
+ assertAll("Custom age threshold works",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("No audit files older than
30 days found")));
+ }
+
+ /**
+ * Test actual cleanup (not dry run).
+ */
+ @Test
+ public void testCleanupAuditLocksActualDelete() throws IOException {
+ // Create some audit files
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+ records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+ records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+ scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+
+ createAuditFiles(scenarios);
+
+ // Test actual cleanup - files will be recent so shouldn't be deleted
+ Object result = shell.evaluate(() -> "locks audit cleanup --dryRun false");
+
+ assertAll("Actual cleanup executes successfully",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ // Recent files shouldn't be deleted
+ () -> assertTrue(result.toString().contains("No audit files older than
7 days found")));
+ }
+
+ /**
+ * Test cleanup with recent files (should not delete).
+ */
+ @Test
+ public void testCleanupAuditLocksRecentFiles() throws IOException {
+ // Create recent audit files
+ long recentTime = System.currentTimeMillis() - (2 * 24 * 60 * 60 * 1000L);
// 2 days ago
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+ records.add(createAuditRecord("owner1", recentTime, recentTime + 100,
"START", recentTime + 60000));
+ records.add(createAuditRecord("owner1", recentTime, recentTime + 200,
"END", recentTime + 60000));
+ scenarios.add(new TransactionScenario(recentTime + "_owner1.jsonl",
records));
+
+ createAuditFiles(scenarios);
+
+ Object result = shell.evaluate(() -> "locks audit cleanup");
+
+ assertAll("Recent files are not deleted",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("No audit files older than
7 days found")));
+ }
+
+ /**
+ * Test cleanup parameter validation - invalid age days.
+ */
+ @Test
+ public void testCleanupAuditLocksInvalidAgeDays() {
+ // Test with negative ageDays (Spring Shell treats this as invalid integer
format)
+ Object resultNegative = shell.evaluate(() -> "locks audit cleanup
--ageDays -1");
+
+ assertAll("Negative ageDays should be rejected",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(resultNegative)),
+ () -> assertNotNull(resultNegative.toString()),
+ () -> assertEquals("Error: ageDays must be a value greater than 0.",
resultNegative.toString()));
+
+ // Test with invalid string to verify our parsing validation
+ Object resultInvalid = shell.evaluate(() -> "locks audit cleanup --ageDays
abc");
+
+ assertAll("Invalid string ageDays should be rejected",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(resultInvalid)),
+ () -> assertNotNull(resultInvalid.toString()),
+ () -> assertEquals("Error: ageDays must be a value greater than 0.",
resultInvalid.toString()));
+ }
+
+ /**
+ * Test cleanup when no table is loaded.
+ */
+ @Test
+ public void testCleanupAuditLocksNoTable() {
+ // Clear the base path to simulate no table loaded
+ String originalBasePath = HoodieCLI.basePath;
+ HoodieCLI.basePath = null;
+
+ Object result = shell.evaluate(() -> "locks audit cleanup");
+ assertAll("Cleanup handles no table loaded",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertEquals("No Hudi table loaded. Please connect to a table
first.", result.toString()));
+ HoodieCLI.basePath = originalBasePath;
+ }
+
+ /**
+ * Test cleanup with multiple files.
+ */
+ @Test
+ public void testCleanupAuditLocksMixedFiles() throws IOException {
+ // Create multiple audit files
+ List<TransactionScenario> scenarios = new ArrayList<>();
+
+ // File 1
+ List<LockAuditingCommand.AuditRecord> records1 = new ArrayList<>();
+ records1.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+ records1.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+ scenarios.add(new TransactionScenario("1000_owner1.jsonl", records1));
+
+ // File 2
+ List<LockAuditingCommand.AuditRecord> records2 = new ArrayList<>();
+ records2.add(createAuditRecord("owner2", 2000L, 2100L, "START", 62000L));
+ records2.add(createAuditRecord("owner2", 2000L, 2200L, "END", 62000L));
+ scenarios.add(new TransactionScenario("2000_owner2.jsonl", records2));
+
+ createAuditFiles(scenarios);
+
+ Object result = shell.evaluate(() -> "locks audit cleanup --dryRun true");
+ System.out.println("DEBUG - Mixed files result: " + result.toString());
+
+ assertAll("Multiple files handled correctly",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("No audit files older
than")
+ || result.toString().contains("cleanup")
+ || result.toString().contains("found")));
+ }
+
+ /**
+ * Test that disable method can call performAuditCleanup with ageDays=0 to
delete all files.
+ * This validates that the internal validation allows ageDays >= 0 (not just
> 0).
+ */
+ @Test
+ public void testDisableLockAuditWithAgeDaysZero() throws Exception {
+ // First enable audit
+ shell.evaluate(() -> "locks audit enable");
+
+ // Create some audit files to be cleaned up
+ List<TransactionScenario> scenarios = new ArrayList<>();
+ List<LockAuditingCommand.AuditRecord> records = new ArrayList<>();
+ records.add(createAuditRecord("owner1", 1000L, 1100L, "START", 61000L));
+ records.add(createAuditRecord("owner1", 1000L, 1200L, "END", 61000L));
+ scenarios.add(new TransactionScenario("1000_owner1.jsonl", records));
+ createAuditFiles(scenarios);
+
+ // Verify files exist before disable
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+ List<StoragePathInfo> filesBefore =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ long jsonlFilesBefore = filesBefore.stream()
+ .filter(pathInfo -> pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl"))
+ .count();
+ assertTrue(jsonlFilesBefore > 0, "Should have audit files before disable");
+
+ // Disable with keepAuditFiles=false, which internally calls
performAuditCleanup(false, 0)
+ Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles
false");
+
+ assertAll("Disable with ageDays=0 deletes all files",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Lock audit disabled
successfully")),
+ () -> assertTrue(result.toString().contains("cleaned up") ||
result.toString().contains("No audit files to clean up")));
+
+ // Verify files were cleaned up (ageDays=0 should delete all files)
+ List<StoragePathInfo> filesAfter =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ long jsonlFilesAfter = filesAfter.stream()
+ .filter(pathInfo -> pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl"))
+ .count();
+ assertEquals(0, jsonlFilesAfter, "All audit files should be deleted when
ageDays=0");
+ }
}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
index 56317c92075a..ad6577aed172 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -187,7 +187,7 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
this.lockValiditySecs = config.getValiditySeconds();
this.basePath = config.getHudiTableBasePath();
String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
- this.lockFilePath = String.format("%s%s%s", lockFolderPath,
StoragePath.SEPARATOR, DEFAULT_TABLE_LOCK_FILE_NAME);
+ this.lockFilePath = new StoragePath(lockFolderPath,
DEFAULT_TABLE_LOCK_FILE_NAME).toString();
this.heartbeatManager = heartbeatManagerLoader.apply(ownerId,
TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
this.storageLockClient = storageLockClientLoader.apply(ownerId,
lockFilePath, properties);
this.ownerId = ownerId;
@@ -479,6 +479,7 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
// Upload metadata that will unlock this lock.
StorageLockData expiredLockData = new StorageLockData(true,
this.getLock().getValidUntilMs(), ownerId);
Pair<LockUpsertResult, Option<StorageLockFile>> result;
+ long lockExpirationTimeMs = System.currentTimeMillis();
result = this.storageLockClient.tryUpsertLockFile(expiredLockData,
Option.of(this.getLock()));
switch (result.getLeft()) {
case UNKNOWN_ERROR:
@@ -488,7 +489,7 @@ public class StorageBasedLockProvider implements
LockProvider<StorageLockFile> {
return false;
case SUCCESS:
logInfoLockState(RELEASED);
- recordAuditOperation(AuditOperationState.END,
System.currentTimeMillis());
+ recordAuditOperation(AuditOperationState.END, lockExpirationTimeMs);
setLock(null);
return true;
case ACQUIRED_BY_OTHERS:
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
index fa5c55171983..0ae66a58dbb8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -89,7 +89,7 @@ public interface StorageLockClient extends AutoCloseable {
* @return The lock folder path (e.g., "s3://bucket/table/.hoodie/.locks")
*/
static String getLockFolderPath(String basePath) {
- return String.format("%s%s%s", basePath, StoragePath.SEPARATOR,
LOCKS_FOLDER_NAME);
+ return new StoragePath(basePath, LOCKS_FOLDER_NAME).toString();
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
index ba353fd8eacb..cc93f9f7388a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
@@ -52,7 +52,7 @@ public class StorageLockProviderAuditService implements
AuditService {
*/
public static String getAuditConfigPath(String basePath) {
String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
- return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR,
AUDIT_CONFIG_FILE_NAME);
+ return new StoragePath(lockFolderPath, AUDIT_CONFIG_FILE_NAME).toString();
}
/**
@@ -63,7 +63,7 @@ public class StorageLockProviderAuditService implements
AuditService {
*/
public static String getAuditFolderPath(String basePath) {
String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
- return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR,
AUDIT_FOLDER_NAME);
+ return new StoragePath(lockFolderPath, AUDIT_FOLDER_NAME).toString();
}
private final String ownerId;
@@ -100,10 +100,7 @@ public class StorageLockProviderAuditService implements
AuditService {
// Generate audit file path: <txn-start>_<full-owner-id>.jsonl
String filename = String.format("%d_%s.jsonl", transactionStartTime,
ownerId);
- this.auditFilePath = String.format("%s%s%s",
- getAuditFolderPath(basePath),
- StoragePath.SEPARATOR,
- filename);
+ this.auditFilePath = new StoragePath(getAuditFolderPath(basePath),
filename).toString();
LOG.debug("Initialized audit service for transaction starting at {} with
file: {}",
transactionStartTime, auditFilePath);
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
index 33d3c9bf44ff..9a415850289a 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
@@ -18,6 +18,8 @@
package org.apache.hudi.client.transaction.lock.models;
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
import org.apache.hudi.exception.HoodieIOException;
import org.junit.jupiter.api.BeforeEach;
@@ -162,4 +164,60 @@ public class StorageLockClientFileTest {
StorageLockFile file = new StorageLockFile(data, VERSION_ID);
assertEquals(VERSION_ID, file.getVersionId());
}
+
+ @Test
+ void testGetLockFolderPathWithoutTrailingSlash() {
+ String basePath = "s3://bucket/table";
+ String expected = "s3://bucket/table/.hoodie/.locks";
+ String actual = StorageLockClient.getLockFolderPath(basePath);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testGetLockFolderPathWithTrailingSlash() {
+ String basePath = "s3://bucket/table/";
+ String expected = "s3://bucket/table/.hoodie/.locks";
+ String actual = StorageLockClient.getLockFolderPath(basePath);
+ assertEquals(expected, actual, "Path with trailing slash should be
normalized correctly");
+ }
+
+ @Test
+ void testGetLockFolderPathWithMultipleTrailingSlashes() {
+ String basePath = "s3://bucket/table///";
+ String expected = "s3://bucket/table/.hoodie/.locks";
+ String actual = StorageLockClient.getLockFolderPath(basePath);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testGetLockFolderPathLocalFileSystem() {
+ String basePath = "/tmp/hudi/table";
+ String expected = "/tmp/hudi/table/.hoodie/.locks";
+ String actual = StorageLockClient.getLockFolderPath(basePath);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testGetLockFolderPathLocalFileSystemWithTrailingSlash() {
+ String basePath = "/tmp/hudi/table/";
+ String expected = "/tmp/hudi/table/.hoodie/.locks";
+ String actual = StorageLockClient.getLockFolderPath(basePath);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testGetAuditConfigPathWithTrailingSlash() {
+ String basePath = "s3://bucket/table/";
+ String expected = "s3://bucket/table/.hoodie/.locks/audit_enabled.json";
+ String actual =
StorageLockProviderAuditService.getAuditConfigPath(basePath);
+ assertEquals(expected, actual, "Audit config path with trailing slash
should be normalized correctly");
+ }
+
+ @Test
+ void testGetAuditFolderPathWithTrailingSlash() {
+ String basePath = "s3://bucket/table/";
+ String expected = "s3://bucket/table/.hoodie/.locks/audit";
+ String actual =
StorageLockProviderAuditService.getAuditFolderPath(basePath);
+ assertEquals(expected, actual, "Audit folder path with trailing slash
should be normalized correctly");
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupAuditLockProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupAuditLockProcedure.scala
new file mode 100644
index 000000000000..64c8aaacf505
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupAuditLockProcedure.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.StoragePath
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
/**
 * Spark SQL procedure for cleaning up old audit lock files for Hudi tables.
 *
 * Removes audit lock files that are older than a specified number of days.
 * Supports a dry-run mode that reports which files would be deleted without
 * actually removing them.
 *
 * Usage:
 * {{{
 * CALL cleanup_audit_lock(table => 'my_table')
 * CALL cleanup_audit_lock(path => '/path/to/table', dry_run => true, age_days => 30)
 * }}}
 *
 * The procedure operates on audit files in:
 * `{table_path}/.hoodie/.locks/audit/`
 */
class CleanupAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType),
    ProcedureParameter.optional(1, "path", DataTypes.StringType),
    ProcedureParameter.optional(2, "dry_run", DataTypes.BooleanType, false),
    ProcedureParameter.optional(3, "age_days", DataTypes.IntegerType, 7)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("table", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("files_cleaned", DataTypes.IntegerType, nullable = false, Metadata.empty),
    StructField("dry_run", DataTypes.BooleanType, nullable = false, Metadata.empty),
    StructField("age_days", DataTypes.IntegerType, nullable = false, Metadata.empty),
    StructField("message", DataTypes.StringType, nullable = false, Metadata.empty)
  ))

  /**
   * Returns the procedure parameters definition.
   *
   * @return Array of parameters: table (optional String), path (optional String),
   *         dry_run (optional Boolean, default false), and age_days (optional Integer, default 7)
   */
  def parameters: Array[ProcedureParameter] = PARAMETERS

  /**
   * Returns the output schema for the procedure result.
   *
   * @return StructType containing table, files_cleaned, dry_run, age_days, and message columns
   */
  def outputType: StructType = OUTPUT_TYPE

  /**
   * Executes the audit lock cleanup procedure.
   *
   * @param args Procedure arguments containing table name or path, dry_run flag, and age threshold
   * @return Sequence containing a single Row with cleanup results
   * @throws IllegalArgumentException if age_days is not positive, or if neither table nor path
   *                                  is provided (validated by getBasePath)
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
    val dryRun = getArgValueOrDefault(args, PARAMETERS(2)).getOrElse(false).asInstanceOf[Boolean]
    val ageDays = getArgValueOrDefault(args, PARAMETERS(3)).getOrElse(7).asInstanceOf[Int]

    // Validate age_days is positive
    if (ageDays <= 0) {
      throw new IllegalArgumentException("age_days must be a positive integer")
    }

    // Get the base path using BaseProcedure helper (handles table/path validation)
    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = createMetaClient(jsc, basePath)

    // Use table name if provided, otherwise fall back to the raw path for display
    val displayName = tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])

    try {
      val auditFolderPath = new StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
      val storage = metaClient.getStorage

      // Check if audit folder exists
      if (!storage.exists(auditFolderPath)) {
        Seq(Row(displayName, 0, dryRun, ageDays, "No audit folder found - nothing to cleanup"))
      } else {
        // Cutoff timestamp (ageDays ago). Widen to Long before multiplying so a
        // very large age_days cannot overflow Int arithmetic.
        val cutoffTime = System.currentTimeMillis() - (ageDays * 24L * 60 * 60 * 1000)

        // List all files in the audit folder and keep only old .jsonl audit files
        val allFiles = storage.listDirectEntries(auditFolderPath).asScala
        val auditFiles = allFiles.filter(pathInfo => pathInfo.isFile && pathInfo.getPath.getName.endsWith(".jsonl"))
        val oldFiles = auditFiles.filter(_.getModificationTime < cutoffTime)

        if (oldFiles.isEmpty) {
          Seq(Row(displayName, 0, dryRun, ageDays, s"No audit files older than $ageDays days found"))
        } else if (dryRun) {
          val fileCount = oldFiles.size
          Seq(Row(displayName, fileCount, dryRun, ageDays,
            s"Dry run: Would delete $fileCount audit files older than $ageDays days"))
        } else {
          // Actually delete the files
          var deletedCount = 0
          var failedCount = 0
          var lastFailure: Option[String] = None

          oldFiles.foreach { pathInfo =>
            try {
              storage.deleteFile(pathInfo.getPath)
              deletedCount += 1
            } catch {
              case e: Exception =>
                // Keep going so one bad file does not abort the whole cleanup,
                // but record the failure so it can be surfaced to the caller
                // instead of being silently swallowed.
                failedCount += 1
                lastFailure = Some(s"${pathInfo.getPath.getName}: ${e.getMessage}")
            }
          }

          val message = if (failedCount == 0) {
            s"Successfully deleted $deletedCount audit files older than $ageDays days"
          } else {
            s"Deleted $deletedCount audit files, failed to delete $failedCount files" +
              lastFailure.map(f => s" (last failure: $f)").getOrElse("")
          }

          Seq(Row(displayName, deletedCount, dryRun, ageDays, message))
        }
      }
    } catch {
      case e: Exception =>
        // Report any unexpected storage failure as a result row rather than failing the query
        Seq(Row(displayName, 0, dryRun, ageDays, s"Cleanup failed: ${e.getMessage}"))
    }
  }

  /**
   * Builds a new instance of the CleanupAuditLockProcedure.
   *
   * @return New CleanupAuditLockProcedure instance
   */
  override def build: Procedure = new CleanupAuditLockProcedure()
}
+
+
/**
 * Companion object for [[CleanupAuditLockProcedure]] holding the registered
 * procedure name and the builder factory.
 */
object CleanupAuditLockProcedure {
  /** The name used to register and invoke this procedure. */
  val NAME = "cleanup_audit_lock"

  /**
   * Factory method to create procedure builder instances.
   *
   * @return Supplier that creates new CleanupAuditLockProcedure instances
   */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new CleanupAuditLockProcedure()
  }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ffde5337397d..20c0e8c31340 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -102,6 +102,8 @@ object HoodieProcedures {
,(ShowCleansPlanProcedure.NAME, ShowCleansPlanProcedure.builder)
,(SetAuditLockProcedure.NAME, SetAuditLockProcedure.builder)
,(ShowAuditLockStatusProcedure.NAME,
ShowAuditLockStatusProcedure.builder)
+ ,(ValidateAuditLockProcedure.NAME, ValidateAuditLockProcedure.builder)
+ ,(CleanupAuditLockProcedure.NAME, CleanupAuditLockProcedure.builder)
)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala
new file mode 100644
index 000000000000..22f9b777a5eb
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.io.FileNotFoundException
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
/**
 * Spark SQL procedure for validating audit lock files for Hudi tables.
 *
 * Validates the integrity and consistency of audit lock files generated by the
 * storage lock audit service: JSONL file format, graceful transaction closure,
 * and overlapping lock-hold windows.
 *
 * Usage:
 * {{{
 * CALL validate_audit_lock(table => 'my_table')
 * CALL validate_audit_lock(path => '/path/to/table')
 * }}}
 *
 * The procedure reads audit files from:
 * `{table_path}/.hoodie/.locks/audit/`
 */
class ValidateAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType),
    ProcedureParameter.optional(1, "path", DataTypes.StringType)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("table", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("validation_result", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("transactions_validated", DataTypes.IntegerType, nullable = false, Metadata.empty),
    StructField("issues_found", DataTypes.IntegerType, nullable = false, Metadata.empty),
    StructField("details", DataTypes.StringType, nullable = false, Metadata.empty)
  ))

  // Jackson ObjectMapper is expensive to build; reuse a single instance.
  private val OBJECT_MAPPER = new ObjectMapper()

  /**
   * One transaction's lock-hold window reconstructed from a single audit file.
   *
   * @param ownerId                 lock owner identifier from the first audit entry
   * @param transactionCreationTime transaction start time from the first audit entry
   * @param lockAcquisitionTime     timestamp of the first START entry (falls back to creation time)
   * @param lockReleaseTime         timestamp of the last END entry, if the transaction ended gracefully
   * @param lockExpirationTime      lock expiration from the last entry, used as a fallback end time
   * @param filename                audit file the window was parsed from
   */
  case class TransactionWindow(
      ownerId: String,
      transactionCreationTime: Long,
      lockAcquisitionTime: Long,
      lockReleaseTime: Option[Long],
      lockExpirationTime: Option[Long],
      filename: String
  ) {
    /** Release time if present, else expiration, else acquisition time. */
    def effectiveEndTime: Long = lockReleaseTime.orElse(lockExpirationTime).getOrElse(lockAcquisitionTime)
  }

  /**
   * Returns the procedure parameters definition.
   *
   * @return Array of parameters: table (optional String) and path (optional String)
   */
  def parameters: Array[ProcedureParameter] = PARAMETERS

  /**
   * Returns the output schema for the procedure result.
   *
   * @return StructType containing table, validation_result, transactions_validated,
   *         issues_found, and details columns
   */
  def outputType: StructType = OUTPUT_TYPE

  /**
   * Executes the audit lock validation procedure.
   *
   * @param args Procedure arguments containing table name or path
   * @return Sequence containing a single Row with validation results
   * @throws IllegalArgumentException if neither table nor path is provided (validated by getBasePath)
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))

    // Get the base path using BaseProcedure helper (handles table/path validation)
    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = createMetaClient(jsc, basePath)

    // Use table name if provided, otherwise fall back to the raw path for display
    val displayName = tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])

    try {
      val auditFolderPath = new StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
      val storage = metaClient.getStorage

      // A missing audit folder is not an error: there is simply nothing to validate.
      val allFilesResult = try {
        Some(storage.listDirectEntries(auditFolderPath).asScala)
      } catch {
        case _: FileNotFoundException =>
          None
      }

      allFilesResult match {
        case None =>
          Seq(Row(displayName, "PASSED", 0, 0, "No audit folder found - nothing to validate"))
        case Some(allFiles) =>
          val auditFiles = allFiles.filter(pathInfo => pathInfo.isFile && pathInfo.getPath.getName.endsWith(".jsonl"))

          if (auditFiles.isEmpty) {
            Seq(Row(displayName, "PASSED", 0, 0, "No audit files found - nothing to validate"))
          } else {
            // Parse all audit files into transaction windows; files that fail to
            // parse (or are empty — see parseAuditFile) are reported as errors.
            val parseResults = auditFiles.map(pathInfo => (pathInfo, parseAuditFile(pathInfo, storage)))
            val windows = parseResults.flatMap(_._2).toSeq
            val parseErrors = parseResults.filter(_._2.isEmpty).map(p =>
              s"[ERROR] Failed to parse audit file: ${p._1.getPath.getName}")

            if (windows.isEmpty) {
              Seq(Row(displayName, "FAILED", 0, auditFiles.size, "Failed to parse any audit files"))
            } else {
              // Validate transactions
              val validationResults = validateTransactionWindows(windows)

              // Add parse errors to validation results
              val allErrors = validationResults.errors ++ parseErrors
              val resultsWithParseErrors = validationResults.copy(errors = allErrors.toList)

              // Generate result
              val (result, issuesFound, details) =
                formatValidationResults(resultsWithParseErrors, auditFiles.size, windows.size, parseErrors.size)

              Seq(Row(displayName, result, windows.size, issuesFound, details))
            }
          }
      }
    } catch {
      case e: Exception =>
        // Unexpected failures are reported as an ERROR row with issues_found = -1
        Seq(Row(displayName, "ERROR", 0, -1, s"Validation failed: ${e.getMessage}"))
    }
  }

  /**
   * Parses one JSONL audit file into a [[TransactionWindow]].
   *
   * @return None if the file is empty or any line fails to parse (corrupted files are skipped)
   */
  private def parseAuditFile(pathInfo: StoragePathInfo, storage: org.apache.hudi.storage.HoodieStorage): Option[TransactionWindow] = {
    val filename = pathInfo.getPath.getName

    Try {
      // Read and parse JSONL content (one JSON object per non-blank line)
      val inputStream = storage.open(pathInfo.getPath)
      val jsonNodes = try {
        val content = scala.io.Source.fromInputStream(inputStream).mkString
        val lines = content.split('\n').filter(_.trim.nonEmpty)
        lines.map(line => OBJECT_MAPPER.readTree(line)).toSeq
      } finally {
        inputStream.close()
      }

      if (jsonNodes.isEmpty) {
        None
      } else {
        // Extract transaction metadata from the first entry
        val firstNode = jsonNodes.head
        val ownerId = firstNode.get("ownerId").asText()
        val transactionCreationTime = firstNode.get("transactionStartTime").asLong()

        // Lock acquisition = first START timestamp, else transaction creation time
        val startNodes = jsonNodes.filter(node => node.get("state").asText() == "START")
        val lockAcquisitionTime = if (startNodes.nonEmpty) {
          startNodes.head.get("timestamp").asLong()
        } else {
          transactionCreationTime // fallback to transaction creation time
        }

        // Lock release = last END timestamp, if the transaction ended gracefully
        val endNodes = jsonNodes.filter(node => node.get("state").asText() == "END")
        val lockReleaseTime = if (endNodes.nonEmpty) {
          Some(endNodes.last.get("timestamp").asLong())
        } else {
          None
        }

        // jsonNodes is known non-empty in this branch, so the last entry's
        // expiration is always available as the fallback window end.
        val lockExpirationTime = Some(jsonNodes.last.get("lockExpiration").asLong())

        Some(TransactionWindow(
          ownerId = ownerId,
          transactionCreationTime = transactionCreationTime,
          lockAcquisitionTime = lockAcquisitionTime,
          lockReleaseTime = lockReleaseTime,
          lockExpirationTime = lockExpirationTime,
          filename = filename
        ))
      }
    } match {
      case Success(window) => window
      case Failure(_) => None // Skip corrupted files
    }
  }

  /**
   * Validates transaction windows for overlaps and proper closure.
   *
   * A window without a release time yields a warning (non-graceful shutdown);
   * two windows whose hold intervals intersect yield an error.
   */
  private def validateTransactionWindows(windows: Seq[TransactionWindow]): ValidationResults = {
    val errors = mutable.ListBuffer[String]()
    val warnings = mutable.ListBuffer[String]()

    // Check for transactions without proper END
    windows.foreach { window =>
      if (window.lockReleaseTime.isEmpty) {
        warnings += s"[WARNING] ${window.filename} => transaction did not end gracefully. This could be due to driver OOM or non-graceful shutdown."
      }
    }

    // Sort windows by acquisition time for overlap detection
    val sortedWindows = windows.sortBy(_.lockAcquisitionTime)

    for (i <- sortedWindows.indices) {
      val currentWindow = sortedWindows(i)
      val currentEnd = currentWindow.effectiveEndTime

      // Later windows are sorted by acquisition time, so once one starts at or
      // after currentEnd every subsequent one does too — stop scanning there.
      sortedWindows.view.drop(i + 1).takeWhile(_.lockAcquisitionTime < currentEnd).foreach { otherWindow =>
        errors += s"[ERROR] ${currentWindow.filename} => overlaps with ${otherWindow.filename}"
      }
    }

    ValidationResults(errors.toList, warnings.toList)
  }

  /**
   * Formats validation results into (validation_result, issues_found, details).
   */
  private def formatValidationResults(results: ValidationResults, totalFiles: Int, parsedFiles: Int, failedFiles: Int): (String, Int, String) = {
    val totalIssues = results.errors.size + results.warnings.size

    if (totalIssues == 0) {
      ("PASSED", 0, s"All audit lock transactions validated successfully. Audit Files: $totalFiles total, $parsedFiles parsed successfully, $failedFiles failed to parse")
    } else {
      // Any error fails the validation; warnings alone only downgrade to WARNING
      val resultType = if (results.errors.nonEmpty) "FAILED" else "WARNING"
      val fileInfo = s"Audit Files: $totalFiles total, $parsedFiles parsed successfully, $failedFiles failed to parse. "
      val details = fileInfo + (results.errors ++ results.warnings).mkString(", ")
      (resultType, totalIssues, details)
    }
  }

  /**
   * Case class to hold validation results.
   */
  private case class ValidationResults(errors: List[String], warnings: List[String])

  /**
   * Builds a new instance of the ValidateAuditLockProcedure.
   *
   * @return New ValidateAuditLockProcedure instance
   */
  override def build: Procedure = new ValidateAuditLockProcedure()
}
+
+
/**
 * Companion object for [[ValidateAuditLockProcedure]] holding the registered
 * procedure name and the builder factory.
 */
object ValidateAuditLockProcedure {
  /** The name used to register and invoke this procedure. */
  val NAME = "validate_audit_lock"

  /**
   * Factory method to create procedure builder instances.
   *
   * @return Supplier that creates new ValidateAuditLockProcedure instances
   */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new ValidateAuditLockProcedure()
  }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupAuditLockProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupAuditLockProcedure.scala
new file mode 100644
index 000000000000..b0be9dd4208e
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupAuditLockProcedure.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.StoragePath
+import org.apache.hudi.testutils.HoodieClientTestUtils
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
/**
 * Test suite for the CleanupAuditLockProcedure Spark SQL procedure.
 *
 * Verifies dry-run mode, age-based cleanup, default parameters, parameter
 * validation, and error handling scenarios.
 */
class TestCleanupAuditLockProcedure extends HoodieSparkProcedureTestBase {

  override def generateTableName: String = {
    // The procedure expects an unqualified table name; strip any database prefix.
    super.generateTableName.split("\\.").last
  }

  /**
   * Creates a test table with one row (to initialize the Hudi metadata
   * structure) and returns its filesystem path.
   */
  private def createTestTable(tmp: File, tableName: String): String = {
    spark.sql(
      s"""
         |create table $tableName (
         |  id int,
         |  name string,
         |  price double,
         |  ts long
         |) using hudi
         | location '${tmp.getCanonicalPath}/$tableName'
         | tblproperties (
         |  primaryKey = 'id',
         |  orderingFields = 'ts'
         | )
       """.stripMargin)
    // Insert data to initialize the Hudi metadata structure
    spark.sql(s"insert into $tableName select 1, 'test', 10.0, 1000")
    s"${tmp.getCanonicalPath}/$tableName"
  }

  /**
   * Creates test audit files whose modification time is set to the given age.
   *
   * @param tablePath  The base path of the table
   * @param filenames  List of filenames to create
   * @param ageDaysAgo How many days ago to set the modification time
   */
  private def createTestAuditFiles(tablePath: String, filenames: List[String], ageDaysAgo: Int): Unit = {
    val auditFolderPath = StorageLockProviderAuditService.getAuditFolderPath(tablePath)
    val auditDir = Paths.get(auditFolderPath)

    // Create audit directory if it doesn't exist
    if (!Files.exists(auditDir)) {
      Files.createDirectories(auditDir)
    }

    // Backdate the files so the procedure's age filter can find them
    val targetTime = System.currentTimeMillis() - (ageDaysAgo * 24L * 60L * 60L * 1000L)

    filenames.foreach { filename =>
      val filePath = auditDir.resolve(filename)
      val content = s"""{"ownerId":"test","transactionStartTime":${System.currentTimeMillis()},"timestamp":${System.currentTimeMillis()},"state":"START","lockExpiration":${System.currentTimeMillis() + 60000},"lockHeld":true}"""
      Files.write(filePath, content.getBytes())
      // Set the modification time to simulate old files
      Files.setLastModifiedTime(filePath, java.nio.file.attribute.FileTime.fromMillis(targetTime))
    }
  }

  /**
   * Counts the .jsonl audit files in the table's audit folder.
   */
  private def countAuditFiles(tablePath: String): Int = {
    val auditDir = Paths.get(StorageLockProviderAuditService.getAuditFolderPath(tablePath))

    if (Files.exists(auditDir)) {
      // Files.list returns a Stream backed by an open directory handle;
      // close it to avoid leaking the handle.
      val stream = Files.list(auditDir)
      try {
        stream.filter(_.toString.endsWith(".jsonl")).count().toInt
      } finally {
        stream.close()
      }
    } else {
      0
    }
  }

  /**
   * Dry run with table name: old files are reported but not deleted.
   */
  test("Test Call cleanup_audit_lock Procedure - Dry Run with Table Name") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      // Create some old audit files (10 days old)
      createTestAuditFiles(tablePath, List("old1.jsonl", "old2.jsonl"), ageDaysAgo = 10)
      // Create some recent audit files (2 days old)
      createTestAuditFiles(tablePath, List("recent1.jsonl"), ageDaysAgo = 2)

      val result = spark.sql(s"""call cleanup_audit_lock(table => '$tableName', dry_run => true, age_days => 7)""").collect()

      assertResult(1)(result.length)
      assertResult(tableName)(result.head.get(0))
      assertResult(2)(result.head.get(1)) // Should find 2 old files
      assertResult(true)(result.head.get(2)) // dry_run = true
      assertResult(7)(result.head.get(3)) // age_days = 7
      assert(result.head.get(4).toString.contains("Dry run"))

      // Verify files still exist (dry run shouldn't delete)
      assertResult(3)(countAuditFiles(tablePath))
    }
  }

  /**
   * Actual cleanup with path parameter: old files are deleted, recent ones kept.
   */
  test("Test Call cleanup_audit_lock Procedure - Actual Cleanup with Path") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      // Create some old audit files (10 days old)
      createTestAuditFiles(tablePath, List("old1.jsonl", "old2.jsonl"), ageDaysAgo = 10)
      // Create some recent audit files (2 days old)
      createTestAuditFiles(tablePath, List("recent1.jsonl"), ageDaysAgo = 2)

      val result = spark.sql(s"""call cleanup_audit_lock(path => '$tablePath', dry_run => false, age_days => 7)""").collect()

      assertResult(1)(result.length)
      assertResult(tablePath)(result.head.get(0))
      assertResult(2)(result.head.get(1)) // Should delete 2 old files
      assertResult(false)(result.head.get(2)) // dry_run = false
      assertResult(7)(result.head.get(3)) // age_days = 7
      assert(result.head.get(4).toString.contains("Successfully deleted"))

      // Verify only recent file remains
      assertResult(1)(countAuditFiles(tablePath))
    }
  }

  /**
   * Defaults apply when only the table is given: dry_run=false, age_days=7.
   */
  test("Test Call cleanup_audit_lock Procedure - Default Parameters") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      // Create some old audit files (10 days old) - should be deleted with default 7 days
      createTestAuditFiles(tablePath, List("old1.jsonl"), ageDaysAgo = 10)
      // Create some recent audit files (3 days old) - should be kept
      createTestAuditFiles(tablePath, List("recent1.jsonl"), ageDaysAgo = 3)

      val result = spark.sql(s"""call cleanup_audit_lock(table => '$tableName')""").collect()

      assertResult(1)(result.length)
      assertResult(tableName)(result.head.get(0))
      assertResult(1)(result.head.get(1)) // Should delete 1 old file
      assertResult(false)(result.head.get(2)) // dry_run defaults to false
      assertResult(7)(result.head.get(3)) // age_days defaults to 7

      // Verify only recent file remains
      assertResult(1)(countAuditFiles(tablePath))
    }
  }

  /**
   * A table without an audit folder reports nothing to cleanup.
   */
  test("Test Call cleanup_audit_lock Procedure - No Audit Folder") {
    withTempDir { tmp =>
      val tableName = generateTableName
      createTestTable(tmp, tableName)
      // Don't create any audit files

      val result = spark.sql(s"""call cleanup_audit_lock(table => '$tableName')""").collect()

      assertResult(1)(result.length)
      assertResult(tableName)(result.head.get(0))
      assertResult(0)(result.head.get(1)) // No files to delete
      assert(result.head.get(4).toString.contains("No audit folder found"))
    }
  }

  /**
   * Files newer than the threshold are left untouched.
   */
  test("Test Call cleanup_audit_lock Procedure - No Old Files") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      // Create only recent audit files (2 days old)
      createTestAuditFiles(tablePath, List("recent1.jsonl", "recent2.jsonl"), ageDaysAgo = 2)

      val result = spark.sql(s"""call cleanup_audit_lock(table => '$tableName', age_days => 7)""").collect()

      assertResult(1)(result.length)
      assertResult(tableName)(result.head.get(0))
      assertResult(0)(result.head.get(1)) // No old files to delete
      assert(result.head.get(4).toString.contains("No audit files older than 7 days found"))

      // Verify all files remain
      assertResult(2)(countAuditFiles(tablePath))
    }
  }

  /**
   * Non-positive age_days values are rejected.
   */
  test("Test Call cleanup_audit_lock Procedure - Invalid Age Days") {
    withTempDir { tmp =>
      val tableName = generateTableName
      createTestTable(tmp, tableName)

      checkExceptionContain(s"""call cleanup_audit_lock(table => '$tableName', age_days => -1)""")(
        "age_days must be a positive integer")

      checkExceptionContain(s"""call cleanup_audit_lock(table => '$tableName', age_days => 0)""")(
        "age_days must be a positive integer")
    }
  }

  /**
   * When both table and path are provided, the table parameter wins
   * (consistent with other procedures).
   */
  test("Test Call cleanup_audit_lock Procedure - Both Table and Path Parameters") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = createTestTable(tmp, tableName)

      // Should work fine - will use table parameter when both are provided
      val result = spark.sql(s"""call cleanup_audit_lock(table => '$tableName', path => '$tablePath')""").collect()
      assertResult(1)(result.length)
      assertResult(tableName)(result.head.getString(0))
    }
  }

  /**
   * Calling without table or path fails with a clear message.
   */
  test("Test Call cleanup_audit_lock Procedure - Missing Required Arguments") {
    checkExceptionContain(s"""call cleanup_audit_lock(dry_run => true)""")(
      "Table name or table path must be given one")
  }

  /**
   * A non-existent table name raises an exception.
   */
  test("Test Call cleanup_audit_lock Procedure - Non-existent Table") {
    val nonExistentTable = "non_existent_table"

    intercept[Exception] {
      spark.sql(s"""call cleanup_audit_lock(table => '$nonExistentTable')""")
    }
  }

  /**
   * A path that is not a Hudi table raises an exception.
   */
  test("Test Call cleanup_audit_lock Procedure - Invalid Path") {
    val invalidPath = "/non/existent/path"

    intercept[Exception] {
      spark.sql(s"""call cleanup_audit_lock(path => '$invalidPath')""")
    }
  }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestValidateAuditLockProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestValidateAuditLockProcedure.scala
new file mode 100644
index 000000000000..48fdd3ee66d7
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestValidateAuditLockProcedure.scala
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+
+import org.apache.spark.sql.Row
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+/**
+ * Test suite for the ValidateAuditLockProcedure Spark SQL procedure.
+ *
+ * Contains parameterized scenario tests verifying the validation results
+ * (PASSED/WARNING/FAILED) reported by the validate_audit_lock procedure.
+ */
+class TestValidateAuditLockProcedure extends HoodieSparkProcedureTestBase {
+
+ override def generateTableName: String = {
+ super.generateTableName.split("\\.").last
+ }
+
+ /**
+ * Helper method to create a test table and return its path.
+ */
+ private def createTestTable(tmp: File, tableName: String): String = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+ // Insert data to initialize the Hudi metadata structure
+ spark.sql(s"insert into $tableName select 1, 'test', 10.0, 1000")
+ s"${tmp.getCanonicalPath}/$tableName"
+ }
+
+ /**
+ * Represents a single audit record (JSON line in a .jsonl file)
+ */
+ case class AuditRecord(
+ ownerId: String,
+ transactionStartTime: Long,
+ timestamp: Long,
+ state: String, // START, RENEW, or END
+ lockExpiration: Long,
+ lockHeld: Boolean = true
+ )
+
+ /**
+ * Represents a transaction scenario with its audit records
+ */
+ case class TransactionScenario(
+ filename: String, // e.g., "1234567890_owner1.jsonl"
+ records: List[AuditRecord]
+ )
+
+ /**
+ * Represents expected validation results
+ */
+ case class ExpectedResult(
+ validationResult: String, // PASSED, WARNING, FAILED, ERROR
+ transactionsValidated: Int,
+ issuesFound: Int,
+ detailsContains: List[String] = List() // Strings that should be in details
+ )
+
+ /**
+ * Test scenario definition with input and expected output
+ */
+ case class ValidationTestScenario(
+ name: String,
+ scenarioGenerator: () => List[TransactionScenario],
+ expectedResultGenerator: () => ExpectedResult
+ )
+
+ /**
+ * Helper method to create audit files from scenarios
+ */
+ private def createAuditFiles(tablePath: String, scenarios:
List[TransactionScenario]): Unit = {
+ val auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(tablePath)
+ val auditDir = Paths.get(auditFolderPath)
+
+ // Create audit directory if it doesn't exist
+ if (!Files.exists(auditDir)) {
+ Files.createDirectories(auditDir)
+ }
+
+ scenarios.foreach { scenario =>
+ val filePath = auditDir.resolve(scenario.filename)
+ val jsonLines = scenario.records.map { record =>
+
s"""{"ownerId":"${record.ownerId}","transactionStartTime":${record.transactionStartTime},"timestamp":${record.timestamp},"state":"${record.state}","lockExpiration":${record.lockExpiration},"lockHeld":${record.lockHeld}}"""
+ }.mkString("\n")
+
+ Files.write(filePath, jsonLines.getBytes())
+ }
+ }
+
+ /**
+ * Helper method to run a parameterized validation test scenario
+ */
+ private def runValidationScenario(scenario: ValidationTestScenario): Unit = {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ // Generate test data
+ val transactionScenarios = scenario.scenarioGenerator()
+ val expectedResult = scenario.expectedResultGenerator()
+
+ // Create audit files
+ createAuditFiles(tablePath, transactionScenarios)
+
+ // Run validation
+ val result = spark.sql(s"""call validate_audit_lock(table =>
'$tableName')""").collect()
+
+ // Verify results
+ assertResult(1)(result.length)
+
+ val row = result.head
+ assertResult(tableName)(row.getString(0))
+ assertResult(expectedResult.validationResult)(row.getString(1))
+ assertResult(expectedResult.transactionsValidated)(row.getInt(2))
+ assertResult(expectedResult.issuesFound)(row.getInt(3))
+
+ val details = row.getString(4)
+ expectedResult.detailsContains.foreach { expectedSubstring =>
+ assert(details.contains(expectedSubstring),
+ s"Details '$details' should contain '$expectedSubstring'")
+ }
+ }
+ }
+
+ // ==================== Scenario-Based Validation Tests ====================
+
+ /**
+ * Test scenario: No overlapping transactions, all transactions closed
properly
+ */
+ test("Test Validation - No Issues (PASSED)") {
+ val scenario = ValidationTestScenario(
+ name = "No Issues - All Transactions Completed Properly",
+ scenarioGenerator = () => {
+ val baseTime = System.currentTimeMillis()
+ List(
+ TransactionScenario(
+ filename = s"${baseTime}_owner1.jsonl",
+ records = List(
+ AuditRecord("owner1", baseTime, baseTime + 100, "START",
baseTime + 60000),
+ AuditRecord("owner1", baseTime, baseTime + 200, "RENEW",
baseTime + 60000),
+ AuditRecord("owner1", baseTime, baseTime + 300, "END", baseTime
+ 60000)
+ )
+ ),
+ TransactionScenario(
+ filename = s"${baseTime + 500}_owner2.jsonl",
+ records = List(
+ AuditRecord("owner2", baseTime + 500, baseTime + 600, "START",
baseTime + 60000),
+ AuditRecord("owner2", baseTime + 500, baseTime + 700, "END",
baseTime + 60000)
+ )
+ )
+ )
+ },
+ expectedResultGenerator = () => ExpectedResult(
+ validationResult = "PASSED",
+ transactionsValidated = 2,
+ issuesFound = 0,
+ detailsContains = List("successfully")
+ )
+ )
+
+ runValidationScenario(scenario)
+ }
+
+ /**
+ * Test scenario: Single unclosed transaction (should be WARNING only)
+ */
+ test("Test Validation - Single Unclosed Transaction (WARNING)") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ val baseTime = 1000000L
+
+ // Create single audit file without END record
+ val scenarios = List(
+ TransactionScenario(
+ filename = s"${baseTime}_owner1.jsonl",
+ records = List(
+ AuditRecord("owner1", baseTime, baseTime + 100, "START", baseTime
+ 200)
+ )
+ )
+ )
+
+ createAuditFiles(tablePath, scenarios)
+
+ val result = spark.sql(s"""call validate_audit_lock(table =>
'$tableName')""").collect()
+ val row = result.head
+
+ // This should be WARNING only since there's no overlap possible with
just one transaction
+ assertResult("WARNING")(row.getString(1))
+ assertResult(1)(row.getInt(2)) // transactions_validated
+ assertResult(1)(row.getInt(3)) // issues_found
+ }
+ }
+
+ /**
+ * Test scenario: Transactions without proper END, with proper separation
(WARNING)
+ */
+ test("Test Validation - Unclosed Transactions (WARNING)") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ val baseTime = 1000000L
+
+ // Create two transactions: one unclosed, one complete, no overlap
+ val scenarios = List(
+ TransactionScenario(
+ filename = s"${baseTime}_owner1.jsonl",
+ records = List(
+ AuditRecord("owner1", baseTime, baseTime + 100, "START", baseTime
+ 200)
+ // No END record - effective end at expiration (baseTime + 200)
+ )
+ ),
+ TransactionScenario(
+ filename = s"${baseTime + 300}_owner2.jsonl", // Starts after
owner1's expiration
+ records = List(
+ AuditRecord("owner2", baseTime + 300, baseTime + 400, "START",
baseTime + 60000),
+ AuditRecord("owner2", baseTime + 300, baseTime + 500, "END",
baseTime + 60000)
+ )
+ )
+ )
+
+ createAuditFiles(tablePath, scenarios)
+
+ val result = spark.sql(s"""call validate_audit_lock(table =>
'$tableName')""").collect()
+ val row = result.head
+
+ println(s"DEBUG - Result: ${row.getString(1)}, Issues: ${row.getInt(3)},
Details: ${row.getString(4)}")
+
+ assertResult("WARNING")(row.getString(1))
+ assertResult(2)(row.getInt(2)) // transactions_validated
+ assertResult(1)(row.getInt(3)) // issues_found - only the unclosed
transaction
+ assert(row.getString(4).contains("[WARNING]"))
+ assert(row.getString(4).contains("owner1.jsonl"))
+ assert(row.getString(4).contains("did not end gracefully"))
+ }
+ }
+
+ /**
+ * Test scenario: Overlapping transactions with improper closure (FAILED)
+ */
+ test("Test Validation - Overlapping Transactions (FAILED)") {
+ val scenario = ValidationTestScenario(
+ name = "Overlapping Transactions",
+ scenarioGenerator = () => {
+ val baseTime = System.currentTimeMillis()
+ List(
+ TransactionScenario(
+ filename = s"${baseTime}_owner1.jsonl",
+ records = List(
+ AuditRecord("owner1", baseTime, baseTime + 100, "START",
baseTime + 60000),
+ AuditRecord("owner1", baseTime, baseTime + 500, "END", baseTime
+ 60000) // Ends after owner2 starts
+ )
+ ),
+ TransactionScenario(
+ filename = s"${baseTime + 200}_owner2.jsonl", // Starts before
owner1 ends
+ records = List(
+ AuditRecord("owner2", baseTime + 200, baseTime + 300, "START",
baseTime + 60000),
+ AuditRecord("owner2", baseTime + 200, baseTime + 400, "END",
baseTime + 60000)
+ )
+ )
+ )
+ },
+ expectedResultGenerator = () => ExpectedResult(
+ validationResult = "FAILED",
+ transactionsValidated = 2,
+ issuesFound = 1,
+ detailsContains = List("[ERROR]", "owner1.jsonl", "overlaps with",
"owner2.jsonl")
+ )
+ )
+
+ runValidationScenario(scenario)
+ }
+
+ /**
+ * Test scenario: Mixed issues - overlaps and unclosed transactions
+ */
+ test("Test Validation - Mixed Issues (FAILED)") {
+ val scenario = ValidationTestScenario(
+ name = "Mixed Issues",
+ scenarioGenerator = () => {
+ val baseTime = System.currentTimeMillis()
+ List(
+ TransactionScenario(
+ filename = s"${baseTime}_owner1.jsonl",
+ records = List(
+ AuditRecord("owner1", baseTime, baseTime + 100, "START",
baseTime + 60000)
+ // No END - unclosed
+ )
+ ),
+ TransactionScenario(
+ filename = s"${baseTime + 50}_owner2.jsonl", // Overlaps with
owner1
+ records = List(
+ AuditRecord("owner2", baseTime + 50, baseTime + 150, "START",
baseTime + 60000),
+ AuditRecord("owner2", baseTime + 50, baseTime + 250, "END",
baseTime + 60000)
+ )
+ )
+ )
+ },
+ expectedResultGenerator = () => ExpectedResult(
+ validationResult = "FAILED",
+ transactionsValidated = 2,
+ issuesFound = 2,
+ detailsContains = List("[ERROR]", "[WARNING]", "overlaps with", "did
not end gracefully")
+ )
+ )
+
+ runValidationScenario(scenario)
+ }
+
+ /**
+ * Test scenario: Out-of-order filenames but valid non-overlapping
transactions (PASSED)
+ */
+ test("Test Validation - Out of Order Filenames but Valid Transactions
(PASSED)") {
+ val scenario = ValidationTestScenario(
+ name = "Out of Order Filenames - Valid Transactions",
+ scenarioGenerator = () => {
+ val baseTime = System.currentTimeMillis()
+ List(
+ // File with later timestamp in name but contains earlier transaction
+ TransactionScenario(
+ filename = s"${baseTime + 2000}_owner2.jsonl", // Filename
suggests later time
+ records = List(
+ AuditRecord("owner2", baseTime + 100, baseTime + 200, "START",
baseTime + 60000), // Actually starts first
+ AuditRecord("owner2", baseTime + 100, baseTime + 300, "END",
baseTime + 60000)
+ )
+ ),
+ // File with earlier timestamp in name but contains later transaction
+ TransactionScenario(
+ filename = s"${baseTime}_owner1.jsonl", // Filename suggests
earlier time
+ records = List(
+ AuditRecord("owner1", baseTime + 500, baseTime + 600, "START",
baseTime + 60000), // Actually starts second
+ AuditRecord("owner1", baseTime + 500, baseTime + 700, "END",
baseTime + 60000)
+ )
+ )
+ )
+ },
+ expectedResultGenerator = () => ExpectedResult(
+ validationResult = "PASSED",
+ transactionsValidated = 2,
+ issuesFound = 0,
+ detailsContains = List("successfully")
+ )
+ )
+
+ runValidationScenario(scenario)
+ }
+
+ // ==================== Parameter Validation Tests ====================
+
+ /**
+ * Test parameter validation by providing both table and path parameters.
+ * Since both are provided, the procedure will use the table parameter
(consistent with other procedures).
+ */
+ test("Test Parameter Validation - Both Table and Path Parameters") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ // Should work fine - will use table parameter when both are provided
+ val result = spark.sql(s"""call validate_audit_lock(table =>
'$tableName', path => '$tablePath')""").collect()
+ assertResult(1)(result.length)
+ assertResult(tableName)(result.head.getString(0))
+ }
+ }
+
+ /**
+ * Test parameter validation by omitting both required arguments.
+ */
+ test("Test Parameter Validation - Missing Required Arguments") {
+ checkExceptionContain(s"""call validate_audit_lock()""")(
+ "Table name or table path must be given one")
+ }
+
+ /**
+ * Test validation with non-existent table.
+ */
+ test("Test Parameter Validation - Non-existent Table") {
+ val nonExistentTable = "non_existent_table"
+
+ intercept[Exception] {
+ spark.sql(s"""call validate_audit_lock(table => '$nonExistentTable')""")
+ }
+ }
+
+ /**
+ * Test validation with invalid path.
+ */
+ test("Test Parameter Validation - Invalid Path") {
+ val invalidPath = "/non/existent/path"
+
+ intercept[Exception] {
+ spark.sql(s"""call validate_audit_lock(path => '$invalidPath')""")
+ }
+ }
+
+ /**
+ * Test validation with no audit folder.
+ */
+ test("Test Validation - No Audit Folder (PASSED)") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+ // Don't create any audit files
+
+ val result = spark.sql(s"""call validate_audit_lock(table =>
'$tableName')""").collect()
+
+ assertResult(1)(result.length)
+ val row = result.head
+ assertResult(tableName)(row.getString(0))
+ assertResult("PASSED")(row.getString(1))
+ assertResult(0)(row.getInt(2)) // transactions_validated
+ assertResult(0)(row.getInt(3)) // issues_found
+ assert(row.getString(4).contains("No audit folder found"))
+ }
+ }
+}