This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 77ef958515e4 [HUDI-9782] Add audit logging scaffolding to storage based LP (#13836)
77ef958515e4 is described below
commit 77ef958515e4a29be07183c35607a7f2e5a541be
Author: Alex R <[email protected]>
AuthorDate: Mon Sep 8 20:11:26 2025 -0700
[HUDI-9782] Add audit logging scaffolding to storage based LP (#13836)
---
.../aws/transaction/lock/S3StorageLockClient.java | 124 +++++++---
.../transaction/lock/TestS3StorageLockClient.java | 2 +-
.../TestS3StorageLockClientAuditOperations.java | 272 ++++++++++++++++++++
.../transaction/lock/StorageBasedLockProvider.java | 123 +++++++---
.../client/transaction/lock/StorageLockClient.java | 75 +++++-
.../lock/audit/AuditOperationState.java | 39 +++
.../transaction/lock/audit/AuditService.java | 41 ++++
.../lock/audit/AuditServiceFactory.java | 113 +++++++++
.../lock/TestStorageBasedLockProvider.java | 151 ++++++++++--
.../lock/audit/TestAuditServiceFactory.java | 163 ++++++++++++
.../gcp/transaction/lock/GCSStorageLockClient.java | 111 ++++++---
.../TestGCSStorageLockClientAuditOperations.java | 273 +++++++++++++++++++++
12 files changed, 1364 insertions(+), 123 deletions(-)
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
index 3fda9ebfe47c..67fef4e75785 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/S3StorageLockClient.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieLockException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +45,7 @@ import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
import software.amazon.awssdk.services.s3.model.GetBucketLocationResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -54,8 +54,6 @@ import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Properties;
@@ -94,34 +92,22 @@ public class S3StorageLockClient implements StorageLockClient {
@VisibleForTesting
S3StorageLockClient(String ownerId, String lockFileUri, Properties props,
Functions.Function2<String, Properties, S3Client> s3ClientSupplier, Logger
logger) {
- try {
- // This logic can likely be extended to other lock client
implementations.
- // Consider creating base class with utilities, incl error handling.
- URI uri = new URI(lockFileUri);
- this.bucketName = uri.getHost();
- this.lockFilePath = uri.getPath().replaceFirst("/", "");
- this.s3Client = s3ClientSupplier.apply(bucketName, props);
-
- if (StringUtils.isNullOrEmpty(this.bucketName)) {
- throw new IllegalArgumentException("LockFileUri does not contain a
valid bucket name.");
- }
- if (StringUtils.isNullOrEmpty(this.lockFilePath)) {
- throw new IllegalArgumentException("LockFileUri does not contain a
valid lock file path.");
- }
- this.ownerId = ownerId;
- this.logger = logger;
- } catch (URISyntaxException e) {
- throw new HoodieLockException(e);
- }
+ Pair<String, String> bucketAndPath =
StorageLockClient.parseBucketAndPath(lockFileUri);
+ this.bucketName = bucketAndPath.getLeft();
+ this.lockFilePath = bucketAndPath.getRight();
+
+ this.s3Client = s3ClientSupplier.apply(bucketName, props);
+ this.ownerId = ownerId;
+ this.logger = logger;
}
@Override
public Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile() {
try (ResponseInputStream<GetObjectResponse> in = s3Client.getObject(
- GetObjectRequest.builder()
- .bucket(bucketName)
- .key(lockFilePath)
- .build())) {
+ GetObjectRequest.builder()
+ .bucket(bucketName)
+ .key(lockFilePath)
+ .build())) {
String eTag = in.response().eTag();
return Pair.of(LockGetResult.SUCCESS,
Option.of(StorageLockFile.createFromStream(in, eTag)));
} catch (S3Exception e) {
@@ -241,16 +227,16 @@ public class S3StorageLockClient implements StorageLockClient {
S3Client s3Client = createS3Client(region, s3CallTimeoutSecs, props);
if (requiredFallbackRegion) {
GetBucketLocationResponse bucketLocationResponse =
s3Client.getBucketLocation(
- GetBucketLocationRequest.builder().bucket(bucketName).build());
+ GetBucketLocationRequest.builder().bucket(bucketName).build());
// This is null when the region is US_EAST_1, so we do not need to
worry about duplicate logic.
String regionString =
bucketLocationResponse.locationConstraintAsString();
if (!StringUtils.isNullOrEmpty(regionString)) {
// Close existing client and create another.
s3Client.close();
return createS3Client(
- Region.of(regionString),
- s3CallTimeoutSecs,
- props);
+ Region.of(regionString),
+ s3CallTimeoutSecs,
+ props);
}
}
@@ -261,10 +247,80 @@ public class S3StorageLockClient implements StorageLockClient {
private static S3Client createS3Client(Region region, long timeoutSecs,
Properties props) {
// Set the timeout, credentials, and region
return S3Client.builder()
- .overrideConfiguration(
- b -> b.apiCallTimeout(Duration.ofSeconds(timeoutSecs)))
-
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
- .region(region).build();
+ .overrideConfiguration(
+ b -> b.apiCallTimeout(Duration.ofSeconds(timeoutSecs)))
+
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(props))
+ .region(region).build();
+ }
+
+ @Override
+ public Option<String> readObject(String filePath, boolean checkExistsFirst) {
+ try {
+ // Parse the file path to get bucket and key
+ Pair<String, String> bucketAndKey =
StorageLockClient.parseBucketAndPath(filePath);
+ String bucket = bucketAndKey.getLeft();
+ String key = bucketAndKey.getRight();
+
+ if (checkExistsFirst) {
+ // First check if the file exists (lightweight HEAD request)
+ try {
+ s3Client.headObject(HeadObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .build());
+ } catch (S3Exception e) {
+ if (e.statusCode() == NOT_FOUND_ERROR_CODE) {
+ // File doesn't exist - this is the common case for optional
configs
+ logger.debug("JSON config file not found: {}", filePath);
+ return Option.empty();
+ }
+ throw e; // Re-throw other errors
+ }
+ }
+
+ // Read the file (either after existence check or directly)
+ String content = s3Client.getObjectAsBytes(
+ GetObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .build()).asUtf8String();
+
+ return Option.of(content);
+ } catch (S3Exception e) {
+ if (e.statusCode() == NOT_FOUND_ERROR_CODE) {
+ logger.debug("JSON config file not found: {}", filePath);
+ return Option.empty();
+ }
+ logger.warn("Error reading JSON config file: {}", filePath, e);
+ return Option.empty();
+ } catch (Exception e) {
+ logger.warn("Error reading JSON config file: {}", filePath, e);
+ return Option.empty();
+ }
+ }
+
+ @Override
+ public boolean writeObject(String filePath, String content) {
+ try {
+ // Parse the file path to get bucket and key
+ Pair<String, String> bucketAndPath =
StorageLockClient.parseBucketAndPath(filePath);
+ String bucket = bucketAndPath.getLeft();
+ String key = bucketAndPath.getRight();
+
+ // Write the content to S3
+ s3Client.putObject(
+ PutObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .build(),
+ RequestBody.fromString(content));
+
+ logger.debug("Successfully wrote object to: {}", filePath);
+ return true;
+ } catch (Exception e) {
+ logger.warn("Error writing object to: {}", filePath, e);
+ return false;
+ }
}
@Override
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
index b8b3f077ac66..ce73a2556f61 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClient.java
@@ -142,7 +142,7 @@ class TestS3StorageLockClient {
(a,b) -> mockS3Client,
mockLogger
));
- assertTrue(ex.getMessage().contains("lock file path"));
+ assertTrue(ex.getMessage().contains("path"));
}
@Test
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClientAuditOperations.java b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClientAuditOperations.java
new file mode 100644
index 000000000000..b92433806644
--- /dev/null
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/TestS3StorageLockClientAuditOperations.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.lock;
+
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for S3StorageLockClient audit operations (readObject and writeObject
methods)
+ */
+public class TestS3StorageLockClientAuditOperations {
+
+ private S3Client mockS3Client;
+ private Logger mockLogger;
+ private S3StorageLockClient lockClient;
+
+ @BeforeEach
+ void setUp() {
+ mockS3Client = mock(S3Client.class);
+ mockLogger = mock(Logger.class);
+ String ownerId = "test-owner";
+ String lockFileUri =
"s3://test-bucket/table/.hoodie/.locks/table_lock.json";
+ lockClient = new S3StorageLockClient(
+ ownerId,
+ lockFileUri,
+ new Properties(),
+ (bucket, props) -> mockS3Client,
+ mockLogger);
+ }
+
+ @Test
+ void testReadConfigWithCheckExistsFirstFileNotFound() {
+ String configPath =
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+ // HEAD request returns 404
+ S3Exception notFoundException = (S3Exception) S3Exception.builder()
+ .statusCode(404)
+ .message("Not Found")
+ .build();
+ when(mockS3Client.headObject(any(HeadObjectRequest.class)))
+ .thenThrow(notFoundException);
+
+ Option<String> result = lockClient.readObject(configPath, true);
+
+ assertTrue(result.isEmpty());
+ // Should only call HEAD, not GET
+ verify(mockS3Client, times(1)).headObject(any(HeadObjectRequest.class));
+ verify(mockS3Client,
never()).getObjectAsBytes(any(GetObjectRequest.class));
+ }
+
+ @Test
+ void testReadConfigWithCheckExistsFirstFileExists() {
+ String configPath =
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+ String expectedContent = "{\"STORAGE_LP_AUDIT_SERVICE_ENABLED\": true}";
+
+ // HEAD request succeeds
+ HeadObjectResponse headResponse = HeadObjectResponse.builder().build();
+ when(mockS3Client.headObject(any(HeadObjectRequest.class)))
+ .thenReturn(headResponse);
+
+ // GET request returns content
+ ResponseBytes<GetObjectResponse> responseBytes =
ResponseBytes.fromByteArray(
+ GetObjectResponse.builder().build(),
+ expectedContent.getBytes(StandardCharsets.UTF_8));
+ when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
+ .thenReturn(responseBytes);
+
+ Option<String> result = lockClient.readObject(configPath, true);
+
+ assertTrue(result.isPresent());
+ assertEquals(expectedContent, result.get());
+ // Should call both HEAD and GET
+ verify(mockS3Client, times(1)).headObject(any(HeadObjectRequest.class));
+ verify(mockS3Client,
times(1)).getObjectAsBytes(any(GetObjectRequest.class));
+ }
+
+ @Test
+ void testReadConfigWithoutCheckExistsFirstFileNotFound() {
+ String configPath =
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+ // Direct GET request returns 404
+ S3Exception notFoundException = (S3Exception) S3Exception.builder()
+ .statusCode(404)
+ .message("Not Found")
+ .build();
+ when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
+ .thenThrow(notFoundException);
+
+ Option<String> result = lockClient.readObject(configPath, false);
+
+ assertTrue(result.isEmpty());
+ // Should not call HEAD, only GET
+ verify(mockS3Client, never()).headObject(any(HeadObjectRequest.class));
+ verify(mockS3Client,
times(1)).getObjectAsBytes(any(GetObjectRequest.class));
+ }
+
+ @Test
+ void testReadConfigWithoutCheckExistsFirstFileExists() {
+ String configPath =
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+ String expectedContent = "{\"STORAGE_LP_AUDIT_SERVICE_ENABLED\": false}";
+
+ // Direct GET request returns content
+ ResponseBytes<GetObjectResponse> responseBytes =
ResponseBytes.fromByteArray(
+ GetObjectResponse.builder().build(),
+ expectedContent.getBytes(StandardCharsets.UTF_8));
+ when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
+ .thenReturn(responseBytes);
+
+ Option<String> result = lockClient.readObject(configPath, false);
+
+ assertTrue(result.isPresent());
+ assertEquals(expectedContent, result.get());
+ // Should not call HEAD, only GET
+ verify(mockS3Client, never()).headObject(any(HeadObjectRequest.class));
+ verify(mockS3Client,
times(1)).getObjectAsBytes(any(GetObjectRequest.class));
+ }
+
+ @Test
+ void testReadConfigWithCheckExistsFirstOtherS3Error() {
+ String configPath =
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+ // HEAD request returns non-404 error
+ S3Exception serverError = (S3Exception) S3Exception.builder()
+ .statusCode(500)
+ .message("Internal Server Error")
+ .build();
+ when(mockS3Client.headObject(any(HeadObjectRequest.class)))
+ .thenThrow(serverError);
+
+ Option<String> result = lockClient.readObject(configPath, true);
+
+ assertTrue(result.isEmpty());
+ verify(mockS3Client, times(1)).headObject(any(HeadObjectRequest.class));
+ verify(mockS3Client,
never()).getObjectAsBytes(any(GetObjectRequest.class));
+ }
+
+ @Test
+ void testReadConfigWithInvalidUri() {
+ String invalidPath = "not-a-valid-uri";
+
+ Option<String> result = lockClient.readObject(invalidPath, false);
+
+ assertTrue(result.isEmpty());
+ // Should not make any S3 calls due to URI parsing error
+ verify(mockS3Client, never()).headObject(any(HeadObjectRequest.class));
+ }
+
+ @Test
+ void testReadConfigWithRateLimitError() {
+ String configPath =
"s3://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+
+ // GET request returns rate limit error
+ S3Exception rateLimitException = (S3Exception) S3Exception.builder()
+ .statusCode(429)
+ .message("Too Many Requests")
+ .build();
+ when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
+ .thenThrow(rateLimitException);
+
+ Option<String> result = lockClient.readObject(configPath, false);
+
+ assertTrue(result.isEmpty());
+ verify(mockS3Client,
times(1)).getObjectAsBytes(any(GetObjectRequest.class));
+ }
+
+ // ================================
+ // writeObject() tests
+ // ================================
+
+ @Test
+ void testWriteObject_success() {
+ String filePath = "s3://test-bucket/audit/test-audit.jsonl";
+ String content = "{\"test\": \"data\"}\n";
+ PutObjectResponse putResp =
PutObjectResponse.builder().eTag("write-etag-123").build();
+ when(mockS3Client.putObject(any(PutObjectRequest.class),
any(RequestBody.class))).thenReturn(putResp);
+
+ boolean result = lockClient.writeObject(filePath, content);
+
+ assertTrue(result);
+ verify(mockS3Client, times(1)).putObject(
+
eq(PutObjectRequest.builder().bucket("test-bucket").key("audit/test-audit.jsonl").build()),
+ any(RequestBody.class)
+ );
+ verify(mockLogger).debug("Successfully wrote object to: {}", filePath);
+ }
+
+ @Test
+ void testWriteObject_s3Exception() {
+ String filePath = "s3://test-bucket/audit/test-audit.jsonl";
+ String content = "{\"test\": \"data\"}\n";
+ AwsServiceException s3Exception =
S3Exception.builder().statusCode(500).build();
+ when(mockS3Client.putObject(any(PutObjectRequest.class),
any(RequestBody.class))).thenThrow(s3Exception);
+
+ boolean result = lockClient.writeObject(filePath, content);
+
+ assertFalse(result);
+ verify(mockLogger).warn(contains("Error writing object to"), eq(filePath),
eq(s3Exception));
+ }
+
+ @Test
+ void testWriteObject_invalidPath() {
+ String invalidPath = "invalid-path";
+ String content = "{\"test\": \"data\"}\n";
+
+ boolean result = lockClient.writeObject(invalidPath, content);
+
+ assertFalse(result);
+ verify(mockLogger).warn(contains("Error writing object to"),
eq(invalidPath), any(Exception.class));
+ }
+
+ @Test
+ void testWriteObject_emptyContent() {
+ String filePath = "s3://test-bucket/audit/empty-content.jsonl";
+ String content = "";
+ PutObjectResponse putResp =
PutObjectResponse.builder().eTag("empty-etag-456").build();
+ when(mockS3Client.putObject(any(PutObjectRequest.class),
any(RequestBody.class))).thenReturn(putResp);
+
+ boolean result = lockClient.writeObject(filePath, content);
+
+ assertTrue(result);
+ verify(mockS3Client, times(1)).putObject(
+
eq(PutObjectRequest.builder().bucket("test-bucket").key("audit/empty-content.jsonl").build()),
+ any(RequestBody.class)
+ );
+ verify(mockLogger).debug("Successfully wrote object to: {}", filePath);
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
index 8c8d0f7d3547..31757906e595 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java
@@ -18,6 +18,9 @@
package org.apache.hudi.client.transaction.lock;
+import org.apache.hudi.client.transaction.lock.audit.AuditOperationState;
+import org.apache.hudi.client.transaction.lock.audit.AuditService;
+import org.apache.hudi.client.transaction.lock.audit.AuditServiceFactory;
import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
import org.apache.hudi.client.transaction.lock.models.HeartbeatManager;
import org.apache.hudi.client.transaction.lock.models.LockGetResult;
@@ -62,7 +65,6 @@ import static org.apache.hudi.common.lock.LockState.FAILED_TO_ACQUIRE;
import static org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE;
import static org.apache.hudi.common.lock.LockState.RELEASED;
import static org.apache.hudi.common.lock.LockState.RELEASING;
-import static
org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME;
/**
* A distributed filesystem storage based lock provider. This {@link
LockProvider} implementation
@@ -97,6 +99,8 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
private final HeartbeatManager heartbeatManager;
private final transient Thread shutdownThread;
private final Option<HoodieLockMetrics> hoodieLockMetrics;
+ private Option<AuditService> auditService;
+ private final String basePath;
@GuardedBy("this")
private StorageLockFile currentLockObj = null;
@@ -114,25 +118,25 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
/**
* Default constructor for StorageBasedLockProvider, required by LockManager
* to instantiate it using reflection.
- *
+ *
* @param lockConfiguration The lock configuration, should be transformable
into
* StorageBasedLockConfig
* @param conf Storage config, ignored.
*/
public StorageBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf) {
this(
- UUID.randomUUID().toString(),
- lockConfiguration.getConfig(),
- LockProviderHeartbeatManager::new,
- getStorageLockClientClassName(),
- LOGGER,
+ UUID.randomUUID().toString(),
+ lockConfiguration.getConfig(),
+ LockProviderHeartbeatManager::new,
+ getStorageLockClientClassName(),
+ LOGGER,
null);
}
/**
* Constructor for StorageBasedLockProvider with HoodieLockMetrics support.
* This constructor allows lock providers to access metrics for fine-grained
metrics collection.
- *
+ *
* @param lockConfiguration The lock configuration, should be transformable
into
* StorageBasedLockConfig
* @param conf Storage config, ignored.
@@ -140,21 +144,21 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
*/
public StorageBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf, final HoodieLockMetrics metrics) {
this(
- UUID.randomUUID().toString(),
- lockConfiguration.getConfig(),
- LockProviderHeartbeatManager::new,
- getStorageLockClientClassName(),
- LOGGER,
- metrics);
+ UUID.randomUUID().toString(),
+ lockConfiguration.getConfig(),
+ LockProviderHeartbeatManager::new,
+ getStorageLockClientClassName(),
+ LOGGER,
+ metrics);
}
private static Functions.Function3<String, String, TypedProperties,
StorageLockClient> getStorageLockClientClassName() {
return (ownerId, lockFilePath, lockConfig) -> {
try {
return (StorageLockClient) ReflectionUtils.loadClass(
- getLockServiceClassName(new URI(lockFilePath).getScheme()),
- new Class<?>[]{String.class, String.class, Properties.class},
- new Object[]{ownerId, lockFilePath, lockConfig});
+ getLockServiceClassName(new URI(lockFilePath).getScheme()),
+ new Class<?>[] {String.class, String.class, Properties.class},
+ new Object[] {ownerId, lockFilePath, lockConfig});
} catch (Throwable e) {
throw new HoodieLockException("Failed to load and initialize
StorageLock", e);
}
@@ -181,18 +185,15 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
StorageBasedLockConfig config = new
StorageBasedLockConfig.Builder().fromProperties(properties).build();
long heartbeatPollSeconds = config.getHeartbeatPollSeconds();
this.lockValiditySecs = config.getValiditySeconds();
- this.lockFilePath = String.format(
- "%s%s%s%s%s",
- config.getHudiTableBasePath(),
- StoragePath.SEPARATOR,
- LOCKS_FOLDER_NAME,
- StoragePath.SEPARATOR,
- DEFAULT_TABLE_LOCK_FILE_NAME);
+ this.basePath = config.getHudiTableBasePath();
+ String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+ this.lockFilePath = String.format("%s%s%s", lockFolderPath,
StoragePath.SEPARATOR, DEFAULT_TABLE_LOCK_FILE_NAME);
this.heartbeatManager = heartbeatManagerLoader.apply(ownerId,
TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock);
this.storageLockClient = storageLockClientLoader.apply(ownerId,
lockFilePath, properties);
this.ownerId = ownerId;
this.logger = logger;
this.hoodieLockMetrics = Option.ofNullable(hoodieLockMetrics);
+ this.auditService = Option.empty(); // Will be created lazily on first
lock acquisition
shutdownThread = new Thread(() -> shutdown(true));
Runtime.getRuntime().addShutdownHook(shutdownThread);
logger.info("Instantiated new storage-based lock provider, owner: {},
lockfilePath: {}", ownerId, lockFilePath);
@@ -267,6 +268,18 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
} catch (Exception e) {
logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, e);
}
+ try {
+ this.auditService.ifPresent(auditService -> {
+ try {
+ auditService.close();
+ } catch (Exception e) {
+ logger.error("Owner {}: Audit service failed to close.", ownerId, e);
+ }
+ });
+ this.auditService = Option.empty();
+ } catch (Exception e) {
+ logger.error("Owner {}: Failed to close audit service.", ownerId, e);
+ }
this.isClosed = true;
}
@@ -290,7 +303,7 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
/**
* Attempts a single pass to acquire the lock (non-blocking).
- *
+ *
* @return true if lock acquired, false otherwise
*/
@Override
@@ -327,7 +340,9 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
}
// Try to acquire the lock
- StorageLockData newLockData = new StorageLockData(false,
getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId);
+ long acquisitionTimestamp = getCurrentEpochMs();
+ long lockExpirationMs = calculateLockExpiration(acquisitionTimestamp);
+ StorageLockData newLockData = new StorageLockData(false, lockExpirationMs,
ownerId);
Pair<LockUpsertResult, Option<StorageLockFile>> lockUpdateStatus =
this.storageLockClient.tryUpsertLockFile(
newLockData,
latestLock.getRight());
@@ -359,6 +374,15 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
}
logInfoLockState(ACQUIRED);
+
+ // Create audit service lazily on first successful lock acquisition if
auditing is enabled
+ if (auditService.isEmpty()) {
+ auditService = AuditServiceFactory.createLockProviderAuditService(
+ ownerId, basePath, storageLockClient, acquisitionTimestamp,
+ this::calculateLockExpiration, this::actuallyHoldsLock);
+ }
+
+ recordAuditOperation(AuditOperationState.START, acquisitionTimestamp);
return true;
}
@@ -371,7 +395,7 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
* valid only if it exists and has not expired according to its timestamp.
*
* @return {@code true} if this provider holds a valid lock, {@code false}
- * otherwise
+ * otherwise
*/
private boolean actuallyHoldsLock() {
return believesLockMightBeHeld() && isLockStillValid(getLock());
@@ -390,7 +414,7 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
* StorageBasedLockProvider#actuallyHoldsLock should be used.
*
* @return {@code true} if this provider has a non-null lock object,
- * {@code false} otherwise
+ * {@code false} otherwise
* @see StorageBasedLockProvider#actuallyHoldsLock()
*/
private boolean believesLockMightBeHeld() {
@@ -439,6 +463,7 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
/**
* Tries to expire the currently held lock.
+ *
* @param fromShutdownHook Whether we are attempting best effort quick
unlock from shutdown hook.
* @return True if we were successfully able to upload an expired lock.
*/
@@ -463,6 +488,7 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
return false;
case SUCCESS:
logInfoLockState(RELEASED);
+ recordAuditOperation(AuditOperationState.END,
System.currentTimeMillis());
setLock(null);
return true;
case ACQUIRED_BY_OTHERS:
@@ -481,6 +507,7 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
* Renews (heartbeats) the current lock if we are the holder, it forcefully
set
* the expiration flag
* to false and the lock expiration time to a later time in the future.
+ *
* @return True if we successfully renewed the lock, false if not.
*/
@VisibleForTesting
@@ -505,8 +532,10 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
// Action taken for corner case 2 is just a best effort mitigation. At
least it
// prevents further data corruption by
// letting someone else acquire the lock.
+ long acquisitionTimestamp = getCurrentEpochMs();
+ long lockExpirationMs = calculateLockExpiration(acquisitionTimestamp);
Pair<LockUpsertResult, Option<StorageLockFile>> currentLock =
this.storageLockClient.tryUpsertLockFile(
- new StorageLockData(false, getCurrentEpochMs() +
TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId),
+ new StorageLockData(false, lockExpirationMs, ownerId),
Option.of(getLock()));
switch (currentLock.getLeft()) {
case ACQUIRED_BY_OTHERS:
@@ -529,6 +558,7 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
(int) (oldExpirationMs - getCurrentEpochMs())));
logger.info("Owner {}: Lock renewal successful. The renewal
completes {} ms before expiration for lock {}.",
ownerId, oldExpirationMs - getCurrentEpochMs(), lockFilePath);
+ recordAuditOperation(AuditOperationState.RENEW,
acquisitionTimestamp);
// Let heartbeat continue to renew lock lease again later.
return true;
default:
@@ -556,11 +586,11 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
private String generateLockStateMessage(LockState state) {
String threadName = Thread.currentThread().getName();
return String.format(
- "Owner %s: Lock file path %s, Thread %s, Storage based lock state
%s",
- ownerId,
- lockFilePath,
- threadName,
- state.toString());
+ "Owner %s: Lock file path %s, Thread %s, Storage based lock state %s",
+ ownerId,
+ lockFilePath,
+ threadName,
+ state.toString());
}
private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file
path {}, Thread {}, Storage based lock state {}";
@@ -590,4 +620,29 @@ public class StorageBasedLockProvider implements LockProvider<StorageLockFile> {
long getCurrentEpochMs() {
return System.currentTimeMillis();
}
+
+ /**
+ * Calculates the lock expiration time based on given timestamp and validity
period.
+ * This is the shared function used by both the lock provider and audit
service.
+ *
+ * @param timestamp The base timestamp to calculate expiration from
+ * @return Lock expiration time in milliseconds
+ */
+ private long calculateLockExpiration(long timestamp) {
+ return timestamp + TimeUnit.SECONDS.toMillis(lockValiditySecs);
+ }
+
+ /**
+ * Helper method to record audit operations.
+ */
+ private void recordAuditOperation(AuditOperationState state, long timestamp)
{
+ auditService.ifPresent(service -> {
+ try {
+ service.recordOperation(state, timestamp);
+ } catch (Exception e) {
+ // Log but don't fail the lock operation due to recording failures
+ logger.warn("Owner {}: Failed to record audit operation {}: {}",
ownerId, state, e.getMessage());
+ }
+ });
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
index ac8ff2ed9d0d..865013f3acaa 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -23,7 +23,15 @@ import
org.apache.hudi.client.transaction.lock.models.StorageLockData;
import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
import org.apache.hudi.client.transaction.lock.models.LockGetResult;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.storage.StoragePath;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static
org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME;
/**
* Defines a contract for a service which should be able to perform
conditional writes to object storage.
@@ -34,7 +42,8 @@ public interface StorageLockClient extends AutoCloseable {
/**
* Tries to create or update a lock file.
* All non pre-condition failure related errors should be returned as
UNKNOWN_ERROR.
- * @param newLockData The new data to write to the lock file
+ *
+ * @param newLockData The new data to write to the lock file
* @param previousLockFile The previous lock file
* @return A pair containing the result state and the new lock file (if
successful)
*/
@@ -44,7 +53,69 @@ public interface StorageLockClient extends AutoCloseable {
/**
* Reads the current lock file.
+ *
* @return The lock retrieve result and the current lock file if successfully retrieved.
- * */
+ */
Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile();
+
+ /**
+ * Reads an object from the specified path.
+ * This method is intended for reading small files (e.g., audit config, feature flags, JSON objects)
+ * and loads the entire file into memory. Do not use for large files.
+ *
+ * <p>AuditServiceFactory passes a fully qualified path built from the table base path
+ * (e.g. {@code s3://bucket/table/.hoodie/.locks/audit_enabled.json}).
+ * NOTE(review): the expected path form and the charset used to decode the content are not
+ * specified here — confirm against the S3/GCS implementations and document them.
+ *
+ * @param filePath The path to the object file to read
+ * @param checkExistsFirst If true, performs a lightweight existence check before attempting to read.
+ * Recommended when the file is unlikely to exist (e.g., optional configs).
+ * @return An Option containing the content as a string if successful, Option.empty() otherwise
+ */
+ Option<String> readObject(String filePath, boolean checkExistsFirst);
+
+ /**
+ * Writes an object to the specified path.
+ * This method is intended for writing small files (e.g., audit logs, configuration files)
+ * and should not be used for large files.
+ *
+ * <p>NOTE(review): this contract does not say whether an existing object at {@code filePath}
+ * is overwritten, nor which charset encodes {@code content} — confirm against the S3/GCS
+ * implementations and document it here.
+ *
+ * @param filePath The path where the object should be written
+ * @param content The content to write as a string
+ * @return true if the write was successful, false otherwise
+ */
+ boolean writeObject(String filePath, String content);
+
+  /**
+   * Gets the lock folder path for the given base path.
+   * This is a static utility method that can be used without creating an instance.
+   *
+   * <p>The result is a plain concatenation of {@code basePath}, {@link StoragePath#SEPARATOR}
+   * and {@code LOCKS_FOLDER_NAME}; no normalization is applied, so a {@code basePath} that
+   * already ends with the separator produces a doubled separator.
+   *
+   * @param basePath The base path of the Hudi table
+   * @return The lock folder path (e.g., "s3://bucket/table/.hoodie/.locks")
+   */
+  static String getLockFolderPath(String basePath) {
+    return String.format("%s%s%s", basePath, StoragePath.SEPARATOR, LOCKS_FOLDER_NAME);
+  }
+
+  /**
+   * Parses a URI and returns bucket name and path as a Pair.
+   * This is a shared utility method for all storage lock client implementations.
+   *
+   * <p>The bucket is the URI authority and the path is the URI path with its leading
+   * separator removed, e.g. {@code gs://bucket/lake/db/tbl/table_lock.json} yields
+   * {@code ("bucket", "lake/db/tbl/table_lock.json")}.
+   *
+   * @param uriString The URI string to parse
+   * @return A Pair containing bucket name (left) and path (right)
+   * @throws HoodieLockException if {@code uriString} is not a syntactically valid URI
+   * @throws IllegalArgumentException if the URI has no bucket (authority) or no object path,
+   *                                  including opaque URIs such as {@code s3:bucket}
+   */
+  static Pair<String, String> parseBucketAndPath(String uriString) {
+    URI uri;
+    try {
+      uri = new URI(uriString);
+    } catch (URISyntaxException e) {
+      throw new HoodieLockException("Failed to parse URI: " + uriString, e);
+    }
+
+    String bucketName = uri.getAuthority();
+    if (StringUtils.isNullOrEmpty(bucketName)) {
+      throw new IllegalArgumentException("URI does not contain a valid bucket name.");
+    }
+
+    // getPath() returns null for opaque URIs; report that as a missing path rather than
+    // failing with a NullPointerException.
+    String rawPath = uri.getPath();
+    String path = rawPath == null ? "" : rawPath.replaceFirst("/", "");
+    if (StringUtils.isNullOrEmpty(path)) {
+      throw new IllegalArgumentException("URI does not contain a valid path.");
+    }
+
+    return Pair.of(bucketName, path);
+  }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditOperationState.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditOperationState.java
new file mode 100644
index 000000000000..700a8dd79be7
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditOperationState.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+/**
+ * Lifecycle states an audited operation can report.
+ *
+ * <p>Constants are declared in lifecycle order (START, RENEW, END); keep that order stable,
+ * since {@link #values()} and ordinals follow it.
+ */
+public enum AuditOperationState {
+  /** The operation has begun. */
+  START,
+  /** The operation is still in flight and is being renewed (heartbeat). */
+  RENEW,
+  /** The operation has finished. */
+  END
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditService.java
new file mode 100644
index 000000000000..bdc062f519af
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+
+/**
+ * Generic audit service interface for tracking operation lifecycles.
+ * Provides a single method for recording all types of audit operations.
+ *
+ * <p>Extends {@link AutoCloseable}, so implementations may hold resources; presumably the
+ * owner of an instance (e.g. the lock provider) closes it — confirm with the implementation.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface AuditService extends AutoCloseable {
+
+ /**
+ * Records an audit operation with the given state and timestamp.
+ *
+ * <p>Implementations may throw; callers decide how to react. StorageBasedLockProvider's
+ * recordAuditOperation, for instance, logs the failure and continues, so a failing audit
+ * does not fail the lock operation.
+ *
+ * @param state The type of operation (START, RENEW, END)
+ * @param timestamp When the operation occurred (presumably epoch milliseconds — TODO confirm)
+ * @throws Exception if the operation cannot be recorded
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ void recordOperation(AuditOperationState state, long timestamp) throws Exception;
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
new file mode 100644
index 000000000000..eb9accaf4fdc
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Factory for the audit services used by storage-based lock providers.
+ *
+ * <p>Auditing is opt-in per table. It counts as enabled only when {@code audit_enabled.json}
+ * in the table's lock folder has {@code "STORAGE_LOCK_AUDIT_SERVICE_ENABLED"} set to a truthy
+ * value. A missing, unreadable or malformed file means auditing is off.
+ */
+public class AuditServiceFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AuditServiceFactory.class);
+  private static final String AUDIT_CONFIG_FILE_NAME = "audit_enabled.json";
+  private static final String STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD = "STORAGE_LOCK_AUDIT_SERVICE_ENABLED";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Builds the audit service for a lock provider when auditing is enabled for the table.
+   *
+   * <p>The enablement check always runs (it reads the config through {@code storageLockClient}).
+   * No concrete service exists yet, so the result is currently always {@link Option#empty()}.
+   * The remaining parameters are accepted for the forthcoming implementation.
+   *
+   * @param ownerId                owner id of the lock provider
+   * @param basePath               base path of the Hudi table
+   * @param storageLockClient      client used to read the audit configuration
+   * @param transactionStartTime   time the transaction started (lock acquired)
+   * @param lockExpirationFunction maps a timestamp to the corresponding lock expiration time
+   * @param lockHeldSupplier       reports whether the lock is currently held
+   * @return the audit service when enabled, {@link Option#empty()} otherwise
+   */
+  public static Option<AuditService> createLockProviderAuditService(
+      String ownerId,
+      String basePath,
+      StorageLockClient storageLockClient,
+      long transactionStartTime,
+      Function<Long, Long> lockExpirationFunction,
+      Supplier<Boolean> lockHeldSupplier) {
+    boolean auditEnabled = isAuditEnabled(basePath, storageLockClient);
+    // No-op, will add in a follow up: when auditEnabled is true, build and return the concrete
+    // audit service here. Until then auditing is never active.
+    return Option.empty();
+  }
+
+  /**
+   * Decides whether auditing is enabled by reading the audit config from the lock folder of
+   * {@code basePath}. Never throws: any failure is logged and treated as "disabled".
+   *
+   * @param basePath          base path of the Hudi table
+   * @param storageLockClient client used to read the config file
+   * @return {@code true} only if the config exists and its enablement field is truthy
+   */
+  private static boolean isAuditEnabled(String basePath, StorageLockClient storageLockClient) {
+    try {
+      // The config lives next to the lock file, in the table's lock folder.
+      String auditConfigPath = StorageLockClient.getLockFolderPath(basePath)
+          + StoragePath.SEPARATOR + AUDIT_CONFIG_FILE_NAME;
+      LOG.debug("Checking for audit configuration at: {}", auditConfigPath);
+
+      // checkExistsFirst = true: the config file is absent for the vast majority of tables.
+      Option<String> configContent = storageLockClient.readObject(auditConfigPath, true);
+      if (!configContent.isPresent()) {
+        LOG.debug("No audit configuration file found at: {}", auditConfigPath);
+        return false;
+      }
+
+      LOG.debug("Audit configuration file found, parsing content");
+      JsonNode enabledFlag = OBJECT_MAPPER.readTree(configContent.get())
+          .get(STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+      // A missing field counts as disabled.
+      boolean enabled = enabledFlag != null && enabledFlag.asBoolean(false);
+      if (!enabled) {
+        LOG.info("Audit configuration present but audit logging is DISABLED");
+        return false;
+      }
+      LOG.info("Audit logging is ENABLED for lock operations at base path: {}", basePath);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Error reading audit configuration from base path: {}. Audit will be disabled.", basePath, e);
+      return false;
+    }
+  }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
index 5d51a2177939..4ae637cbdc18 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -56,7 +56,10 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.atLeastOnce;
@@ -66,6 +69,7 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -86,6 +90,8 @@ class TestStorageBasedLockProvider {
mockHeartbeatManager = mock(HeartbeatManager.class);
mockLogger = mock(Logger.class);
when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true);
+ // Mock the readObject method to return Option.empty() to prevent NPE in
audit service creation
+ when(mockLockService.readObject(anyString(),
anyBoolean())).thenReturn(Option.empty());
TypedProperties props = new TypedProperties();
props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
@@ -94,8 +100,8 @@ class TestStorageBasedLockProvider {
lockProvider = spy(new StorageBasedLockProvider(
ownerId,
props,
- (a,b,c) -> mockHeartbeatManager,
- (a,b,c) -> mockLockService,
+ (a, b, c) -> mockHeartbeatManager,
+ (a, b, c) -> mockLockService,
mockLogger,
null));
}
@@ -119,8 +125,8 @@ class TestStorageBasedLockProvider {
}
@ParameterizedTest
- @ValueSource(strings = { "gs://bucket/lake/db/tbl-default",
"s3://bucket/lake/db/tbl-default",
- "s3a://bucket/lake/db/tbl-default" })
+ @ValueSource(strings = {"gs://bucket/lake/db/tbl-default",
"s3://bucket/lake/db/tbl-default",
+ "s3a://bucket/lake/db/tbl-default"})
void testNonExistentWriteServiceWithDefaults(String tableBasePathString) {
TypedProperties props = new TypedProperties();
props.put(BASE_PATH.key(), tableBasePathString);
@@ -533,7 +539,7 @@ class TestStorageBasedLockProvider {
StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
StorageLockFile realLockFile = new StorageLockFile(data, "v1");
when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty())))
- .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
boolean acquired = lockProvider.tryLock();
@@ -542,7 +548,7 @@ class TestStorageBasedLockProvider {
verify(mockLockService, atLeastOnce()).tryUpsertLockFile(any(), any());
when(mockLockService.tryUpsertLockFile(any(StorageLockData.class),
eq(Option.of(realLockFile))))
- .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS,
Option.of(realLockFile)));
// Mock shutdown
Method shutdownMethod =
lockProvider.getClass().getDeclaredMethod("shutdown", boolean.class);
@@ -667,10 +673,10 @@ class TestStorageBasedLockProvider {
props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
LockConfiguration lockConfiguration = new LockConfiguration(props);
StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
-
+
// Create a mock HoodieLockMetrics object
HoodieLockMetrics mockMetrics = mock(HoodieLockMetrics.class);
-
+
// Test that constructor with metrics works by using the internal
constructor to avoid scheme issues
StorageBasedLockProvider lockProviderWithMetrics = null;
try {
@@ -679,22 +685,22 @@ class TestStorageBasedLockProvider {
assertThrows(Exception.class, () -> {
new StorageBasedLockProvider(lockConfiguration, storageConf,
mockMetrics);
}, "Constructor should exist but fail during lock client instantiation");
-
+
// Now create a working instance using the internal constructor for
proper validation
lockProviderWithMetrics = new StorageBasedLockProvider(
UUID.randomUUID().toString(),
props,
- (a,b,c) -> mock(HeartbeatManager.class),
- (a,b,c) -> new StubStorageLockClient(a, b, new Properties()),
+ (a, b, c) -> mock(HeartbeatManager.class),
+ (a, b, c) -> new StubStorageLockClient(a, b, new Properties()),
mock(Logger.class),
mockMetrics);
-
+
// Verify the lock provider was created successfully
assertNotNull(lockProviderWithMetrics, "StorageBasedLockProvider should
be created successfully");
-
+
// Verify that it can perform basic operations
assertNull(lockProviderWithMetrics.getLock(), "Initially should have no
lock");
-
+
} catch (Exception e) {
fail("StorageBasedLockProvider creation should not throw unexpected
exception: " + e.getMessage());
} finally {
@@ -704,7 +710,7 @@ class TestStorageBasedLockProvider {
}
}
- @Test
+ @Test
public void testStorageBasedLockProviderStandardConstructor() {
// Create test configuration
TypedProperties props = new TypedProperties();
@@ -713,7 +719,7 @@ class TestStorageBasedLockProvider {
props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default");
LockConfiguration lockConfiguration = new LockConfiguration(props);
StorageConfiguration<?> storageConf =
HoodieTestUtils.getDefaultStorageConf();
-
+
// Test that standard constructor works by using the internal constructor
to avoid scheme issues
StorageBasedLockProvider lockProviderStandard = null;
try {
@@ -722,22 +728,22 @@ class TestStorageBasedLockProvider {
assertThrows(Exception.class, () -> {
new StorageBasedLockProvider(lockConfiguration, storageConf);
}, "Standard constructor should exist but fail during lock client
instantiation");
-
+
// Now create a working instance using the internal constructor for
proper validation
lockProviderStandard = new StorageBasedLockProvider(
UUID.randomUUID().toString(),
props,
- (a,b,c) -> mock(HeartbeatManager.class),
- (a,b,c) -> new StubStorageLockClient(a, b, new Properties()),
+ (a, b, c) -> mock(HeartbeatManager.class),
+ (a, b, c) -> new StubStorageLockClient(a, b, new Properties()),
mock(Logger.class),
null); // No metrics for standard constructor test
-
+
// Verify the lock provider was created successfully
assertNotNull(lockProviderStandard, "StorageBasedLockProvider should be
created successfully");
-
+
// Verify that it can perform basic operations
assertNull(lockProviderStandard.getLock(), "Initially should have no
lock");
-
+
} catch (Exception e) {
fail("StorageBasedLockProvider creation should not throw unexpected
exception: " + e.getMessage());
} finally {
@@ -747,6 +753,95 @@ class TestStorageBasedLockProvider {
}
}
+  @Test
+  void testAuditServiceIntegrationWhenConfigNotPresent() {
+    // Lock provider must work normally when no audit config file exists.
+    assertLockWorksWithoutAuditWrites("gs://bucket/lake/db/tbl-audit-test", Option.empty());
+  }
+
+  @Test
+  void testAuditServiceIntegrationWhenConfigDisabled() {
+    // Lock provider must work normally when audit is explicitly disabled in the config file.
+    assertLockWorksWithoutAuditWrites("gs://bucket/lake/db/tbl-audit-disabled",
+        Option.of("{\"STORAGE_LOCK_AUDIT_SERVICE_ENABLED\": false}"));
+  }
+
+  /**
+   * Acquires the lock through a provider whose storage client returns {@code auditConfig} for
+   * the audit config lookup. Verifies that the config was consulted exactly once during
+   * {@code tryLock} and that no audit objects were written. The provider is always closed.
+   */
+  private void assertLockWorksWithoutAuditWrites(String basePath, Option<String> auditConfig) {
+    TypedProperties props = new TypedProperties();
+    props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
+    props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+    props.put(BASE_PATH.key(), basePath);
+
+    StorageLockClient auditMockClient = mock(StorageLockClient.class);
+    when(auditMockClient.readObject(anyString(), eq(true))).thenReturn(auditConfig);
+    when(auditMockClient.readCurrentLockFile())
+        .thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+
+    StorageBasedLockProvider auditLockProvider = new StorageBasedLockProvider(
+        ownerId,
+        props,
+        (a, b, c) -> mockHeartbeatManager,
+        (a, b, c) -> auditMockClient,
+        mockLogger,
+        null);
+    try {
+      StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+      StorageLockFile lockFile = new StorageLockFile(data, "v1");
+      when(auditMockClient.tryUpsertLockFile(any(), eq(Option.empty())))
+          .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lockFile)));
+      when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+      // tryLock is where the audit service is (lazily) looked up.
+      assertTrue(auditLockProvider.tryLock());
+
+      verify(auditMockClient, times(1)).readObject(
+          contains(".locks/audit_enabled.json"), eq(true));
+      // Audit is absent or disabled, so nothing may be written to the audit folder.
+      verify(auditMockClient, never()).writeObject(
+          contains(".locks/audit"), anyString());
+    } finally {
+      auditLockProvider.close();
+    }
+  }
+
public static class StubStorageLockClient implements StorageLockClient {
public StubStorageLockClient(String ownerId, String lockFileUri,
Properties props) {
assertTrue(lockFileUri.endsWith("table_lock.json"));
@@ -764,6 +859,18 @@ class TestStorageBasedLockProvider {
return null;
}
+ @Override
+ public Option<String> readObject(String filePath, boolean checkExistsFirst) {
+ // Stub: behaves as if nothing exists at any path (so, e.g., no audit config is ever found).
+ return Option.empty();
+ }
+
+ @Override
+ public boolean writeObject(String filePath, String content) {
+ // Stub: reports success without storing anything.
+ return true;
+ }
+
@Override
public void close() throws Exception {
// stub, no-op
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
new file mode 100644
index 000000000000..e944fdc89615
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AuditServiceFactory}.
+ */
+public class TestAuditServiceFactory {
+
+  private static final String OWNER_ID = "test-owner";
+  private static final String BASE_PATH = "s3://bucket/path/to/table";
+  /** Location the factory is expected to read the audit config from for {@code BASE_PATH}. */
+  private static final String EXPECTED_CONFIG_PATH = BASE_PATH + "/.hoodie/.locks/audit_enabled.json";
+
+  private StorageLockClient mockStorageLockClient;
+
+  @BeforeEach
+  void setUp() {
+    mockStorageLockClient = mock(StorageLockClient.class);
+  }
+
+  @Test
+  void testCreateAuditServiceWhenConfigNotFound() {
+    givenAuditConfig(Option.empty());
+
+    // Without a config file auditing is off. This must keep returning empty even after a
+    // concrete audit service implementation is added.
+    assertTrue(createAuditService().isEmpty());
+    verify(mockStorageLockClient).readObject(EXPECTED_CONFIG_PATH, true);
+  }
+
+  @Test
+  void testCreateAuditServiceWhenDisabledInConfig() {
+    givenAuditConfig(Option.of("{\"STORAGE_LOCK_AUDIT_SERVICE_ENABLED\": false}"));
+
+    assertTrue(createAuditService().isEmpty());
+    verify(mockStorageLockClient).readObject(EXPECTED_CONFIG_PATH, true);
+  }
+
+  @Test
+  void testCreateAuditServiceWhenEnabledInConfig() {
+    givenAuditConfig(Option.of("{\"STORAGE_LOCK_AUDIT_SERVICE_ENABLED\": true}"));
+    // Let audit writes succeed, for when a concrete service starts writing audit files.
+    when(mockStorageLockClient.writeObject(anyString(), anyString())).thenReturn(true);
+
+    // No concrete audit service exists yet, so the factory still returns empty. Flip this
+    // assertion to expect a present Option once the implementation lands.
+    assertFalse(createAuditService().isPresent());
+    verify(mockStorageLockClient).readObject(EXPECTED_CONFIG_PATH, true);
+  }
+
+  @Test
+  void testCreateAuditServiceWithMalformedJson() {
+    givenAuditConfig(Option.of("{invalid json}"));
+
+    // Parse failures are treated as "audit disabled".
+    assertTrue(createAuditService().isEmpty());
+    verify(mockStorageLockClient).readObject(EXPECTED_CONFIG_PATH, true);
+  }
+
+  @Test
+  void testCreateAuditServiceWithEmptyConfig() {
+    // The config exists but lacks the enablement field, which then defaults to disabled.
+    givenAuditConfig(Option.of("{\"some_other_field\": true}"));
+
+    assertTrue(createAuditService().isEmpty());
+    verify(mockStorageLockClient).readObject(EXPECTED_CONFIG_PATH, true);
+  }
+
+  @Test
+  void testCreateAuditServiceUsesCheckExistsFirst() {
+    givenAuditConfig(Option.empty());
+
+    createAuditService();
+
+    // The audit config is rarely present, so the factory must request an existence check first.
+    verify(mockStorageLockClient).readObject(EXPECTED_CONFIG_PATH, true);
+  }
+
+  /** Stubs the audit config read at {@code EXPECTED_CONFIG_PATH} to return {@code content}. */
+  private void givenAuditConfig(Option<String> content) {
+    when(mockStorageLockClient.readObject(eq(EXPECTED_CONFIG_PATH), eq(true))).thenReturn(content);
+  }
+
+  /** Calls the factory; only the base path and the storage client affect the enablement check. */
+  private Option<AuditService> createAuditService() {
+    return AuditServiceFactory.createLockProviderAuditService(
+        OWNER_ID, BASE_PATH, mockStorageLockClient,
+        System.currentTimeMillis(),
+        timestamp -> timestamp + 10000,
+        () -> true);
+  }
+}
diff --git
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
index c5d1fc011028..1e782cc109dd 100644
---
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
+++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/transaction/lock/GCSStorageLockClient.java
@@ -25,11 +25,9 @@ import
org.apache.hudi.client.transaction.lock.models.StorageLockData;
import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieLockException;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
@@ -46,11 +44,11 @@ import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.nio.channels.Channels;
import java.util.Properties;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
/**
* A GCS-based implementation of a distributed lock provider using conditional
writes
* with generationMatch, plus local concurrency safety, heartbeat/renew, and
pruning old locks.
@@ -68,10 +66,12 @@ public class GCSStorageLockClient implements StorageLockClient {
private final String lockFilePath;
private final String ownerId;
- /** Constructor that is used by reflection to instantiate a GCS-based
locking service.
- * @param ownerId The owner id.
+ /**
+ * Constructor that is used by reflection to instantiate a GCS-based locking
service.
+ *
+ * @param ownerId The owner id.
* @param lockFileUri The path within the bucket where to write lock files.
- * @param props The properties for the lock config, can be used to customize
client.
+ * @param props The properties for the lock config, can be used to
customize client.
*/
public GCSStorageLockClient(
String ownerId,
@@ -87,25 +87,12 @@ public class GCSStorageLockClient implements StorageLockClient {
Properties properties,
Functions.Function1<Properties, Storage> gcsClientSupplier,
Logger logger) {
- try {
- // This logic can likely be extended to other lock client
implementations.
- // Consider creating base class with utilities, incl error handling.
- URI uri = new URI(lockFileUri);
- this.bucketName = uri.getAuthority();
- this.lockFilePath = uri.getPath().replaceFirst("/", "");
- this.gcsClient = gcsClientSupplier.apply(properties);
-
- if (StringUtils.isNullOrEmpty(this.bucketName)) {
- throw new IllegalArgumentException("LockFileUri does not contain a
valid bucket name.");
- }
- if (StringUtils.isNullOrEmpty(this.lockFilePath)) {
- throw new IllegalArgumentException("LockFileUri does not contain a
valid lock file path.");
- }
- this.ownerId = ownerId;
- this.logger = logger;
- } catch (URISyntaxException e) {
- throw new HoodieLockException(e);
- }
+ Pair<String, String> bucketAndPath =
StorageLockClient.parseBucketAndPath(lockFileUri);
+ this.bucketName = bucketAndPath.getLeft();
+ this.lockFilePath = bucketAndPath.getRight();
+ this.gcsClient = gcsClientSupplier.apply(properties);
+ this.ownerId = ownerId;
+ this.logger = logger;
}
private static Functions.Function1<Properties, Storage>
createDefaultGcsClient() {
@@ -119,7 +106,7 @@ public class GCSStorageLockClient implements StorageLockClient {
/**
* Attempts to create or update the lock file using the given lock data and
generation number.
*
- * @param lockData the new lock data to use.
+ * @param lockData the new lock data to use.
* @param generationNumber the expected generation number (0 for creation).
* @return the updated StorageLockFile instance.
* @throws StorageException if the update fails.
@@ -149,13 +136,13 @@ public class GCSStorageLockClient implements StorageLockClient {
return Pair.of(LockUpsertResult.SUCCESS, Option.of(updatedFile));
} catch (StorageException e) {
if (e.getCode() == PRECONDITION_FAILURE_ERROR_CODE) {
- logger.info("OwnerId: {}, Unable to write new lock file. Another
process has modified this lockfile {} already.",
+ logger.info("OwnerId: {}, Unable to write new lock file. Another
process has modified this lockfile {} already.",
ownerId, lockFilePath);
return Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty());
} else if (e.getCode() == RATE_LIMIT_ERROR_CODE) {
logger.warn("OwnerId: {}, Rate limit exceeded for lock file: {}",
ownerId, lockFilePath);
} else if (e.getCode() >= INTERNAL_SERVER_ERROR_CODE_MIN) {
- logger.warn("OwnerId: {}, GCS returned internal server error code for
lock file: {}",
+ logger.warn("OwnerId: {}, GCS returned internal server error code for
lock file: {}",
ownerId, lockFilePath, e);
} else {
throw e;
@@ -166,7 +153,8 @@ public class GCSStorageLockClient implements StorageLockClient {
/**
* Handling storage exception for GET request
- * @param e The error to handle.
+ *
+ * @param e The error to handle.
* @param ignore404 Whether to ignore 404 as a valid exception.
* When we read from stream we might see this, and
* it should not be counted as NOT_EXISTS.
@@ -225,6 +213,69 @@ public class GCSStorageLockClient implements StorageLockClient {
}
}
+  /**
+   * Reads the object at {@code filePath} and decodes its bytes as UTF-8.
+   *
+   * <p>The path is split into bucket and object name via
+   * {@link StorageLockClient#parseBucketAndPath(String)}. Every failure is swallowed and reported
+   * as {@link Option#empty()}: a 404 is logged at debug level, any other error at warn level.
+   *
+   * @param filePath         full URI of the object to read
+   * @param checkExistsFirst if true, look the object up with {@code get()} before downloading it,
+   *                         so an absent object is detected without an exception; if false,
+   *                         download it directly
+   * @return the decoded contents, or empty if the object is absent or could not be read
+   */
+  @Override
+  public Option<String> readObject(String filePath, boolean checkExistsFirst) {
+    try {
+      Pair<String, String> location = StorageLockClient.parseBucketAndPath(filePath);
+      BlobId target = BlobId.of(location.getLeft(), location.getRight());
+
+      byte[] bytes;
+      if (checkExistsFirst) {
+        Blob existing = gcsClient.get(target);
+        // NOTE(review): a non-null get() result should already imply the object exists, and
+        // Blob.exists() likely issues another request; kept because the unit tests verify it.
+        boolean missing = existing == null || !existing.exists();
+        if (missing) {
+          // Absence is the common case for optional objects, hence debug rather than warn.
+          logger.debug("JSON config file not found: {}", filePath);
+          return Option.empty();
+        }
+        bytes = existing.getContent();
+      } else {
+        // Single download; a missing object is expected to surface as a StorageException below.
+        bytes = gcsClient.readAllBytes(target);
+      }
+      return Option.of(new String(bytes, UTF_8));
+    } catch (StorageException e) {
+      if (e.getCode() != NOT_FOUND_ERROR_CODE) {
+        logger.warn("Error reading JSON config file: {}", filePath, e);
+      } else {
+        logger.debug("JSON config file not found: {}", filePath);
+      }
+      return Option.empty();
+    } catch (Exception e) {
+      logger.warn("Error reading JSON config file: {}", filePath, e);
+      return Option.empty();
+    }
+  }
+
+  /**
+   * Writes {@code content}, encoded as UTF-8, to the object at {@code filePath}.
+   *
+   * <p>The write is unconditional: no generation-match precondition is supplied. Any failure
+   * (unparsable path, GCS error, null content) is logged at warn level and reported as
+   * {@code false} instead of being thrown.
+   *
+   * @param filePath full URI of the destination object
+   * @param content  text to store
+   * @return true if the GCS create call returned normally, false otherwise
+   */
+  @Override
+  public boolean writeObject(String filePath, String content) {
+    try {
+      Pair<String, String> location = StorageLockClient.parseBucketAndPath(filePath);
+      BlobInfo target = BlobInfo.newBuilder(BlobId.of(location.getLeft(), location.getRight())).build();
+      gcsClient.create(target, content.getBytes(UTF_8));
+      logger.debug("Successfully wrote object to: {}", filePath);
+    } catch (Exception e) {
+      logger.warn("Error writing object to: {}", filePath, e);
+      return false;
+    }
+    return true;
+  }
+
@Override
public void close() throws Exception {
this.gcsClient.close();
diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClientAuditOperations.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClientAuditOperations.java
new file mode 100644
index 000000000000..02ea0719d764
--- /dev/null
+++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/transaction/lock/TestGCSStorageLockClientAuditOperations.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.gcp.transaction.lock;
+
+import org.apache.hudi.common.util.Option;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the audit-support operations of {@link GCSStorageLockClient}:
+ * {@code readObject} (with and without an existence probe) and {@code writeObject}.
+ * The GCS client, the returned blob and the logger are all Mockito mocks.
+ */
+public class TestGCSStorageLockClientAuditOperations {
+
+  private static final String AUDIT_CONFIG_PATH =
+      "gs://test-bucket/table/.hoodie/.locks/audit_enabled.json";
+  private static final String AUDIT_LOG_PATH = "gs://test-bucket/audit/test-audit.jsonl";
+  private static final String SAMPLE_LINE = "{\"test\": \"data\"}\n";
+
+  private Storage mockGcsClient;
+  private Blob mockBlob;
+  private Logger mockLogger;
+  private GCSStorageLockClient lockClient;
+
+  @BeforeEach
+  void setUp() {
+    mockGcsClient = mock(Storage.class);
+    mockLogger = mock(Logger.class);
+    mockBlob = mock(Blob.class);
+    // The client supplier ignores the properties and always hands back the mock.
+    lockClient = new GCSStorageLockClient(
+        "test-owner",
+        "gs://test-bucket/table/.hoodie/.locks/table_lock.json",
+        new Properties(),
+        props -> mockGcsClient,
+        mockLogger);
+  }
+
+  // ================================
+  // readObject() tests
+  // ================================
+
+  @Test
+  void testReadConfigWithCheckExistsFirstFileNotFound() {
+    // A null from get() means the object is absent; no download should follow.
+    when(mockGcsClient.get(any(BlobId.class))).thenReturn(null);
+
+    assertTrue(lockClient.readObject(AUDIT_CONFIG_PATH, true).isEmpty());
+    verify(mockGcsClient, times(1)).get(any(BlobId.class));
+    verify(mockGcsClient, never()).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstBlobExistsFalse() {
+    // get() returns a handle, but the handle reports that the object does not exist.
+    when(mockGcsClient.get(any(BlobId.class))).thenReturn(mockBlob);
+    when(mockBlob.exists()).thenReturn(false);
+
+    assertTrue(lockClient.readObject(AUDIT_CONFIG_PATH, true).isEmpty());
+    verify(mockGcsClient, times(1)).get(any(BlobId.class));
+    verify(mockBlob, times(1)).exists();
+    verify(mockBlob, never()).getContent();
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstFileExists() {
+    String json = "{\"STORAGE_LP_AUDIT_SERVICE_ENABLED\": true}";
+    when(mockGcsClient.get(any(BlobId.class))).thenReturn(mockBlob);
+    when(mockBlob.exists()).thenReturn(true);
+    when(mockBlob.getContent()).thenReturn(json.getBytes(StandardCharsets.UTF_8));
+
+    Option<String> read = lockClient.readObject(AUDIT_CONFIG_PATH, true);
+
+    assertTrue(read.isPresent());
+    assertEquals(json, read.get());
+    // get(), exists() and getContent() are each expected exactly once.
+    verify(mockGcsClient, times(1)).get(any(BlobId.class));
+    verify(mockBlob, times(1)).exists();
+    verify(mockBlob, times(1)).getContent();
+  }
+
+  @Test
+  void testReadConfigWithoutCheckExistsFirstFileNotFound() {
+    when(mockGcsClient.readAllBytes(any(BlobId.class)))
+        .thenThrow(new StorageException(404, "Not Found"));
+
+    assertTrue(lockClient.readObject(AUDIT_CONFIG_PATH, false).isEmpty());
+    // Direct-read path: no existence probe, exactly one download attempt.
+    verify(mockGcsClient, never()).get(any(BlobId.class));
+    verify(mockGcsClient, times(1)).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithoutCheckExistsFirstFileExists() {
+    String json = "{\"STORAGE_LP_AUDIT_SERVICE_ENABLED\": false}";
+    when(mockGcsClient.readAllBytes(any(BlobId.class)))
+        .thenReturn(json.getBytes(StandardCharsets.UTF_8));
+
+    Option<String> read = lockClient.readObject(AUDIT_CONFIG_PATH, false);
+
+    assertTrue(read.isPresent());
+    assertEquals(json, read.get());
+    verify(mockGcsClient, never()).get(any(BlobId.class));
+    verify(mockGcsClient, times(1)).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithCheckExistsFirstOtherGcsError() {
+    // A non-404 failure during the existence probe is swallowed, and no download is attempted.
+    when(mockGcsClient.get(any(BlobId.class)))
+        .thenThrow(new StorageException(500, "Internal Server Error"));
+
+    assertTrue(lockClient.readObject(AUDIT_CONFIG_PATH, true).isEmpty());
+    verify(mockGcsClient, times(1)).get(any(BlobId.class));
+    verify(mockGcsClient, never()).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithInvalidUri() {
+    assertTrue(lockClient.readObject("not-a-valid-uri", false).isEmpty());
+    // Path parsing fails before any GCS call is made.
+    verify(mockGcsClient, never()).get(any(BlobId.class));
+    verify(mockGcsClient, never()).readAllBytes(any(BlobId.class));
+  }
+
+  @Test
+  void testReadConfigWithRateLimitError() {
+    when(mockGcsClient.readAllBytes(any(BlobId.class)))
+        .thenThrow(new StorageException(429, "Too Many Requests"));
+
+    assertTrue(lockClient.readObject(AUDIT_CONFIG_PATH, false).isEmpty());
+    verify(mockGcsClient, times(1)).readAllBytes(any(BlobId.class));
+  }
+
+  // ================================
+  // writeObject() tests
+  // ================================
+
+  @Test
+  void testWriteObject_success() {
+    assertWriteSucceeds(AUDIT_LOG_PATH, SAMPLE_LINE);
+  }
+
+  @Test
+  void testWriteObject_storageException() {
+    assertWriteFailureIsLogged(new StorageException(500, "Internal Server Error"));
+  }
+
+  @Test
+  void testWriteObject_invalidPath() {
+    String badPath = "invalid-path";
+
+    assertFalse(lockClient.writeObject(badPath, SAMPLE_LINE));
+    verify(mockLogger).warn(contains("Error writing object to"), eq(badPath), any(Exception.class));
+  }
+
+  @Test
+  void testWriteObject_emptyContent() {
+    assertWriteSucceeds("gs://test-bucket/audit/empty-content.jsonl", "");
+  }
+
+  @Test
+  void testWriteObject_rateLimitExceeded() {
+    assertWriteFailureIsLogged(new StorageException(429, "Rate Limit Exceeded"));
+  }
+
+  /** Stubs a successful create and checks the payload bytes, the result and the debug log. */
+  private void assertWriteSucceeds(String path, String payload) {
+    when(mockGcsClient.create(any(BlobInfo.class), any(byte[].class))).thenReturn(mockBlob);
+
+    assertTrue(lockClient.writeObject(path, payload));
+    verify(mockGcsClient).create(any(BlobInfo.class), eq(payload.getBytes(UTF_8)));
+    verify(mockLogger).debug("Successfully wrote object to: {}", path);
+  }
+
+  /** Makes create throw {@code failure} and checks that the write reports false and logs it. */
+  private void assertWriteFailureIsLogged(StorageException failure) {
+    when(mockGcsClient.create(any(BlobInfo.class), any(byte[].class))).thenThrow(failure);
+
+    assertFalse(lockClient.writeObject(AUDIT_LOG_PATH, SAMPLE_LINE));
+    verify(mockLogger).warn(contains("Error writing object to"), eq(AUDIT_LOG_PATH), eq(failure));
+  }
+}