yihua commented on code in PR #13886:
URL: https://github.com/apache/hudi/pull/13886#discussion_r2392936573


##########
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java:
##########
@@ -56,6 +61,76 @@ public class TestLockAuditingCommand extends 
CLIFunctionalTestHarness {
 
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+  /**
+   * Represents a single audit record (JSON line in a .jsonl file)
+   */
+  static class AuditRecord {

Review Comment:
   This class is defined only in the test. Should it be moved to shared main code so it can be used by
`StorageLockProviderAuditService` and `LockAuditingCommand` as well?



##########
hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java:
##########
@@ -282,4 +421,544 @@ public void testEnableLockAuditMultipleTimes() throws 
Exception {
     
     assertTrue(enabledNode.asBoolean(), "Audit should still be enabled");
   }
+
+  // ==================== Validation Tests ====================
+
+  /**
+   * Test validation when no audit folder exists.
+   */
+  @Test
+  public void testValidateAuditLocksNoAuditFolder() {
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Validation handles missing audit folder",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
PASSED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
0")),
+        () -> assertTrue(result.toString().contains("Issues Found: 0")),
+        () -> assertTrue(result.toString().contains("No audit folder found")));
+  }
+
+  /**
+   * Test validation when audit folder exists but no audit files.
+   */
+  @Test
+  public void testValidateAuditLocksNoAuditFiles() throws IOException {
+    // Create audit folder but no files
+    String auditFolderPath = 
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath);
+    StoragePath auditDir = new StoragePath(auditFolderPath);
+    HoodieCLI.storage.createDirectory(auditDir);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Validation handles no audit files",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
PASSED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
0")),
+        () -> assertTrue(result.toString().contains("Issues Found: 0")),
+        () -> assertTrue(result.toString().contains("No audit files found")));
+  }
+
+  /**
+   * Test validation - No Issues (PASSED)
+   */
+  @Test
+  public void testValidateAuditLocksNoIssues() throws IOException {
+    long baseTime = System.currentTimeMillis();
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // Transaction 1: Complete transaction
+    List<AuditRecord> records1 = new ArrayList<>();
+    records1.add(new AuditRecord("owner1", baseTime, baseTime + 100, "START", 
baseTime + 60000));
+    records1.add(new AuditRecord("owner1", baseTime, baseTime + 200, "RENEW", 
baseTime + 60000));
+    records1.add(new AuditRecord("owner1", baseTime, baseTime + 300, "END", 
baseTime + 60000));
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records1));
+    
+    // Transaction 2: Complete transaction starting after first one ends
+    List<AuditRecord> records2 = new ArrayList<>();
+    records2.add(new AuditRecord("owner2", baseTime + 500, baseTime + 600, 
"START", baseTime + 60000));
+    records2.add(new AuditRecord("owner2", baseTime + 500, baseTime + 700, 
"END", baseTime + 60000));
+    scenarios.add(new TransactionScenario((baseTime + 500) + "_owner2.jsonl", 
records2));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Validation passes with no issues",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
PASSED")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
2")),
+        () -> assertTrue(result.toString().contains("Issues Found: 0")),
+        () -> assertTrue(result.toString().contains("successfully")));
+  }
+
+  /**
+   * Test validation - Single Unclosed Transaction (WARNING)
+   */
+  @Test
+  public void testValidateAuditLocksSingleUnclosedTransaction() throws 
IOException {
+    long baseTime = 1000000L;
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // Single audit file without END record
+    List<AuditRecord> records1 = new ArrayList<>();
+    records1.add(new AuditRecord("owner1", baseTime, baseTime + 100, "START", 
baseTime + 200));
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records1));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Single unclosed transaction shows warning",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
WARNING")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
1")),
+        () -> assertTrue(result.toString().contains("Issues Found: 1")));
+  }
+
+  /**
+   * Test validation - Unclosed Transactions (WARNING)
+   */
+  @Test
+  public void testValidateAuditLocksUnclosedTransactions() throws IOException {
+    long baseTime = 1000000L;
+    List<TransactionScenario> scenarios = new ArrayList<>();
+    
+    // Transaction 1: Unclosed (effective end at expiration = baseTime + 200)
+    List<AuditRecord> records1 = new ArrayList<>();
+    records1.add(new AuditRecord("owner1", baseTime, baseTime + 100, "START", 
baseTime + 200));
+    scenarios.add(new TransactionScenario(baseTime + "_owner1.jsonl", 
records1));
+    
+    // Transaction 2: Complete, starts after owner1's expiration
+    List<AuditRecord> records2 = new ArrayList<>();
+    records2.add(new AuditRecord("owner2", baseTime + 300, baseTime + 400, 
"START", baseTime + 60000));
+    records2.add(new AuditRecord("owner2", baseTime + 300, baseTime + 500, 
"END", baseTime + 60000));
+    scenarios.add(new TransactionScenario((baseTime + 300) + "_owner2.jsonl", 
records2));
+    
+    createAuditFiles(scenarios);
+    
+    Object result = shell.evaluate(() -> "locks audit validate");
+    
+    assertAll("Unclosed transactions show warning",
+        () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+        () -> assertNotNull(result.toString()),
+        () -> assertTrue(result.toString().contains("Validation Result: 
WARNING")),
+        () -> assertTrue(result.toString().contains("Transactions Validated: 
2")),
+        () -> assertTrue(result.toString().contains("Issues Found: 1")),

Review Comment:
   Are unclosed transactions expected? If so, would reporting `WARNING` for them be noisy?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Spark SQL procedure for validating audit lock files for Hudi tables.
+ *
+ * This procedure validates the integrity and consistency of audit lock files
+ * generated by the storage lock audit service. It checks for various issues
+ * including file format, lock state transitions, and orphaned locks.
+ *
+ * Usage:
+ * {{{
+ * CALL validate_audit_lock(table => 'my_table')
+ * CALL validate_audit_lock(path => '/path/to/table')
+ * }}}
+ *
+ * The procedure reads audit files from:
+ * `{table_path}/.hoodie/.locks/audit/`
+ *
+ * @author Apache Hudi
+ * @since 1.0.0
+ */
+class ValidateAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("table", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("validation_result", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("transactions_validated", DataTypes.IntegerType, nullable = 
false, Metadata.empty),
+    StructField("issues_found", DataTypes.IntegerType, nullable = false, 
Metadata.empty),
+    StructField("details", DataTypes.StringType, nullable = false, 
Metadata.empty)
+  ))
+
+  private val OBJECT_MAPPER = new ObjectMapper()
+
+  /**
+   * Represents a transaction window with start time, end time, and metadata.
+   */
+  case class TransactionWindow(
+    ownerId: String,
+    transactionStartTime: Long,
+    startTimestamp: Long,
+    endTimestamp: Option[Long],
+    lastExpirationTime: Option[Long],
+    filename: String
+  ) {
+    def effectiveEndTime: Long = 
endTimestamp.orElse(lastExpirationTime).getOrElse(startTimestamp)
+  }
+
+  /**
+   * Returns the procedure parameters definition.
+   *
+   * @return Array of parameters: table (optional String) and path (optional 
String)
+   */
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the output schema for the procedure result.
+   *
+   * @return StructType containing table, validation_result, issues_found, and 
details columns
+   */
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Executes the audit lock validation procedure.
+   *
+   * @param args Procedure arguments containing table name or path
+   * @return Sequence containing a single Row with validation results
+   * @throws IllegalArgumentException if neither table nor path is provided, 
or both are provided
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+
+    // Get the base path using BaseProcedure helper (handles table/path 
validation)
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = createMetaClient(jsc, basePath)
+
+    // Use table name if provided, otherwise extract from path
+    val displayName = 
tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])
+
+    try {
+      val auditFolderPath = new 
StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
+      val storage = metaClient.getStorage
+
+      // Check if audit folder exists
+      if (!storage.exists(auditFolderPath)) {
+        Seq(Row(displayName, "PASSED", 0, 0, "No audit folder found - nothing 
to validate"))
+      } else {
+
+        // Get all audit files
+        val allFiles = storage.listDirectEntries(auditFolderPath).asScala
+        val auditFiles = allFiles.filter(pathInfo => pathInfo.isFile && 
pathInfo.getPath.getName.endsWith(".jsonl"))
+
+        if (auditFiles.isEmpty) {
+          Seq(Row(displayName, "PASSED", 0, 0, "No audit files found - nothing 
to validate"))
+        } else {
+
+          // Parse all audit files into transaction windows
+          val windows = auditFiles.flatMap(pathInfo => 
parseAuditFile(pathInfo, storage)).toSeq
+
+          if (windows.isEmpty) {
+            Seq(Row(displayName, "FAILED", 0, auditFiles.size, "Failed to 
parse any audit files"))
+          } else {
+
+            // Validate transactions
+            val validationResults = validateTransactionWindows(windows)
+
+            // Generate result
+            val (result, issuesFound, details) = 
formatValidationResults(validationResults)
+
+            Seq(Row(displayName, result, windows.size, issuesFound, details))
+          }
+        }
+      }
+    } catch {
+      case e: Exception =>
+        val errorMessage = s"Validation failed: ${e.getMessage}"
+        Seq(Row(displayName, "ERROR", 0, -1, errorMessage))
+    }
+  }
+
+  /**
+   * Parses an audit file and extracts transaction window information.
+   */
+  private def parseAuditFile(pathInfo: StoragePathInfo, storage: 
org.apache.hudi.storage.HoodieStorage): Option[TransactionWindow] = {

Review Comment:
   Similarly here: can the audit-file parsing logic be reused instead of duplicated?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Spark SQL procedure for validating audit lock files for Hudi tables.
+ *
+ * This procedure validates the integrity and consistency of audit lock files
+ * generated by the storage lock audit service. It checks for various issues
+ * including file format, lock state transitions, and orphaned locks.
+ *
+ * Usage:
+ * {{{
+ * CALL validate_audit_lock(table => 'my_table')
+ * CALL validate_audit_lock(path => '/path/to/table')
+ * }}}
+ *
+ * The procedure reads audit files from:
+ * `{table_path}/.hoodie/.locks/audit/`
+ *
+ * @author Apache Hudi
+ * @since 1.0.0
+ */
+class ValidateAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("table", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("validation_result", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("transactions_validated", DataTypes.IntegerType, nullable = 
false, Metadata.empty),
+    StructField("issues_found", DataTypes.IntegerType, nullable = false, 
Metadata.empty),
+    StructField("details", DataTypes.StringType, nullable = false, 
Metadata.empty)
+  ))
+
+  private val OBJECT_MAPPER = new ObjectMapper()
+
+  /**
+   * Represents a transaction window with start time, end time, and metadata.
+   */
+  case class TransactionWindow(
+    ownerId: String,
+    transactionStartTime: Long,
+    startTimestamp: Long,
+    endTimestamp: Option[Long],
+    lastExpirationTime: Option[Long],
+    filename: String
+  ) {
+    def effectiveEndTime: Long = 
endTimestamp.orElse(lastExpirationTime).getOrElse(startTimestamp)
+  }
+
+  /**
+   * Returns the procedure parameters definition.
+   *
+   * @return Array of parameters: table (optional String) and path (optional 
String)
+   */
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the output schema for the procedure result.
+   *
+   * @return StructType containing table, validation_result, issues_found, and 
details columns
+   */
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Executes the audit lock validation procedure.
+   *
+   * @param args Procedure arguments containing table name or path
+   * @return Sequence containing a single Row with validation results
+   * @throws IllegalArgumentException if neither table nor path is provided, 
or both are provided
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+
+    // Get the base path using BaseProcedure helper (handles table/path 
validation)
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = createMetaClient(jsc, basePath)
+
+    // Use table name if provided, otherwise extract from path
+    val displayName = 
tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])
+
+    try {
+      val auditFolderPath = new 
StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
+      val storage = metaClient.getStorage
+
+      // Check if audit folder exists
+      if (!storage.exists(auditFolderPath)) {

Review Comment:
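   Similarly, avoid the separate `storage.exists` check here. Listing the directory directly and handling the missing-folder case from that call saves an extra storage round trip.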



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Spark SQL procedure for validating audit lock files for Hudi tables.
+ *
+ * This procedure validates the integrity and consistency of audit lock files
+ * generated by the storage lock audit service. It checks for various issues
+ * including file format, lock state transitions, and orphaned locks.
+ *
+ * Usage:
+ * {{{
+ * CALL validate_audit_lock(table => 'my_table')
+ * CALL validate_audit_lock(path => '/path/to/table')
+ * }}}
+ *
+ * The procedure reads audit files from:
+ * `{table_path}/.hoodie/.locks/audit/`
+ *
+ * @author Apache Hudi
+ * @since 1.0.0
+ */
+class ValidateAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("table", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("validation_result", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("transactions_validated", DataTypes.IntegerType, nullable = 
false, Metadata.empty),
+    StructField("issues_found", DataTypes.IntegerType, nullable = false, 
Metadata.empty),
+    StructField("details", DataTypes.StringType, nullable = false, 
Metadata.empty)
+  ))
+
+  private val OBJECT_MAPPER = new ObjectMapper()
+
+  /**
+   * Represents a transaction window with start time, end time, and metadata.
+   */
+  case class TransactionWindow(
+    ownerId: String,
+    transactionStartTime: Long,
+    startTimestamp: Long,
+    endTimestamp: Option[Long],
+    lastExpirationTime: Option[Long],
+    filename: String
+  ) {
+    def effectiveEndTime: Long = 
endTimestamp.orElse(lastExpirationTime).getOrElse(startTimestamp)
+  }
+
+  /**
+   * Returns the procedure parameters definition.
+   *
+   * @return Array of parameters: table (optional String) and path (optional 
String)
+   */
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the output schema for the procedure result.
+   *
+   * @return StructType containing table, validation_result, issues_found, and 
details columns
+   */
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Executes the audit lock validation procedure.
+   *
+   * @param args Procedure arguments containing table name or path
+   * @return Sequence containing a single Row with validation results
+   * @throws IllegalArgumentException if neither table nor path is provided, 
or both are provided
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+
+    // Get the base path using BaseProcedure helper (handles table/path 
validation)
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = createMetaClient(jsc, basePath)
+
+    // Use table name if provided, otherwise extract from path
+    val displayName = 
tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])
+
+    try {
+      val auditFolderPath = new 
StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
+      val storage = metaClient.getStorage
+
+      // Check if audit folder exists
+      if (!storage.exists(auditFolderPath)) {
+        Seq(Row(displayName, "PASSED", 0, 0, "No audit folder found - nothing 
to validate"))
+      } else {
+
+        // Get all audit files
+        val allFiles = storage.listDirectEntries(auditFolderPath).asScala
+        val auditFiles = allFiles.filter(pathInfo => pathInfo.isFile && 
pathInfo.getPath.getName.endsWith(".jsonl"))
+
+        if (auditFiles.isEmpty) {
+          Seq(Row(displayName, "PASSED", 0, 0, "No audit files found - nothing 
to validate"))
+        } else {
+
+          // Parse all audit files into transaction windows
+          val windows = auditFiles.flatMap(pathInfo => 
parseAuditFile(pathInfo, storage)).toSeq
+
+          if (windows.isEmpty) {
+            Seq(Row(displayName, "FAILED", 0, auditFiles.size, "Failed to 
parse any audit files"))
+          } else {
+
+            // Validate transactions
+            val validationResults = validateTransactionWindows(windows)
+
+            // Generate result
+            val (result, issuesFound, details) = 
formatValidationResults(validationResults)
+
+            Seq(Row(displayName, result, windows.size, issuesFound, details))

Review Comment:
   Is it possible to reuse the common logic from `LockAuditingCommand` here?



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Spark SQL procedure for validating audit lock files for Hudi tables.
+ *
+ * This procedure validates the integrity and consistency of audit lock files
+ * generated by the storage lock audit service. It checks for various issues
+ * including file format, lock state transitions, and orphaned locks.
+ *
+ * Usage:
+ * {{{
+ * CALL validate_audit_lock(table => 'my_table')
+ * CALL validate_audit_lock(path => '/path/to/table')
+ * }}}
+ *
+ * The procedure reads audit files from:
+ * `{table_path}/.hoodie/.locks/audit/`
+ *
+ * @author Apache Hudi
+ * @since 1.0.0
+ */
+class ValidateAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("table", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("validation_result", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("transactions_validated", DataTypes.IntegerType, nullable = 
false, Metadata.empty),
+    StructField("issues_found", DataTypes.IntegerType, nullable = false, 
Metadata.empty),
+    StructField("details", DataTypes.StringType, nullable = false, 
Metadata.empty)
+  ))
+
+  private val OBJECT_MAPPER = new ObjectMapper()
+
+  /**
+   * Represents a transaction window with start time, end time, and metadata.
+   */
+  case class TransactionWindow(
+    ownerId: String,
+    transactionStartTime: Long,
+    startTimestamp: Long,
+    endTimestamp: Option[Long],
+    lastExpirationTime: Option[Long],
+    filename: String
+  ) {
+    def effectiveEndTime: Long = 
endTimestamp.orElse(lastExpirationTime).getOrElse(startTimestamp)
+  }
+
+  /**
+   * Returns the procedure parameters definition.
+   *
+   * @return Array of parameters: table (optional String) and path (optional 
String)
+   */
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the output schema for the procedure result.
+   *
+   * @return StructType containing table, validation_result, issues_found, and 
details columns
+   */
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Executes the audit lock validation procedure.
+   *
+   * @param args Procedure arguments containing table name or path
+   * @return Sequence containing a single Row with validation results
+   * @throws IllegalArgumentException if neither table nor path is provided, 
or both are provided
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+
+    // Get the base path using BaseProcedure helper (handles table/path 
validation)
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = createMetaClient(jsc, basePath)
+
+    // Use table name if provided, otherwise extract from path
+    val displayName = 
tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])
+
+    try {
+      val auditFolderPath = new 
StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
+      val storage = metaClient.getStorage
+
+      // Check if audit folder exists
+      if (!storage.exists(auditFolderPath)) {
+        Seq(Row(displayName, "PASSED", 0, 0, "No audit folder found - nothing 
to validate"))
+      } else {
+
+        // Get all audit files
+        val allFiles = storage.listDirectEntries(auditFolderPath).asScala
+        val auditFiles = allFiles.filter(pathInfo => pathInfo.isFile && 
pathInfo.getPath.getName.endsWith(".jsonl"))
+
+        if (auditFiles.isEmpty) {
+          Seq(Row(displayName, "PASSED", 0, 0, "No audit files found - nothing 
to validate"))
+        } else {
+
+          // Parse all audit files into transaction windows
+          val windows = auditFiles.flatMap(pathInfo => 
parseAuditFile(pathInfo, storage)).toSeq
+
+          if (windows.isEmpty) {
+            Seq(Row(displayName, "FAILED", 0, auditFiles.size, "Failed to 
parse any audit files"))
+          } else {
+
+            // Validate transactions
+            val validationResults = validateTransactionWindows(windows)
+
+            // Generate result
+            val (result, issuesFound, details) = 
formatValidationResults(validationResults)
+
+            Seq(Row(displayName, result, windows.size, issuesFound, details))
+          }
+        }
+      }
+    } catch {
+      case e: Exception =>
+        val errorMessage = s"Validation failed: ${e.getMessage}"
+        Seq(Row(displayName, "ERROR", 0, -1, errorMessage))
+    }
+  }
+
+  /**
+   * Parses an audit file and extracts transaction window information.
+   */
+  private def parseAuditFile(pathInfo: StoragePathInfo, storage: 
org.apache.hudi.storage.HoodieStorage): Option[TransactionWindow] = {
+    val filename = pathInfo.getPath.getName
+
+    Try {
+      // Read file content using Hudi storage API
+      val inputStream = storage.open(pathInfo.getPath)
+      val content = try {
+        scala.io.Source.fromInputStream(inputStream).mkString
+      } finally {
+        inputStream.close()
+      }
+
+      // Parse JSONL content
+      val lines = content.split('\n').filter(_.trim.nonEmpty)
+      val jsonObjects = lines.map(OBJECT_MAPPER.readTree)
+
+      if (jsonObjects.isEmpty) {
+        None
+      } else {
+
+        // Extract transaction metadata
+        val firstObject = jsonObjects.head
+        val ownerId = firstObject.get("ownerId").asText()
+        val transactionStartTime = 
firstObject.get("transactionStartTime").asLong()
+
+      // Find first START timestamp
+      val startRecords = jsonObjects.filter(_.get("state").asText() == "START")
+      val startTimestamp = if (startRecords.nonEmpty) {
+        startRecords.head.get("timestamp").asLong()
+        } else {
+          transactionStartTime // fallback to transaction start time
+        }
+
+        // Find last END timestamp
+        val endRecords = jsonObjects.filter(_.get("state").asText() == "END")
+        val endTimestamp = if (endRecords.nonEmpty) {
+          Some(endRecords.last.get("timestamp").asLong())
+        } else {
+          None
+        }
+
+        // Find last expiration time as fallback
+        val lastExpirationTime = if (jsonObjects.nonEmpty) {
+          val lastObject = jsonObjects.last
+          if (lastObject.has("lockExpiration")) {
+            Some(lastObject.get("lockExpiration").asLong())
+          } else {
+            None
+          }
+        } else {
+          None
+        }
+
+        Some(TransactionWindow(
+        ownerId = ownerId,
+        transactionStartTime = transactionStartTime,
+        startTimestamp = startTimestamp,
+        endTimestamp = endTimestamp,
+        lastExpirationTime = lastExpirationTime,
+          filename = filename
+        ))
+      }
+    } match {
+      case Success(window) => window
+      case Failure(_) => None // Skip corrupted files
+    }
+  }
+
+  /**
+   * Validates transaction windows for overlaps and proper closure.
+   */
+  private def validateTransactionWindows(windows: Seq[TransactionWindow]): 
ValidationResults = {

Review Comment:
   Similarly here: can part of the transaction-window validation logic be shared between `LockAuditingCommand` and
`ValidateAuditLockProcedure`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to