This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 00bf6a36c8c4 [HUDI-9155] Add storage based lock provider scaffolding
to 0.x (#13709)
00bf6a36c8c4 is described below
commit 00bf6a36c8c45f80480e024e0e239d4a89a8e85c
Author: Alex R <[email protected]>
AuthorDate: Mon Aug 11 21:07:19 2025 -0700
[HUDI-9155] Add storage based lock provider scaffolding to 0.x (#13709)
---
.../client/transaction/lock/StorageLockClient.java | 50 +++++++
.../transaction/lock/models/LockGetResult.java | 38 +++++
.../transaction/lock/models/LockUpsertResult.java | 38 +++++
.../transaction/lock/models/StorageLockData.java | 71 +++++++++
.../transaction/lock/models/StorageLockFile.java | 133 +++++++++++++++++
.../lock/models/StorageLockClientFileTest.java | 165 +++++++++++++++++++++
6 files changed, 495 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
new file mode 100644
index 000000000000..ac8ff2ed9d0d
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Defines a contract for a service which should be able to perform
conditional writes to object storage.
+ * It expects to be interacting with a single lock file per context (table),
and will be competing with other instances
+ * to perform writes, so it should handle these cases accordingly (using
conditional writes).
+ */
+public interface StorageLockClient extends AutoCloseable {
+ /**
+ * Tries to create or update a lock file.
+ * All non pre-condition failure related errors should be returned as
UNKNOWN_ERROR.
+ * @param newLockData The new data to write to the lock file
+ * @param previousLockFile The previous lock file
+ * @return A pair containing the result state and the new lock file (if
successful)
+ */
+ Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
+ StorageLockData newLockData,
+ Option<StorageLockFile> previousLockFile);
+
+ /**
+ * Reads the current lock file.
+ * @return The lock retrieve result and the current lock file if
successfully retrieved.
+ * */
+ Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile();
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
new file mode 100644
index 000000000000..68809d244c3d
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
/**
 * Outcome of reading the lock file. Each outcome carries a fixed numeric code.
 */
public enum LockGetResult {
  /** The lock file does not exist (code 0). */
  NOT_EXISTS(0),
  /** The lock file was retrieved successfully (code 1). */
  SUCCESS(1),
  /** The lock state could not be determined because of a transient error (code 2). */
  UNKNOWN_ERROR(2);

  private final int numericCode;

  LockGetResult(int numericCode) {
    this.numericCode = numericCode;
  }

  /**
   * Returns the numeric code bound to this outcome.
   *
   * @return the outcome's code
   */
  public int getCode() {
    return numericCode;
  }
}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
new file mode 100644
index 000000000000..eec707d4e56d
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
/**
 * Outcome of creating or updating the lock file. Each outcome carries a fixed numeric code.
 */
public enum LockUpsertResult {
  /** The lock was created or updated successfully (code 0). */
  SUCCESS(0),
  /** Another process modified the lock file, i.e. a pre-condition failure (code 1). */
  ACQUIRED_BY_OTHERS(1),
  /** The lock state could not be determined because of a transient error (code 2). */
  UNKNOWN_ERROR(2);

  private final int numericCode;

  LockUpsertResult(int numericCode) {
    this.numericCode = numericCode;
  }

  /**
   * Returns the numeric code bound to this outcome.
   *
   * @return the outcome's code
   */
  public int getCode() {
    return numericCode;
  }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
new file mode 100644
index 000000000000..01c7d141ebda
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Pojo for conditional writes-based lock provider.
+ */
+public class StorageLockData {
+ private final boolean expired;
+ private final long validUntil;
+ private final String owner;
+
+ /**
+ * Initializes an object describing a conditionally written lock.
+ * @param expired Whether the lock is expired.
+ * @param validUntil The epoch in ms when the lock is expired.
+ * @param owner The uuid owner of the owner of this lock.
+ */
+ @JsonCreator
+ public StorageLockData(
+ @JsonProperty(value = "expired", required = true) boolean expired,
+ @JsonProperty(value = "validUntil", required = true) long validUntil,
+ @JsonProperty(value = "owner", required = true) String owner) {
+ this.expired = expired;
+ this.validUntil = validUntil;
+ this.owner = owner;
+ }
+
+ /**
+ * Gets the expiration.
+ * @return The long representing the expiration in ms.
+ */
+ public long getValidUntil() {
+ return this.validUntil;
+ }
+
+ /**
+ * Whether the lock is expired.
+ * @return True boolean representing whether the lock is expired.
+ */
+ public boolean isExpired() {
+ return this.expired;
+ }
+
+ /**
+ * The owner.
+ * @return A string representing the uuid of the owner of this lock.
+ */
+ public String getOwner() {
+ return this.owner;
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
new file mode 100644
index 000000000000..4929d207d9a4
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class StorageLockFile {
+
+ private final StorageLockData data;
+ private final String versionId;
+
+ // Get a custom object mapper. See StorageLockData for required properties.
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ // This allows us to add new properties down the line.
+ .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+ // Should not let validUntil or expired be null.
+ .enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
+
+ /**
+ * Initializes a StorageLockFile using the data and unique versionId.
+ *
+ * @param data The data in the lock file.
+ * @param versionId The version of this lock file. Used to ensure
consistency through conditional writes.
+ */
+ public StorageLockFile(StorageLockData data, String versionId) {
+ ValidationUtils.checkArgument(data != null, "Data must not be null.");
+ ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(versionId),
"VersionId must not be null or empty.");
+ this.data = data;
+ this.versionId = versionId;
+ }
+
+ /**
+ * Factory method to load an input stream into a StorageLockFile.
+ *
+ * @param inputStream The input stream of the JSON content.
+ * @param versionId The unique version identifier for the lock file.
+ * @return A new instance of StorageLockFile.
+ * @throws HoodieIOException If deserialization fails.
+ */
+ public static StorageLockFile createFromStream(InputStream inputStream,
String versionId) {
+ try {
+ StorageLockData data = OBJECT_MAPPER.readValue(inputStream,
StorageLockData.class);
+ return new StorageLockFile(data, versionId);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to deserialize JSON content into
StorageLockData", e);
+ }
+ }
+
+ /**
+ * Writes the serialized JSON representation of this object to an output
stream.
+ *
+ * @param outputStream The output stream to write the JSON to.
+ * @throws HoodieIOException If serialization fails.
+ */
+ public void writeToStream(OutputStream outputStream) {
+ try {
+ OBJECT_MAPPER.writeValue(outputStream, this.data);
+ } catch (IOException e) {
+ throw new HoodieIOException("Error writing object to JSON output
stream", e);
+ }
+ }
+
+ /**
+ * Converts the data to a bytearray. Since we know the payloads will be
small this is fine.
+ * @return A byte array.
+ * @throws HoodieIOException If serialization fails.
+ */
+ public static byte[] toByteArray(StorageLockData data) {
+ try {
+ return OBJECT_MAPPER.writeValueAsBytes(data);
+ } catch (JsonProcessingException e) {
+ throw new HoodieIOException("Error writing object to byte array", e);
+ }
+ }
+
+ /**
+ * Gets the version id.
+ * @return A string for the version id.
+ */
+ public String getVersionId() {
+ return this.versionId;
+ }
+
+ /**
+ * Gets the expiration time in ms.
+ * @return A long representing the expiration.
+ */
+ public long getValidUntilMs() {
+ return this.data.getValidUntil();
+ }
+
+ /**
+ * Gets whether the lock is expired.
+ * @return A boolean representing expired.
+ */
+ public boolean isExpired() {
+ return this.data.isExpired();
+ }
+
+ /**
+ * Gets the owner of the lock.
+ * @return A string for the owner of the lock.
+ */
+ public String getOwner() {
+ return this.data.getOwner();
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
new file mode 100644
index 000000000000..33d3c9bf44ff
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class StorageLockClientFileTest {
+
+ private static final String JSON_DATA =
"{\"expired\":false,\"validUntil\":1700000000000,\"owner\":\"testOwner\"}";
+ private static final String JSON_DATA_EXTRA_FIELD =
"{\"expired\":true,\"validUntil\":1600000000000,\"owner\":\"otherOwner\",\"state\":\"active\"}";
+ private static final String INVALID_JSON = "{\"invalidField\":123}";
+ private static final String VERSION_ID = "testVersionId";
+
+ private InputStream validJsonStream;
+ private InputStream extraFieldValidJsonStream;
+ private InputStream invalidJsonStream;
+
+ @BeforeEach
+ void setup() {
+ validJsonStream = new ByteArrayInputStream(JSON_DATA.getBytes());
+ extraFieldValidJsonStream = new
ByteArrayInputStream(JSON_DATA_EXTRA_FIELD.getBytes());
+ invalidJsonStream = new ByteArrayInputStream(INVALID_JSON.getBytes());
+ }
+
+ @Test
+ void testCreateValidInputStream() {
+ StorageLockFile file = StorageLockFile.createFromStream(validJsonStream,
VERSION_ID);
+ assertEquals(1700000000000L, file.getValidUntilMs());
+ assertEquals("testOwner", file.getOwner());
+ assertEquals(VERSION_ID, file.getVersionId());
+ assertFalse(file.isExpired());
+ }
+
+ @Test
+ void testCreateValidInputStreamExtraField() {
+ StorageLockFile file =
StorageLockFile.createFromStream(extraFieldValidJsonStream, VERSION_ID);
+ assertEquals(1600000000000L, file.getValidUntilMs());
+ assertEquals("otherOwner", file.getOwner());
+ assertEquals(VERSION_ID, file.getVersionId());
+ assertTrue(file.isExpired());
+ }
+
+ @Test
+ void testCreateInvalidInputStreamFromMock() throws IOException {
+ InputStream mockInputStream = mock(InputStream.class);
+
+ doThrow(new IOException("Simulated IOException"))
+ .when(mockInputStream)
+ .read();
+ HoodieIOException exception = assertThrows(HoodieIOException.class, () ->
StorageLockFile.createFromStream(mockInputStream, "versionId"));
+ assertTrue(exception.getMessage().contains("Failed to deserialize"));
+ }
+
+ @Test
+ void testCreateInvalidInputStreamFromBadData() {
+ HoodieIOException exception = assertThrows(HoodieIOException.class, () ->
+ StorageLockFile.createFromStream(invalidJsonStream, VERSION_ID)
+ );
+ assertTrue(exception.getMessage().contains("Failed to deserialize"));
+ }
+
+ @Test
+ void testCreateNullData() {
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () ->
+ new StorageLockFile(null, VERSION_ID)
+ );
+ assertTrue(exception.getMessage().contains("Data must not be null"));
+ }
+
+ @Test
+ void testCreateNullVersionId() {
+ StorageLockData data = new StorageLockData(true, 1700000000000L,
"testOwner");
+ IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () ->
+ new StorageLockFile(data, null)
+ );
+ assertTrue(exception.getMessage().contains("VersionId must not be null or
empty."));
+ exception = assertThrows(IllegalArgumentException.class, () ->
+ new StorageLockFile(data, "")
+ );
+ assertTrue(exception.getMessage().contains("VersionId must not be null or
empty."));
+ }
+
+ @Test
+ void testToJsonStreamValidData() {
+ StorageLockFile file = StorageLockFile.createFromStream(validJsonStream,
VERSION_ID);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ file.writeToStream(outputStream);
+ String outputJson = new String(outputStream.toByteArray());
+ assertTrue(outputJson.contains("\"expired\":false"));
+ assertTrue(outputJson.contains("\"validUntil\":1700000000000"));
+ assertTrue(outputJson.contains("\"owner\":\"testOwner\""));
+ }
+
+ @Test
+ void testToJsonStreamErrorHandling() throws IOException {
+ OutputStream mockOutputStream = mock(OutputStream.class);
+
+ doThrow(new IOException("Simulated IOException"))
+ .when(mockOutputStream)
+ .write(any(byte[].class), anyInt(), anyInt());
+ StorageLockFile file = new StorageLockFile(
+ new StorageLockData(true, System.currentTimeMillis() + 1000,
"testOwner"),
+ VERSION_ID);
+
+ HoodieIOException exception = assertThrows(HoodieIOException.class, () ->
file.writeToStream(mockOutputStream));
+ assertTrue(exception.getMessage().contains("Error writing object to
JSON"));
+ }
+
+ @Test
+ void testToByteArrayValidData() {
+ StorageLockData data = new StorageLockData(false, 1700000000000L,
"testOwner");
+ String outputJson = new String(StorageLockFile.toByteArray(data));
+ assertTrue(outputJson.contains("\"expired\":false"));
+ assertTrue(outputJson.contains("\"validUntil\":1700000000000"));
+ assertTrue(outputJson.contains("\"owner\":\"testOwner\""));
+ }
+
+ @Test
+ void testIsExpired() {
+ StorageLockData data = new StorageLockData(true,
System.currentTimeMillis() - 1000, "testOwner");
+ StorageLockFile file = new StorageLockFile(data, VERSION_ID);
+ assertTrue(file.isExpired());
+ }
+
+ @Test
+ void testGetVersionId() {
+ StorageLockData data = new StorageLockData(false, 1700000000000L,
"testOwner");
+ StorageLockFile file = new StorageLockFile(data, VERSION_ID);
+ assertEquals(VERSION_ID, file.getVersionId());
+ }
+}