alexr17 commented on code in PR #13868: URL: https://github.com/apache/hudi/pull/13868#discussion_r2334936991
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock.audit; + +import org.apache.hudi.client.transaction.lock.StorageLockClient; +import org.apache.hudi.storage.StoragePath; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Storage-based audit service implementation for lock provider operations. + * Writes audit records to a single JSONL file per transaction to track lock lifecycle events. 
+ */ +public class StorageLockProviderAuditService implements AuditService { + + private static final Logger LOG = LoggerFactory.getLogger(StorageLockProviderAuditService.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + // Audit configuration constants + public static final String AUDIT_FOLDER_NAME = "audit"; + public static final String AUDIT_CONFIG_FILE_NAME = "audit_enabled.json"; + public static final String STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD = "STORAGE_LOCK_AUDIT_SERVICE_ENABLED"; + + /** + * Constructs the full path to the audit configuration file for a given table. + * + * @param basePath The base path of the Hudi table + * @return The full path to the audit_enabled.json configuration file + */ + public static String getAuditConfigPath(String basePath) { + String lockFolderPath = StorageLockClient.getLockFolderPath(basePath); + return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR, AUDIT_CONFIG_FILE_NAME); + } + + /** + * Constructs the full path to the audit folder for a given table. + * + * @param basePath The base path of the Hudi table + * @return The full path to the audit folder where audit files are stored + */ + public static String getAuditFolderPath(String basePath) { + String lockFolderPath = StorageLockClient.getLockFolderPath(basePath); + return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR, AUDIT_FOLDER_NAME); + } + + private final String ownerId; + private final long transactionStartTime; + private final String auditFilePath; + private final StorageLockClient storageLockClient; + private final Function<Long, Long> lockExpirationFunction; + private final Supplier<Boolean> lockHeldSupplier; + private final StringBuilder auditBuffer; + + /** + * Constructor for StorageLockProviderAuditService. 
+ * + * @param basePath The base path where audit files will be written + * @param ownerId The full owner ID + * @param transactionStartTime The timestamp when the transaction started (lock acquired) + * @param storageLockClient The storage client for writing audit files + * @param lockExpirationFunction Function that takes a timestamp and returns the lock expiration time + * @param lockHeldSupplier Supplier that provides whether the lock is currently held + */ + public StorageLockProviderAuditService( + String basePath, + String ownerId, + long transactionStartTime, + StorageLockClient storageLockClient, + Function<Long, Long> lockExpirationFunction, + Supplier<Boolean> lockHeldSupplier) { + this.ownerId = ownerId; + this.transactionStartTime = transactionStartTime; + this.storageLockClient = storageLockClient; + this.lockExpirationFunction = lockExpirationFunction; + this.lockHeldSupplier = lockHeldSupplier; + this.auditBuffer = new StringBuilder(); + + // Generate audit file path: <txn-start>_<full-owner-id>.jsonl + String filename = String.format("%d_%s.jsonl", transactionStartTime, ownerId); + this.auditFilePath = String.format("%s%s%s", + getAuditFolderPath(basePath), + StoragePath.SEPARATOR, + filename); + + LOG.debug("Initialized audit service for transaction starting at {} with file: {}", + transactionStartTime, auditFilePath); + } + + @Override + public synchronized void recordOperation(AuditOperationState state, long timestamp) throws Exception { + // Create audit record + Map<String, Object> auditRecord = new HashMap<>(); + auditRecord.put("ownerId", ownerId); + auditRecord.put("transactionStartTime", transactionStartTime); + auditRecord.put("timestamp", timestamp); + auditRecord.put("state", state.name()); + auditRecord.put("lockExpiration", lockExpirationFunction.apply(timestamp)); + auditRecord.put("lockHeld", lockHeldSupplier.get()); + + // Convert to JSON and append newline for JSONL format + String jsonLine = 
OBJECT_MAPPER.writeValueAsString(auditRecord) + "\n"; + + // Append to buffer + auditBuffer.append(jsonLine); + + // Write the accumulated audit records to file + writeAuditFile(); + + LOG.debug("Recorded audit operation: state={}, timestamp={}, file={}", + state, timestamp, auditFilePath); + } + Review Comment: kept simple for now IMO -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
