alexr17 commented on code in PR #13886:
URL: https://github.com/apache/hudi/pull/13886#discussion_r2353423268


##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java:
##########
@@ -185,4 +236,323 @@ public String showLockAuditStatus() {
       return String.format("Failed to check lock audit status: %s", 
e.getMessage());
     }
   }
+
+  /**
+   * Validates the audit lock files for consistency and integrity.
+   * This command checks for issues such as corrupted files, invalid format,
+   * incorrect state transitions, and orphaned locks.
+   * 
+   * @return Validation results including any issues found
+   */
+  @ShellMethod(key = "locks audit validate", value = "Validate audit lock 
files for consistency and integrity")
+  public String validateAuditLocks() {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    try {
+      String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+      StoragePath auditFolder = new StoragePath(auditFolderPath);
+      
+      // Check if audit folder exists
+      if (!HoodieCLI.storage.exists(auditFolder)) {
+        return "Validation Result: PASSED\n"
+            + "Transactions Validated: 0\n"
+            + "Issues Found: 0\n"
+            + "Details: No audit folder found - nothing to validate";
+      }
+      
+      // Get all audit files
+      List<StoragePathInfo> allFiles = 
HoodieCLI.storage.listDirectEntries(auditFolder);
+      List<StoragePathInfo> auditFiles = new ArrayList<>();
+      for (StoragePathInfo pathInfo : allFiles) {
+        if (pathInfo.isFile() && 
pathInfo.getPath().getName().endsWith(".jsonl")) {
+          auditFiles.add(pathInfo);
+        }
+      }
+      
+      if (auditFiles.isEmpty()) {
+        return "Validation Result: PASSED\n"
+            + "Transactions Validated: 0\n"
+            + "Issues Found: 0\n"
+            + "Details: No audit files found - nothing to validate";
+      }
+      
+      // Parse all audit files into transaction windows
+      List<TransactionWindow> windows = new ArrayList<>();
+      for (StoragePathInfo pathInfo : auditFiles) {
+        Option<TransactionWindow> window = parseAuditFile(pathInfo);
+        if (window.isPresent()) {
+          windows.add(window.get());
+        }
+      }
+      
+      if (windows.isEmpty()) {
+        return String.format("Validation Result: FAILED\n"
+            + "Transactions Validated: 0\n"
+            + "Issues Found: %d\n"
+            + "Details: Failed to parse any audit files", auditFiles.size());
+      }
+      
+      // Validate transactions
+      ValidationResults validationResults = 
validateTransactionWindows(windows);
+      
+      // Generate result
+      int totalIssues = validationResults.errors.size() + 
validationResults.warnings.size();
+      String result;
+      String details;
+      
+      if (totalIssues == 0) {
+        result = "PASSED";
+        details = "All audit lock transactions validated successfully";
+      } else {
+        result = validationResults.errors.isEmpty() ? "WARNING" : "FAILED";
+        List<String> allIssues = new ArrayList<>();
+        allIssues.addAll(validationResults.errors);
+        allIssues.addAll(validationResults.warnings);
+        details = String.join(", ", allIssues);
+      }
+      
+      return String.format("Validation Result: %s\n"
+          + "Transactions Validated: %d\n"
+          + "Issues Found: %d\n"
+          + "Details: %s", result, windows.size(), totalIssues, details);
+      
+    } catch (Exception e) {
+      LOG.error("Error validating audit locks", e);
+      return String.format("Validation Result: ERROR\n"
+          + "Transactions Validated: 0\n"
+          + "Issues Found: -1\n"
+          + "Details: Validation failed: %s", e.getMessage());
+    }
+  }
+
+  /**
+   * Cleans up old audit lock files based on age threshold.
+   * This command removes audit files that are older than the specified number 
of days.
+   * 
+   * @param dryRun Whether to perform a dry run (preview changes without 
deletion)
+   * @param ageDays Number of days to keep audit files (default 7)
+   * @return Status message indicating files cleaned or to be cleaned
+   */
+  @ShellMethod(key = "locks audit cleanup", value = "Clean up old audit lock 
files")
+  public String cleanupAuditLocks(
+      @ShellOption(value = {"--dryRun"}, defaultValue = "false",
+          help = "Preview changes without actually deleting files") final 
boolean dryRun,
+      @ShellOption(value = {"--ageDays"}, defaultValue = "7",
+          help = "Delete audit files older than this many days") final int 
ageDays) {
+    
+    if (HoodieCLI.basePath == null) {
+      return "No Hudi table loaded. Please connect to a table first.";
+    }
+
+    if (ageDays <= 0) {
+      return "Error: ageDays must be a positive integer.";
+    }
+
+    return performAuditCleanup(dryRun, ageDays);
+  }
+
+  /**
+   * Internal method to perform audit cleanup. Used by both the CLI command 
and disable method.
+   * 
+   * @param dryRun Whether to perform a dry run (preview changes without 
deletion)
+   * @param ageDays Number of days to keep audit files (0 means delete all)
+   * @return Status message indicating files cleaned or to be cleaned
+   */
+  private String performAuditCleanup(boolean dryRun, int ageDays) {
+    try {
+      String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+      StoragePath auditFolder = new StoragePath(auditFolderPath);
+      
+      // Check if audit folder exists
+      if (!HoodieCLI.storage.exists(auditFolder)) {
+        return "No audit folder found - nothing to cleanup.";
+      }
+      
+      // Calculate cutoff timestamp (ageDays ago)
+      long cutoffTime = System.currentTimeMillis() - (ageDays * 24L * 60L * 
60L * 1000L);
+      
+      // List all files in audit folder and filter by modification time
+      List<StoragePathInfo> allFiles = 
HoodieCLI.storage.listDirectEntries(auditFolder);
+      List<StoragePathInfo> auditFiles = new ArrayList<>();
+      List<StoragePathInfo> oldFiles = new ArrayList<>();
+      
+      // Filter to get only .jsonl files
+      for (org.apache.hudi.storage.StoragePathInfo pathInfo : allFiles) {
+        if (pathInfo.isFile() && 
pathInfo.getPath().getName().endsWith(".jsonl")) {
+          auditFiles.add(pathInfo);
+          if (pathInfo.getModificationTime() < cutoffTime) {
+            oldFiles.add(pathInfo);
+          }
+        }
+      }
+      
+      if (oldFiles.isEmpty()) {
+        return String.format("No audit files older than %d days found.", 
ageDays);
+      }
+      
+      int fileCount = oldFiles.size();
+      
+      if (dryRun) {
+        return String.format("Dry run: Would delete %d audit files older than 
%d days.", fileCount, ageDays);
+      } else {
+        // Actually delete the files
+        int deletedCount = 0;
+        int failedCount = 0;
+        
+        for (org.apache.hudi.storage.StoragePathInfo pathInfo : oldFiles) {
+          try {
+            HoodieCLI.storage.deleteFile(pathInfo.getPath());
+            deletedCount++;
+          } catch (Exception e) {
+            failedCount++;
+            LOG.warn("Failed to delete audit file: " + pathInfo.getPath(), e);
+          }
+        }
+        
+        if (failedCount == 0) {
+          return String.format("Successfully deleted %d audit files older than 
%d days.", deletedCount, ageDays);
+        } else {
+          return String.format("Deleted %d audit files, failed to delete %d 
files.", deletedCount, failedCount);
+        }
+      }
+      
+    } catch (Exception e) {
+      LOG.error("Error cleaning up audit locks", e);
+      return String.format("Failed to cleanup audit locks: %s", 
e.getMessage());
+    }
+  }
+
+  /**
+   * Parses an audit file and extracts transaction window information.
+   */
+  private Option<TransactionWindow> parseAuditFile(StoragePathInfo pathInfo) {
+    String filename = pathInfo.getPath().getName();
+    
+    try {
+      // Read file content using Hudi storage API
+      String content;
+      try (InputStream inputStream = 
HoodieCLI.storage.open(pathInfo.getPath());
+           BufferedReader reader = new BufferedReader(new 
InputStreamReader(inputStream))) {
+        StringBuilder sb = new StringBuilder();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          sb.append(line).append("\n");
+        }
+        content = sb.toString();
+      }
+      
+      // Parse JSONL content
+      String[] lines = content.split("\n");
+      List<JsonNode> jsonObjects = new ArrayList<>();
+      for (String line : lines) {
+        if (line.trim().isEmpty()) {
+          continue;
+        }
+        try {
+          jsonObjects.add(OBJECT_MAPPER.readTree(line));
+        } catch (Exception e) {
+          LOG.warn("Failed to parse JSON line in file " + filename + ": " + 
line, e);
+        }
+      }
+      
+      if (jsonObjects.isEmpty()) {
+        return Option.empty();
+      }
+      
+      // Extract transaction metadata
+      JsonNode firstObject = jsonObjects.get(0);
+      String ownerId = firstObject.has("ownerId") ? 
firstObject.get("ownerId").asText() : "unknown";
+      long transactionStartTime = firstObject.has("transactionStartTime") 
+          ? firstObject.get("transactionStartTime").asLong() : 0L;
+      
+      // Find first START timestamp
+      long startTimestamp = transactionStartTime; // default to transaction 
start time
+      for (JsonNode obj : jsonObjects) {
+        if (obj.has("state") && "START".equals(obj.get("state").asText())) {
+          startTimestamp = obj.has("timestamp") ? 
obj.get("timestamp").asLong() : transactionStartTime;
+          break;
+        }
+      }
+      
+      // Find last END timestamp
+      Option<Long> endTimestamp = Option.empty();
+      for (int i = jsonObjects.size() - 1; i >= 0; i--) {
+        JsonNode obj = jsonObjects.get(i);
+        if (obj.has("state") && "END".equals(obj.get("state").asText())) {
+          endTimestamp = Option.of(obj.has("timestamp") ? 
obj.get("timestamp").asLong() : 0L);
+          break;
+        }
+      }
+      
+      // Find last expiration time as fallback
+      Option<Long> lastExpirationTime = Option.empty();
+      if (!jsonObjects.isEmpty()) {
+        JsonNode lastObject = jsonObjects.get(jsonObjects.size() - 1);
+        if (lastObject.has("lockExpiration")) {
+          lastExpirationTime = 
Option.of(lastObject.get("lockExpiration").asLong());
+        }
+      }
+      
+      return Option.of(new TransactionWindow(
+          ownerId,
+          transactionStartTime,
+          startTimestamp,
+          endTimestamp,
+          lastExpirationTime,
+          filename
+      ));
+    } catch (Exception e) {
+      LOG.warn("Failed to parse audit file: " + filename, e);
+      return Option.empty();
+    }
+  }
+
+  /**
+   * Validates transaction windows for overlaps and proper closure.
+   */
+  private ValidationResults validateTransactionWindows(List<TransactionWindow> 
windows) {
+    List<String> errors = new ArrayList<>();
+    List<String> warnings = new ArrayList<>();
+    
+    // Check for transactions without proper END
+    for (TransactionWindow window : windows) {
+      if (!window.endTimestamp.isPresent()) {
+        warnings.add(String.format("[WARNING] %s => transaction did not end 
gracefully. This could be due to driver OOM or non-graceful shutdown.", 
+            window.filename));
+      }
+    }
+    
+    // Sort windows by start time for overlap detection
+    List<TransactionWindow> sortedWindows = new ArrayList<>(windows);
+    sortedWindows.sort(Comparator.comparingLong(w -> w.startTimestamp));
+    
+    // Check for overlaps
+    for (int i = 0; i < sortedWindows.size(); i++) {
+      TransactionWindow currentWindow = sortedWindows.get(i);
+      long currentEnd = currentWindow.getEffectiveEndTime();
+      
+      // Check all subsequent windows for overlaps
+      for (int j = i + 1; j < sortedWindows.size(); j++) {
+        TransactionWindow otherWindow = sortedWindows.get(j);
+        long otherStart = otherWindow.startTimestamp;
+        
+        // Check if windows overlap
+        if (otherStart < currentEnd) {
+          // There's an overlap - check if current transaction ended properly 
before the other started
+          if (currentWindow.endTimestamp.isPresent() && 
currentWindow.endTimestamp.get() <= otherStart) {
+            // Transaction ended properly before the other started - no issue
+          } else {
+            // Either no END timestamp or END timestamp is after other 
transaction started
+            errors.add(String.format("[ERROR] %s => overlaps with %s", 
+                currentWindow.filename, otherWindow.filename));

Review Comment:
   Since this is a CLI command, I feel that POJOs are unnecessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to