This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 00bf6a36c8c4 [HUDI-9155] Add storage based lock provider scaffolding to 0.x (#13709)
00bf6a36c8c4 is described below

commit 00bf6a36c8c45f80480e024e0e239d4a89a8e85c
Author: Alex R <[email protected]>
AuthorDate: Mon Aug 11 21:07:19 2025 -0700

    [HUDI-9155] Add storage based lock provider scaffolding to 0.x (#13709)
---
 .../client/transaction/lock/StorageLockClient.java |  50 +++++++
 .../transaction/lock/models/LockGetResult.java     |  38 +++++
 .../transaction/lock/models/LockUpsertResult.java  |  38 +++++
 .../transaction/lock/models/StorageLockData.java   |  71 +++++++++
 .../transaction/lock/models/StorageLockFile.java   | 133 +++++++++++++++++
 .../lock/models/StorageLockClientFileTest.java     | 165 +++++++++++++++++++++
 6 files changed, 495 insertions(+)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
new file mode 100644
index 000000000000..ac8ff2ed9d0d
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Defines a contract for a service which should be able to perform conditional writes to object storage.
+ * It expects to be interacting with a single lock file per context (table), and will be competing with other instances
+ * to perform writes, so it should handle these cases accordingly (using conditional writes).
+ */
+public interface StorageLockClient extends AutoCloseable {
+  /**
+   * Tries to create or update a lock file.
+   * All errors that are not precondition failures should be reported as
+   * {@link LockUpsertResult#UNKNOWN_ERROR}.
+   * @param newLockData The new data to write to the lock file.
+   * @param previousLockFile The previously observed lock file, wrapped in an {@link Option}.
+   * @return A pair containing the result state and the new lock file (if successful).
+   */
+  Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
+      StorageLockData newLockData,
+      Option<StorageLockFile> previousLockFile);
+
+  /**
+   * Reads the current lock file.
+   * @return A pair containing the retrieval result and the current lock file (if successfully retrieved).
+   */
+  Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile();
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
new file mode 100644
index 000000000000..68809d244c3d
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+public enum LockGetResult {
+  // NOT_EXISTS (code 0): the lock file does not exist.
+  NOT_EXISTS(0),
+  // SUCCESS (code 1): the lock file was retrieved successfully.
+  SUCCESS(1),
+  // UNKNOWN_ERROR (code 2): the lock state could not be determined due to transient errors.
+  UNKNOWN_ERROR(2);
+
+  private final int code;
+
+  LockGetResult(int code) {
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
new file mode 100644
index 000000000000..eec707d4e56d
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+/**
+ * Outcome of an attempt to create or update the lock file. Each constant carries a fixed
+ * integer code, exposed through {@link #getCode()}.
+ */
+public enum LockUpsertResult {
+  // SUCCESS (code 0): the lock was successfully created/updated.
+  SUCCESS(0),
+  // ACQUIRED_BY_OTHERS (code 1): another process has modified the lock file (precondition failure).
+  ACQUIRED_BY_OTHERS(1),
+  // UNKNOWN_ERROR (code 2): the lock state could not be determined due to transient errors.
+  UNKNOWN_ERROR(2);
+
+  private final int code;
+
+  LockUpsertResult(int code) {
+    this.code = code;
+  }
+
+  /**
+   * Gets the integer code of this result.
+   * @return The code passed to this constant's constructor.
+   */
+  public int getCode() {
+    return code;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
new file mode 100644
index 000000000000..01c7d141ebda
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Immutable POJO holding the contents of a lock file for the conditional-writes-based lock provider.
+ * Jackson builds instances through the annotated constructor; all three JSON properties
+ * ("expired", "validUntil", "owner") are declared as required.
+ */
+public class StorageLockData {
+  private final boolean expired;
+  private final long validUntil;
+  private final String owner;
+
+  /**
+   * Initializes an object describing a conditionally written lock.
+   * @param expired Whether the lock is marked as expired.
+   * @param validUntil The epoch time in ms until which the lock is valid.
+   * @param owner The uuid of the owner of this lock.
+   */
+  @JsonCreator
+  public StorageLockData(
+      @JsonProperty(value = "expired", required = true) boolean expired,
+      @JsonProperty(value = "validUntil", required = true) long validUntil,
+      @JsonProperty(value = "owner", required = true) String owner) {
+    this.expired = expired;
+    this.validUntil = validUntil;
+    this.owner = owner;
+  }
+
+  /**
+   * Gets the expiration time.
+   * @return The epoch time in ms until which the lock is valid.
+   */
+  public long getValidUntil() {
+    return this.validUntil;
+  }
+
+  /**
+   * Whether the lock is marked as expired. This returns the stored flag as-is; it is not
+   * derived from {@link #getValidUntil()} or the current time.
+   * @return {@code true} if the lock is marked expired, {@code false} otherwise.
+   */
+  public boolean isExpired() {
+    return this.expired;
+  }
+
+  /**
+   * Gets the owner.
+   * @return A string representing the uuid of the owner of this lock.
+   */
+  public String getOwner() {
+    return this.owner;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
new file mode 100644
index 000000000000..4929d207d9a4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class StorageLockFile {
+
+  private final StorageLockData data;
+  private final String versionId;
+
+  // Get a custom object mapper. See StorageLockData for required properties.
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+      // This allows us to add new properties down the line.
+      .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+      // Should not let validUntil or expired be null.
+      .enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
+
+  /**
+   * Initializes a StorageLockFile using the data and unique versionId.
+   *
+   * @param data      The data in the lock file.
+   * @param versionId The version of this lock file. Used to ensure 
consistency through conditional writes.
+   */
+  public StorageLockFile(StorageLockData data, String versionId) {
+    ValidationUtils.checkArgument(data != null, "Data must not be null.");
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(versionId), 
"VersionId must not be null or empty.");
+    this.data = data;
+    this.versionId = versionId;
+  }
+
+  /**
+   * Factory method to load an input stream into a StorageLockFile.
+   *
+   * @param inputStream The input stream of the JSON content.
+   * @param versionId   The unique version identifier for the lock file.
+   * @return A new instance of StorageLockFile.
+   * @throws HoodieIOException If deserialization fails.
+   */
+  public static StorageLockFile createFromStream(InputStream inputStream, 
String versionId) {
+    try {
+      StorageLockData data = OBJECT_MAPPER.readValue(inputStream, 
StorageLockData.class);
+      return new StorageLockFile(data, versionId);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to deserialize JSON content into 
StorageLockData", e);
+    }
+  }
+
+  /**
+   * Writes the serialized JSON representation of this object to an output 
stream.
+   *
+   * @param outputStream The output stream to write the JSON to.
+   * @throws HoodieIOException If serialization fails.
+   */
+  public void writeToStream(OutputStream outputStream) {
+    try {
+      OBJECT_MAPPER.writeValue(outputStream, this.data);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error writing object to JSON output 
stream", e);
+    }
+  }
+
+  /**
+   * Converts the data to a bytearray. Since we know the payloads will be 
small this is fine.
+   * @return A byte array.
+   * @throws HoodieIOException If serialization fails.
+   */
+  public static byte[] toByteArray(StorageLockData data) {
+    try {
+      return OBJECT_MAPPER.writeValueAsBytes(data);
+    } catch (JsonProcessingException e) {
+      throw new HoodieIOException("Error writing object to byte array", e);
+    }
+  }
+
+  /**
+   * Gets the version id.
+   * @return A string for the version id.
+   */
+  public String getVersionId() {
+    return this.versionId;
+  }
+
+  /**
+   * Gets the expiration time in ms.
+   * @return A long representing the expiration.
+   */
+  public long getValidUntilMs() {
+    return this.data.getValidUntil();
+  }
+
+  /**
+   * Gets whether the lock is expired.
+   * @return A boolean representing expired.
+   */
+  public boolean isExpired() {
+    return this.data.isExpired();
+  }
+
+  /**
+   * Gets the owner of the lock.
+   * @return A string for the owner of the lock.
+   */
+  public String getOwner() {
+    return this.data.getOwner();
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
new file mode 100644
index 000000000000..33d3c9bf44ff
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link StorageLockFile}: construction validation, JSON deserialization from a
+ * stream, and JSON serialization to a stream or byte array.
+ */
+public class StorageLockClientFileTest {
+
+  private static final String JSON_DATA = "{\"expired\":false,\"validUntil\":1700000000000,\"owner\":\"testOwner\"}";
+  // Same shape as JSON_DATA plus an unknown "state" property, which deserialization must tolerate.
+  private static final String JSON_DATA_EXTRA_FIELD = "{\"expired\":true,\"validUntil\":1600000000000,\"owner\":\"otherOwner\",\"state\":\"active\"}";
+  // Well-formed JSON that lacks every required property.
+  private static final String INVALID_JSON = "{\"invalidField\":123}";
+  private static final String VERSION_ID = "testVersionId";
+
+  private InputStream validJsonStream;
+  private InputStream extraFieldValidJsonStream;
+  private InputStream invalidJsonStream;
+
+  // Streams are single-use, so fresh ones are created before every test.
+  @BeforeEach
+  void setup() {
+    validJsonStream = new ByteArrayInputStream(JSON_DATA.getBytes());
+    extraFieldValidJsonStream = new ByteArrayInputStream(JSON_DATA_EXTRA_FIELD.getBytes());
+    invalidJsonStream = new ByteArrayInputStream(INVALID_JSON.getBytes());
+  }
+
+  @Test
+  void testCreateValidInputStream() {
+    StorageLockFile file = StorageLockFile.createFromStream(validJsonStream, VERSION_ID);
+    assertEquals(1700000000000L, file.getValidUntilMs());
+    assertEquals("testOwner", file.getOwner());
+    assertEquals(VERSION_ID, file.getVersionId());
+    assertFalse(file.isExpired());
+  }
+
+  @Test
+  void testCreateValidInputStreamExtraField() {
+    StorageLockFile file = StorageLockFile.createFromStream(extraFieldValidJsonStream, VERSION_ID);
+    assertEquals(1600000000000L, file.getValidUntilMs());
+    assertEquals("otherOwner", file.getOwner());
+    assertEquals(VERSION_ID, file.getVersionId());
+    assertTrue(file.isExpired());
+  }
+
+  @Test
+  void testCreateInvalidInputStreamFromMock() throws IOException {
+    InputStream mockInputStream = mock(InputStream.class);
+
+    // Jackson pulls bytes through the bulk read(byte[], int, int) overload, so that is the call
+    // that has to fail for the simulated IOException to be the one that gets wrapped. Stubbing
+    // only the single-byte read() leaves the bulk read returning 0 from the mock.
+    doThrow(new IOException("Simulated IOException"))
+        .when(mockInputStream)
+        .read(any(byte[].class), anyInt(), anyInt());
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () -> StorageLockFile.createFromStream(mockInputStream, "versionId"));
+    assertTrue(exception.getMessage().contains("Failed to deserialize"));
+  }
+
+  @Test
+  void testCreateInvalidInputStreamFromBadData() {
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () ->
+        StorageLockFile.createFromStream(invalidJsonStream, VERSION_ID)
+    );
+    assertTrue(exception.getMessage().contains("Failed to deserialize"));
+  }
+
+  @Test
+  void testCreateNullData() {
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () ->
+        new StorageLockFile(null, VERSION_ID)
+    );
+    assertTrue(exception.getMessage().contains("Data must not be null"));
+  }
+
+  @Test
+  void testCreateNullVersionId() {
+    StorageLockData data = new StorageLockData(true, 1700000000000L, "testOwner");
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () ->
+        new StorageLockFile(data, null)
+    );
+    assertTrue(exception.getMessage().contains("VersionId must not be null or empty."));
+    // An empty version id is rejected the same way as a null one.
+    exception = assertThrows(IllegalArgumentException.class, () ->
+        new StorageLockFile(data, "")
+    );
+    assertTrue(exception.getMessage().contains("VersionId must not be null or empty."));
+  }
+
+  @Test
+  void testToJsonStreamValidData() {
+    StorageLockFile file = StorageLockFile.createFromStream(validJsonStream, VERSION_ID);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    file.writeToStream(outputStream);
+    String outputJson = new String(outputStream.toByteArray());
+    assertTrue(outputJson.contains("\"expired\":false"));
+    assertTrue(outputJson.contains("\"validUntil\":1700000000000"));
+    assertTrue(outputJson.contains("\"owner\":\"testOwner\""));
+  }
+
+  @Test
+  void testToJsonStreamErrorHandling() throws IOException {
+    OutputStream mockOutputStream = mock(OutputStream.class);
+
+    doThrow(new IOException("Simulated IOException"))
+        .when(mockOutputStream)
+        .write(any(byte[].class), anyInt(), anyInt());
+    StorageLockFile file = new StorageLockFile(
+        new StorageLockData(true, System.currentTimeMillis() + 1000, "testOwner"),
+        VERSION_ID);
+
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () -> file.writeToStream(mockOutputStream));
+    assertTrue(exception.getMessage().contains("Error writing object to JSON"));
+  }
+
+  @Test
+  void testToByteArrayValidData() {
+    StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner");
+    String outputJson = new String(StorageLockFile.toByteArray(data));
+    assertTrue(outputJson.contains("\"expired\":false"));
+    assertTrue(outputJson.contains("\"validUntil\":1700000000000"));
+    assertTrue(outputJson.contains("\"owner\":\"testOwner\""));
+  }
+
+  @Test
+  void testIsExpired() {
+    StorageLockData data = new StorageLockData(true, System.currentTimeMillis() - 1000, "testOwner");
+    StorageLockFile file = new StorageLockFile(data, VERSION_ID);
+    assertTrue(file.isExpired());
+  }
+
+  @Test
+  void testGetVersionId() {
+    StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner");
+    StorageLockFile file = new StorageLockFile(data, VERSION_ID);
+    assertEquals(VERSION_ID, file.getVersionId());
+  }
+}

Reply via email to