rmahindra123 commented on code in PR #13886:
URL: https://github.com/apache/hudi/pull/13886#discussion_r2353210797
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -123,8 +169,13 @@ public String disableLockAudit(
if (keepAuditFiles) {
message += String.format("\nExisting audit files preserved at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
} else {
- // Todo: write then call the api method to prune the old files
- message += String.format("\nAudit files cleaned up at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+ // Call the cleanup method to prune 7 days of old files
+ String cleanupResult = performAuditCleanup(false, 7);
Review Comment:
Could we make the 7-day retention period configurable?
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -185,4 +236,323 @@ public String showLockAuditStatus() {
return String.format("Failed to check lock audit status: %s",
e.getMessage());
}
}
+
+ /**
+ * Validates the audit lock files for consistency and integrity.
+ * This command checks for issues such as corrupted files, invalid format,
+ * incorrect state transitions, and orphaned locks.
+ *
+ * @return Validation results including any issues found
+ */
+ @ShellMethod(key = "locks audit validate", value = "Validate audit lock
files for consistency and integrity")
+ public String validateAuditLocks() {
Review Comment:
Same here: let's use a POJO instead of a String.
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -185,4 +236,323 @@ public String showLockAuditStatus() {
return String.format("Failed to check lock audit status: %s",
e.getMessage());
}
}
+
+ /**
+ * Validates the audit lock files for consistency and integrity.
+ * This command checks for issues such as corrupted files, invalid format,
+ * incorrect state transitions, and orphaned locks.
+ *
+ * @return Validation results including any issues found
+ */
+ @ShellMethod(key = "locks audit validate", value = "Validate audit lock
files for consistency and integrity")
+ public String validateAuditLocks() {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ try {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Check if audit folder exists
+ if (!HoodieCLI.storage.exists(auditFolder)) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit folder found - nothing to validate";
Review Comment:
Maybe a POJO and enums would be better than a String.
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -123,8 +169,13 @@ public String disableLockAudit(
if (keepAuditFiles) {
message += String.format("\nExisting audit files preserved at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
} else {
- // Todo: write then call the api method to prune the old files
- message += String.format("\nAudit files cleaned up at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+ // Call the cleanup method to prune 7 days of old files
+ String cleanupResult = performAuditCleanup(false, 7);
+ if (cleanupResult.contains("Successfully deleted") ||
cleanupResult.contains("No audit files")) {
Review Comment:
Can we make cleanupResult a POJO instead of a String?
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -185,4 +236,323 @@ public String showLockAuditStatus() {
return String.format("Failed to check lock audit status: %s",
e.getMessage());
}
}
+
+ /**
+ * Validates the audit lock files for consistency and integrity.
+ * This command checks for issues such as corrupted files, invalid format,
+ * incorrect state transitions, and orphaned locks.
+ *
+ * @return Validation results including any issues found
+ */
+ @ShellMethod(key = "locks audit validate", value = "Validate audit lock
files for consistency and integrity")
+ public String validateAuditLocks() {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ try {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Check if audit folder exists
+ if (!HoodieCLI.storage.exists(auditFolder)) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit folder found - nothing to validate";
+ }
+
+ // Get all audit files
+ List<StoragePathInfo> allFiles =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ List<StoragePathInfo> auditFiles = new ArrayList<>();
+ for (StoragePathInfo pathInfo : allFiles) {
+ if (pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl")) {
+ auditFiles.add(pathInfo);
+ }
+ }
+
+ if (auditFiles.isEmpty()) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit files found - nothing to validate";
+ }
+
+ // Parse all audit files into transaction windows
+ List<TransactionWindow> windows = new ArrayList<>();
+ for (StoragePathInfo pathInfo : auditFiles) {
+ Option<TransactionWindow> window = parseAuditFile(pathInfo);
+ if (window.isPresent()) {
+ windows.add(window.get());
+ }
+ }
+
+ if (windows.isEmpty()) {
+ return String.format("Validation Result: FAILED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: %d\n"
+ + "Details: Failed to parse any audit files", auditFiles.size());
+ }
+
+ // Validate transactions
+ ValidationResults validationResults =
validateTransactionWindows(windows);
+
+ // Generate result
+ int totalIssues = validationResults.errors.size() +
validationResults.warnings.size();
+ String result;
+ String details;
+
+ if (totalIssues == 0) {
+ result = "PASSED";
+ details = "All audit lock transactions validated successfully";
+ } else {
+ result = validationResults.errors.isEmpty() ? "WARNING" : "FAILED";
+ List<String> allIssues = new ArrayList<>();
+ allIssues.addAll(validationResults.errors);
+ allIssues.addAll(validationResults.warnings);
+ details = String.join(", ", allIssues);
+ }
+
+ return String.format("Validation Result: %s\n"
+ + "Transactions Validated: %d\n"
+ + "Issues Found: %d\n"
+ + "Details: %s", result, windows.size(), totalIssues, details);
+
+ } catch (Exception e) {
+ LOG.error("Error validating audit locks", e);
+ return String.format("Validation Result: ERROR\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: -1\n"
+ + "Details: Validation failed: %s", e.getMessage());
+ }
+ }
+
+ /**
+ * Cleans up old audit lock files based on age threshold.
+ * This command removes audit files that are older than the specified number
of days.
+ *
+ * @param dryRun Whether to perform a dry run (preview changes without
deletion)
+ * @param ageDays Number of days to keep audit files (default 7)
+ * @return Status message indicating files cleaned or to be cleaned
+ */
+ @ShellMethod(key = "locks audit cleanup", value = "Clean up old audit lock
files")
+ public String cleanupAuditLocks(
+ @ShellOption(value = {"--dryRun"}, defaultValue = "false",
+ help = "Preview changes without actually deleting files") final
boolean dryRun,
+ @ShellOption(value = {"--ageDays"}, defaultValue = "7",
+ help = "Delete audit files older than this many days") final int
ageDays) {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ if (ageDays <= 0) {
+ return "Error: ageDays must be a positive integer.";
+ }
+
+ return performAuditCleanup(dryRun, ageDays);
+ }
+
+ /**
+ * Internal method to perform audit cleanup. Used by both the CLI command
and disable method.
+ *
+ * @param dryRun Whether to perform a dry run (preview changes without
deletion)
+ * @param ageDays Number of days to keep audit files (0 means delete all)
+ * @return Status message indicating files cleaned or to be cleaned
+ */
+ private String performAuditCleanup(boolean dryRun, int ageDays) {
+ try {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Check if audit folder exists
+ if (!HoodieCLI.storage.exists(auditFolder)) {
+ return "No audit folder found - nothing to cleanup.";
+ }
+
+ // Calculate cutoff timestamp (ageDays ago)
+ long cutoffTime = System.currentTimeMillis() - (ageDays * 24L * 60L *
60L * 1000L);
+
+ // List all files in audit folder and filter by modification time
+ List<StoragePathInfo> allFiles =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ List<StoragePathInfo> auditFiles = new ArrayList<>();
+ List<StoragePathInfo> oldFiles = new ArrayList<>();
+
+ // Filter to get only .jsonl files
+ for (org.apache.hudi.storage.StoragePathInfo pathInfo : allFiles) {
+ if (pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl")) {
+ auditFiles.add(pathInfo);
+ if (pathInfo.getModificationTime() < cutoffTime) {
+ oldFiles.add(pathInfo);
+ }
+ }
+ }
+
+ if (oldFiles.isEmpty()) {
+ return String.format("No audit files older than %d days found.",
ageDays);
+ }
+
+ int fileCount = oldFiles.size();
+
+ if (dryRun) {
+ return String.format("Dry run: Would delete %d audit files older than
%d days.", fileCount, ageDays);
+ } else {
+ // Actually delete the files
+ int deletedCount = 0;
+ int failedCount = 0;
+
+ for (org.apache.hudi.storage.StoragePathInfo pathInfo : oldFiles) {
+ try {
+ HoodieCLI.storage.deleteFile(pathInfo.getPath());
+ deletedCount++;
+ } catch (Exception e) {
+ failedCount++;
+ LOG.warn("Failed to delete audit file: " + pathInfo.getPath(), e);
+ }
+ }
+
+ if (failedCount == 0) {
+ return String.format("Successfully deleted %d audit files older than
%d days.", deletedCount, ageDays);
+ } else {
+ return String.format("Deleted %d audit files, failed to delete %d
files.", deletedCount, failedCount);
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Error cleaning up audit locks", e);
+ return String.format("Failed to cleanup audit locks: %s",
e.getMessage());
+ }
+ }
+
+ /**
+ * Parses an audit file and extracts transaction window information.
+ */
+ private Option<TransactionWindow> parseAuditFile(StoragePathInfo pathInfo) {
+ String filename = pathInfo.getPath().getName();
+
+ try {
+ // Read file content using Hudi storage API
+ String content;
+ try (InputStream inputStream =
HoodieCLI.storage.open(pathInfo.getPath());
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream))) {
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line).append("\n");
+ }
+ content = sb.toString();
+ }
+
+ // Parse JSONL content
+ String[] lines = content.split("\n");
+ List<JsonNode> jsonObjects = new ArrayList<>();
+ for (String line : lines) {
+ if (line.trim().isEmpty()) {
+ continue;
+ }
+ try {
+ jsonObjects.add(OBJECT_MAPPER.readTree(line));
+ } catch (Exception e) {
+ LOG.warn("Failed to parse JSON line in file " + filename + ": " +
line, e);
+ }
+ }
+
+ if (jsonObjects.isEmpty()) {
+ return Option.empty();
+ }
+
+ // Extract transaction metadata
+ JsonNode firstObject = jsonObjects.get(0);
+ String ownerId = firstObject.has("ownerId") ?
firstObject.get("ownerId").asText() : "unknown";
+ long transactionStartTime = firstObject.has("transactionStartTime")
+ ? firstObject.get("transactionStartTime").asLong() : 0L;
+
+ // Find first START timestamp
+ long startTimestamp = transactionStartTime; // default to transaction
start time
+ for (JsonNode obj : jsonObjects) {
+ if (obj.has("state") && "START".equals(obj.get("state").asText())) {
+ startTimestamp = obj.has("timestamp") ?
obj.get("timestamp").asLong() : transactionStartTime;
+ break;
+ }
+ }
+
+ // Find last END timestamp
+ Option<Long> endTimestamp = Option.empty();
+ for (int i = jsonObjects.size() - 1; i >= 0; i--) {
+ JsonNode obj = jsonObjects.get(i);
+ if (obj.has("state") && "END".equals(obj.get("state").asText())) {
+ endTimestamp = Option.of(obj.has("timestamp") ?
obj.get("timestamp").asLong() : 0L);
+ break;
+ }
+ }
+
+ // Find last expiration time as fallback
+ Option<Long> lastExpirationTime = Option.empty();
+ if (!jsonObjects.isEmpty()) {
+ JsonNode lastObject = jsonObjects.get(jsonObjects.size() - 1);
+ if (lastObject.has("lockExpiration")) {
+ lastExpirationTime =
Option.of(lastObject.get("lockExpiration").asLong());
+ }
+ }
+
+ return Option.of(new TransactionWindow(
+ ownerId,
+ transactionStartTime,
+ startTimestamp,
+ endTimestamp,
+ lastExpirationTime,
+ filename
+ ));
+ } catch (Exception e) {
+ LOG.warn("Failed to parse audit file: " + filename, e);
+ return Option.empty();
+ }
+ }
+
+ /**
+ * Validates transaction windows for overlaps and proper closure.
+ */
+ private ValidationResults validateTransactionWindows(List<TransactionWindow>
windows) {
+ List<String> errors = new ArrayList<>();
+ List<String> warnings = new ArrayList<>();
+
+ // Check for transactions without proper END
+ for (TransactionWindow window : windows) {
+ if (!window.endTimestamp.isPresent()) {
+ warnings.add(String.format("[WARNING] %s => transaction did not end
gracefully. This could be due to driver OOM or non-graceful shutdown.",
+ window.filename));
+ }
+ }
+
+ // Sort windows by start time for overlap detection
+ List<TransactionWindow> sortedWindows = new ArrayList<>(windows);
+ sortedWindows.sort(Comparator.comparingLong(w -> w.startTimestamp));
+
+ // Check for overlaps
+ for (int i = 0; i < sortedWindows.size(); i++) {
+ TransactionWindow currentWindow = sortedWindows.get(i);
+ long currentEnd = currentWindow.getEffectiveEndTime();
+
+ // Check all subsequent windows for overlaps
+ for (int j = i + 1; j < sortedWindows.size(); j++) {
+ TransactionWindow otherWindow = sortedWindows.get(j);
+ long otherStart = otherWindow.startTimestamp;
+
+ // Check if windows overlap
+ if (otherStart < currentEnd) {
+ // There's an overlap - check if current transaction ended properly
before the other started
+ if (currentWindow.endTimestamp.isPresent() &&
currentWindow.endTimestamp.get() <= otherStart) {
+ // Transaction ended properly before the other started - no issue
Review Comment:
Do we need this if statement, or can we just have the else part?
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -123,8 +169,13 @@ public String disableLockAudit(
if (keepAuditFiles) {
message += String.format("\nExisting audit files preserved at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
} else {
- // Todo: write then call the api method to prune the old files
- message += String.format("\nAudit files cleaned up at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+ // Call the cleanup method to prune 7 days of old files
+ String cleanupResult = performAuditCleanup(false, 7);
Review Comment:
OK, this is the disable audit lock flow. Should there simply be two options:
keep audit files or delete all audit files?
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -185,4 +236,323 @@ public String showLockAuditStatus() {
return String.format("Failed to check lock audit status: %s",
e.getMessage());
}
}
+
+ /**
+ * Validates the audit lock files for consistency and integrity.
+ * This command checks for issues such as corrupted files, invalid format,
+ * incorrect state transitions, and orphaned locks.
+ *
+ * @return Validation results including any issues found
+ */
+ @ShellMethod(key = "locks audit validate", value = "Validate audit lock
files for consistency and integrity")
+ public String validateAuditLocks() {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ try {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Check if audit folder exists
+ if (!HoodieCLI.storage.exists(auditFolder)) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit folder found - nothing to validate";
+ }
+
+ // Get all audit files
+ List<StoragePathInfo> allFiles =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ List<StoragePathInfo> auditFiles = new ArrayList<>();
+ for (StoragePathInfo pathInfo : allFiles) {
+ if (pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl")) {
+ auditFiles.add(pathInfo);
+ }
+ }
+
+ if (auditFiles.isEmpty()) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit files found - nothing to validate";
+ }
+
+ // Parse all audit files into transaction windows
+ List<TransactionWindow> windows = new ArrayList<>();
+ for (StoragePathInfo pathInfo : auditFiles) {
+ Option<TransactionWindow> window = parseAuditFile(pathInfo);
+ if (window.isPresent()) {
+ windows.add(window.get());
+ }
+ }
+
+ if (windows.isEmpty()) {
+ return String.format("Validation Result: FAILED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: %d\n"
+ + "Details: Failed to parse any audit files", auditFiles.size());
+ }
+
+ // Validate transactions
+ ValidationResults validationResults =
validateTransactionWindows(windows);
+
+ // Generate result
+ int totalIssues = validationResults.errors.size() +
validationResults.warnings.size();
+ String result;
+ String details;
+
+ if (totalIssues == 0) {
+ result = "PASSED";
+ details = "All audit lock transactions validated successfully";
+ } else {
+ result = validationResults.errors.isEmpty() ? "WARNING" : "FAILED";
+ List<String> allIssues = new ArrayList<>();
+ allIssues.addAll(validationResults.errors);
+ allIssues.addAll(validationResults.warnings);
+ details = String.join(", ", allIssues);
+ }
+
+ return String.format("Validation Result: %s\n"
+ + "Transactions Validated: %d\n"
+ + "Issues Found: %d\n"
+ + "Details: %s", result, windows.size(), totalIssues, details);
+
+ } catch (Exception e) {
+ LOG.error("Error validating audit locks", e);
+ return String.format("Validation Result: ERROR\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: -1\n"
+ + "Details: Validation failed: %s", e.getMessage());
+ }
+ }
+
+ /**
+ * Cleans up old audit lock files based on age threshold.
+ * This command removes audit files that are older than the specified number
of days.
+ *
+ * @param dryRun Whether to perform a dry run (preview changes without
deletion)
+ * @param ageDays Number of days to keep audit files (default 7)
+ * @return Status message indicating files cleaned or to be cleaned
+ */
+ @ShellMethod(key = "locks audit cleanup", value = "Clean up old audit lock
files")
+ public String cleanupAuditLocks(
+ @ShellOption(value = {"--dryRun"}, defaultValue = "false",
+ help = "Preview changes without actually deleting files") final
boolean dryRun,
+ @ShellOption(value = {"--ageDays"}, defaultValue = "7",
+ help = "Delete audit files older than this many days") final int
ageDays) {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ if (ageDays <= 0) {
+ return "Error: ageDays must be a positive integer.";
+ }
+
+ return performAuditCleanup(dryRun, ageDays);
+ }
+
+ /**
+ * Internal method to perform audit cleanup. Used by both the CLI command
and disable method.
+ *
+ * @param dryRun Whether to perform a dry run (preview changes without
deletion)
+ * @param ageDays Number of days to keep audit files (0 means delete all)
+ * @return Status message indicating files cleaned or to be cleaned
+ */
+ private String performAuditCleanup(boolean dryRun, int ageDays) {
+ try {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Check if audit folder exists
+ if (!HoodieCLI.storage.exists(auditFolder)) {
+ return "No audit folder found - nothing to cleanup.";
+ }
+
+ // Calculate cutoff timestamp (ageDays ago)
+ long cutoffTime = System.currentTimeMillis() - (ageDays * 24L * 60L *
60L * 1000L);
+
+ // List all files in audit folder and filter by modification time
+ List<StoragePathInfo> allFiles =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ List<StoragePathInfo> auditFiles = new ArrayList<>();
+ List<StoragePathInfo> oldFiles = new ArrayList<>();
+
+ // Filter to get only .jsonl files
+ for (org.apache.hudi.storage.StoragePathInfo pathInfo : allFiles) {
+ if (pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl")) {
+ auditFiles.add(pathInfo);
+ if (pathInfo.getModificationTime() < cutoffTime) {
+ oldFiles.add(pathInfo);
+ }
+ }
+ }
+
+ if (oldFiles.isEmpty()) {
+ return String.format("No audit files older than %d days found.",
ageDays);
+ }
+
+ int fileCount = oldFiles.size();
+
+ if (dryRun) {
+ return String.format("Dry run: Would delete %d audit files older than
%d days.", fileCount, ageDays);
+ } else {
+ // Actually delete the files
+ int deletedCount = 0;
+ int failedCount = 0;
+
+ for (org.apache.hudi.storage.StoragePathInfo pathInfo : oldFiles) {
+ try {
+ HoodieCLI.storage.deleteFile(pathInfo.getPath());
+ deletedCount++;
+ } catch (Exception e) {
+ failedCount++;
+ LOG.warn("Failed to delete audit file: " + pathInfo.getPath(), e);
+ }
+ }
+
+ if (failedCount == 0) {
+ return String.format("Successfully deleted %d audit files older than
%d days.", deletedCount, ageDays);
+ } else {
+ return String.format("Deleted %d audit files, failed to delete %d
files.", deletedCount, failedCount);
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Error cleaning up audit locks", e);
+ return String.format("Failed to cleanup audit locks: %s",
e.getMessage());
+ }
+ }
+
+ /**
+ * Parses an audit file and extracts transaction window information.
+ */
+ private Option<TransactionWindow> parseAuditFile(StoragePathInfo pathInfo) {
+ String filename = pathInfo.getPath().getName();
+
+ try {
+ // Read file content using Hudi storage API
+ String content;
+ try (InputStream inputStream =
HoodieCLI.storage.open(pathInfo.getPath());
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream))) {
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line).append("\n");
+ }
+ content = sb.toString();
+ }
+
+ // Parse JSONL content
+ String[] lines = content.split("\n");
+ List<JsonNode> jsonObjects = new ArrayList<>();
+ for (String line : lines) {
+ if (line.trim().isEmpty()) {
+ continue;
+ }
+ try {
+ jsonObjects.add(OBJECT_MAPPER.readTree(line));
+ } catch (Exception e) {
+ LOG.warn("Failed to parse JSON line in file " + filename + ": " +
line, e);
+ }
+ }
+
+ if (jsonObjects.isEmpty()) {
+ return Option.empty();
+ }
+
+ // Extract transaction metadata
+ JsonNode firstObject = jsonObjects.get(0);
+ String ownerId = firstObject.has("ownerId") ?
firstObject.get("ownerId").asText() : "unknown";
+ long transactionStartTime = firstObject.has("transactionStartTime")
+ ? firstObject.get("transactionStartTime").asLong() : 0L;
+
+ // Find first START timestamp
+ long startTimestamp = transactionStartTime; // default to transaction
start time
+ for (JsonNode obj : jsonObjects) {
+ if (obj.has("state") && "START".equals(obj.get("state").asText())) {
+ startTimestamp = obj.has("timestamp") ?
obj.get("timestamp").asLong() : transactionStartTime;
+ break;
+ }
+ }
+
+ // Find last END timestamp
+ Option<Long> endTimestamp = Option.empty();
+ for (int i = jsonObjects.size() - 1; i >= 0; i--) {
+ JsonNode obj = jsonObjects.get(i);
+ if (obj.has("state") && "END".equals(obj.get("state").asText())) {
+ endTimestamp = Option.of(obj.has("timestamp") ?
obj.get("timestamp").asLong() : 0L);
+ break;
+ }
+ }
+
+ // Find last expiration time as fallback
+ Option<Long> lastExpirationTime = Option.empty();
+ if (!jsonObjects.isEmpty()) {
+ JsonNode lastObject = jsonObjects.get(jsonObjects.size() - 1);
+ if (lastObject.has("lockExpiration")) {
+ lastExpirationTime =
Option.of(lastObject.get("lockExpiration").asLong());
+ }
+ }
+
+ return Option.of(new TransactionWindow(
+ ownerId,
+ transactionStartTime,
+ startTimestamp,
+ endTimestamp,
+ lastExpirationTime,
+ filename
+ ));
+ } catch (Exception e) {
+ LOG.warn("Failed to parse audit file: " + filename, e);
+ return Option.empty();
+ }
+ }
+
+ /**
+ * Validates transaction windows for overlaps and proper closure.
+ */
+ private ValidationResults validateTransactionWindows(List<TransactionWindow>
windows) {
+ List<String> errors = new ArrayList<>();
+ List<String> warnings = new ArrayList<>();
+
+ // Check for transactions without proper END
+ for (TransactionWindow window : windows) {
+ if (!window.endTimestamp.isPresent()) {
+ warnings.add(String.format("[WARNING] %s => transaction did not end
gracefully. This could be due to driver OOM or non-graceful shutdown.",
+ window.filename));
+ }
+ }
+
+ // Sort windows by start time for overlap detection
+ List<TransactionWindow> sortedWindows = new ArrayList<>(windows);
+ sortedWindows.sort(Comparator.comparingLong(w -> w.startTimestamp));
+
+ // Check for overlaps
+ for (int i = 0; i < sortedWindows.size(); i++) {
+ TransactionWindow currentWindow = sortedWindows.get(i);
+ long currentEnd = currentWindow.getEffectiveEndTime();
+
+ // Check all subsequent windows for overlaps
+ for (int j = i + 1; j < sortedWindows.size(); j++) {
+ TransactionWindow otherWindow = sortedWindows.get(j);
+ long otherStart = otherWindow.startTimestamp;
+
+ // Check if windows overlap
+ if (otherStart < currentEnd) {
+ // There's an overlap - check if current transaction ended properly
before the other started
+ if (currentWindow.endTimestamp.isPresent() &&
currentWindow.endTimestamp.get() <= otherStart) {
+ // Transaction ended properly before the other started - no issue
+ } else {
+ // Either no END timestamp or END timestamp is after other
transaction started
+ errors.add(String.format("[ERROR] %s => overlaps with %s",
+ currentWindow.filename, otherWindow.filename));
Review Comment:
Same: these should be POJOs so they are extensible in the future. Not sure if we
have added Lombok in Hudi.
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -185,4 +236,323 @@ public String showLockAuditStatus() {
return String.format("Failed to check lock audit status: %s",
e.getMessage());
}
}
+
+ /**
+ * Validates the audit lock files for consistency and integrity.
+ * This command checks for issues such as corrupted files, invalid format,
+ * incorrect state transitions, and orphaned locks.
+ *
+ * @return Validation results including any issues found
+ */
+ @ShellMethod(key = "locks audit validate", value = "Validate audit lock
files for consistency and integrity")
+ public String validateAuditLocks() {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ try {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Check if audit folder exists
+ if (!HoodieCLI.storage.exists(auditFolder)) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit folder found - nothing to validate";
+ }
+
+ // Get all audit files
+ List<StoragePathInfo> allFiles =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ List<StoragePathInfo> auditFiles = new ArrayList<>();
+ for (StoragePathInfo pathInfo : allFiles) {
+ if (pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl")) {
+ auditFiles.add(pathInfo);
+ }
+ }
+
+ if (auditFiles.isEmpty()) {
+ return "Validation Result: PASSED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: 0\n"
+ + "Details: No audit files found - nothing to validate";
+ }
+
+ // Parse all audit files into transaction windows
+ List<TransactionWindow> windows = new ArrayList<>();
+ for (StoragePathInfo pathInfo : auditFiles) {
+ Option<TransactionWindow> window = parseAuditFile(pathInfo);
+ if (window.isPresent()) {
+ windows.add(window.get());
+ }
+ }
+
+ if (windows.isEmpty()) {
+ return String.format("Validation Result: FAILED\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: %d\n"
+ + "Details: Failed to parse any audit files", auditFiles.size());
+ }
+
+ // Validate transactions
+ ValidationResults validationResults =
validateTransactionWindows(windows);
+
+ // Generate result
+ int totalIssues = validationResults.errors.size() +
validationResults.warnings.size();
+ String result;
+ String details;
+
+ if (totalIssues == 0) {
+ result = "PASSED";
+ details = "All audit lock transactions validated successfully";
+ } else {
+ result = validationResults.errors.isEmpty() ? "WARNING" : "FAILED";
+ List<String> allIssues = new ArrayList<>();
+ allIssues.addAll(validationResults.errors);
+ allIssues.addAll(validationResults.warnings);
+ details = String.join(", ", allIssues);
+ }
+
+ return String.format("Validation Result: %s\n"
+ + "Transactions Validated: %d\n"
+ + "Issues Found: %d\n"
+ + "Details: %s", result, windows.size(), totalIssues, details);
+
+ } catch (Exception e) {
+ LOG.error("Error validating audit locks", e);
+ return String.format("Validation Result: ERROR\n"
+ + "Transactions Validated: 0\n"
+ + "Issues Found: -1\n"
+ + "Details: Validation failed: %s", e.getMessage());
+ }
+ }
+
+ /**
+ * Cleans up old audit lock files based on age threshold.
+ * This command removes audit files that are older than the specified number
of days.
+ *
+ * @param dryRun Whether to perform a dry run (preview changes without
deletion)
+ * @param ageDays Number of days to keep audit files (default 7)
+ * @return Status message indicating files cleaned or to be cleaned
+ */
+ @ShellMethod(key = "locks audit cleanup", value = "Clean up old audit lock
files")
+ public String cleanupAuditLocks(
+ @ShellOption(value = {"--dryRun"}, defaultValue = "false",
+ help = "Preview changes without actually deleting files") final
boolean dryRun,
+ @ShellOption(value = {"--ageDays"}, defaultValue = "7",
+ help = "Delete audit files older than this many days") final int
ageDays) {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ if (ageDays <= 0) {
+ return "Error: ageDays must be a positive integer.";
+ }
+
+ return performAuditCleanup(dryRun, ageDays);
+ }
+
+ /**
+ * Internal method to perform audit cleanup. Used by both the CLI command
and disable method.
+ *
+ * @param dryRun Whether to perform a dry run (preview changes without
deletion)
+ * @param ageDays Number of days to keep audit files (0 means delete all)
+ * @return Status message indicating files cleaned or to be cleaned
+ */
+ private String performAuditCleanup(boolean dryRun, int ageDays) {
+ try {
+ String auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+ StoragePath auditFolder = new StoragePath(auditFolderPath);
+
+ // Check if audit folder exists
+ if (!HoodieCLI.storage.exists(auditFolder)) {
+ return "No audit folder found - nothing to cleanup.";
+ }
+
+ // Calculate cutoff timestamp (ageDays ago)
+ long cutoffTime = System.currentTimeMillis() - (ageDays * 24L * 60L *
60L * 1000L);
+
+ // List all files in audit folder and filter by modification time
+ List<StoragePathInfo> allFiles =
HoodieCLI.storage.listDirectEntries(auditFolder);
+ List<StoragePathInfo> auditFiles = new ArrayList<>();
+ List<StoragePathInfo> oldFiles = new ArrayList<>();
+
+ // Filter to get only .jsonl files
+ for (org.apache.hudi.storage.StoragePathInfo pathInfo : allFiles) {
+ if (pathInfo.isFile() &&
pathInfo.getPath().getName().endsWith(".jsonl")) {
+ auditFiles.add(pathInfo);
+ if (pathInfo.getModificationTime() < cutoffTime) {
+ oldFiles.add(pathInfo);
+ }
+ }
+ }
+
+ if (oldFiles.isEmpty()) {
+ return String.format("No audit files older than %d days found.",
ageDays);
+ }
+
+ int fileCount = oldFiles.size();
+
+ if (dryRun) {
+ return String.format("Dry run: Would delete %d audit files older than
%d days.", fileCount, ageDays);
+ } else {
+ // Actually delete the files
+ int deletedCount = 0;
+ int failedCount = 0;
+
+ for (org.apache.hudi.storage.StoragePathInfo pathInfo : oldFiles) {
+ try {
+ HoodieCLI.storage.deleteFile(pathInfo.getPath());
+ deletedCount++;
+ } catch (Exception e) {
+ failedCount++;
+ LOG.warn("Failed to delete audit file: " + pathInfo.getPath(), e);
+ }
+ }
+
+ if (failedCount == 0) {
+ return String.format("Successfully deleted %d audit files older than
%d days.", deletedCount, ageDays);
+ } else {
+ return String.format("Deleted %d audit files, failed to delete %d
files.", deletedCount, failedCount);
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Error cleaning up audit locks", e);
+ return String.format("Failed to cleanup audit locks: %s",
e.getMessage());
+ }
+ }
+
+ /**
+ * Parses an audit file and extracts transaction window information.
+ */
+ private Option<TransactionWindow> parseAuditFile(StoragePathInfo pathInfo) {
+ String filename = pathInfo.getPath().getName();
+
+ try {
+ // Read file content using Hudi storage API
+ String content;
+ try (InputStream inputStream =
HoodieCLI.storage.open(pathInfo.getPath());
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream))) {
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line).append("\n");
+ }
+ content = sb.toString();
+ }
+
+ // Parse JSONL content
+ String[] lines = content.split("\n");
+ List<JsonNode> jsonObjects = new ArrayList<>();
+ for (String line : lines) {
+ if (line.trim().isEmpty()) {
+ continue;
+ }
+ try {
+ jsonObjects.add(OBJECT_MAPPER.readTree(line));
+ } catch (Exception e) {
+ LOG.warn("Failed to parse JSON line in file " + filename + ": " +
line, e);
+ }
+ }
+
+ if (jsonObjects.isEmpty()) {
+ return Option.empty();
+ }
+
+ // Extract transaction metadata
+ JsonNode firstObject = jsonObjects.get(0);
+ String ownerId = firstObject.has("ownerId") ?
firstObject.get("ownerId").asText() : "unknown";
+ long transactionStartTime = firstObject.has("transactionStartTime")
+ ? firstObject.get("transactionStartTime").asLong() : 0L;
+
+ // Find first START timestamp
+ long startTimestamp = transactionStartTime; // default to transaction
start time
+ for (JsonNode obj : jsonObjects) {
+ if (obj.has("state") && "START".equals(obj.get("state").asText())) {
+ startTimestamp = obj.has("timestamp") ?
obj.get("timestamp").asLong() : transactionStartTime;
+ break;
+ }
+ }
+
+ // Find last END timestamp
+ Option<Long> endTimestamp = Option.empty();
+ for (int i = jsonObjects.size() - 1; i >= 0; i--) {
+ JsonNode obj = jsonObjects.get(i);
+ if (obj.has("state") && "END".equals(obj.get("state").asText())) {
+ endTimestamp = Option.of(obj.has("timestamp") ?
obj.get("timestamp").asLong() : 0L);
+ break;
+ }
+ }
+
+ // Find last expiration time as fallback
+ Option<Long> lastExpirationTime = Option.empty();
+ if (!jsonObjects.isEmpty()) {
+ JsonNode lastObject = jsonObjects.get(jsonObjects.size() - 1);
+ if (lastObject.has("lockExpiration")) {
+ lastExpirationTime =
Option.of(lastObject.get("lockExpiration").asLong());
+ }
+ }
+
+ return Option.of(new TransactionWindow(
+ ownerId,
+ transactionStartTime,
+ startTimestamp,
+ endTimestamp,
+ lastExpirationTime,
+ filename
+ ));
+ } catch (Exception e) {
+ LOG.warn("Failed to parse audit file: " + filename, e);
+ return Option.empty();
+ }
+ }
+
+ /**
+ * Validates transaction windows for overlaps and proper closure.
+ */
+ private ValidationResults validateTransactionWindows(List<TransactionWindow>
windows) {
+ List<String> errors = new ArrayList<>();
+ List<String> warnings = new ArrayList<>();
+
+ // Check for transactions without proper END
+ for (TransactionWindow window : windows) {
+ if (!window.endTimestamp.isPresent()) {
+ warnings.add(String.format("[WARNING] %s => transaction did not end
gracefully. This could be due to driver OOM or non-graceful shutdown.",
+ window.filename));
+ }
+ }
+
+ // Sort windows by start time for overlap detection
+ List<TransactionWindow> sortedWindows = new ArrayList<>(windows);
+ sortedWindows.sort(Comparator.comparingLong(w -> w.startTimestamp));
+
+ // Check for overlaps
+ for (int i = 0; i < sortedWindows.size(); i++) {
+ TransactionWindow currentWindow = sortedWindows.get(i);
+ long currentEnd = currentWindow.getEffectiveEndTime();
+
+ // Check all subsequent windows for overlaps
+ for (int j = i + 1; j < sortedWindows.size(); j++) {
+ TransactionWindow otherWindow = sortedWindows.get(j);
+ long otherStart = otherWindow.startTimestamp;
+
+ // Check if windows overlap
+ if (otherStart < currentEnd) {
+ // There's an overlap - check if current transaction ended properly
before the other started
+ if (currentWindow.endTimestamp.isPresent() &&
currentWindow.endTimestamp.get() <= otherStart) {
Review Comment:
Not sure why we need to check this again. If there is an overlap, shouldn't we
simply mark it as an issue?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]