This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 34dbfefe316d [HUDI-8490] Adding DynamoDB Implicit Lock Provider 
support (#17843)
34dbfefe316d is described below

commit 34dbfefe316dedd20e25617d2989f72d5d52780f
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Feb 12 08:29:35 2026 +0530

    [HUDI-8490] Adding DynamoDB Implicit Lock Provider support (#17843)
    
    
    
    ---------
    
    Co-authored-by: Davis-Zhang-Onehouse 
<[email protected]>
    Co-authored-by: Nicolas Paris <[email protected]>
    Co-authored-by: Vova Kolmakov <[email protected]>
    Co-authored-by: Vova Kolmakov <[email protected]>
    Co-authored-by: Tim Brown <[email protected]>
    Co-authored-by: voonhous <[email protected]>
    Co-authored-by: Lokesh Jain <[email protected]>
---
 ...amoDBBasedImplicitPartitionKeyLockProvider.java |  74 +++++++
 .../lock/DynamoDBBasedLockProvider.java            | 185 ++--------------
 ...der.java => DynamoDBBasedLockProviderBase.java} |  98 +++++----
 .../hudi/config/DynamoDbBasedLockConfig.java       |  40 +---
 .../integ/ITTestDynamoDBBasedLockProvider.java     | 237 +++++++++++++++------
 .../lock/DynamoDBBasedLockProviderBaseTest.java    |  83 ++++++++
 ...er.java => BaseZookeeperBasedLockProvider.java} |  80 +++----
 ...ZookeeperBasedImplicitBasePathLockProvider.java |  79 +++++++
 .../lock/ZookeeperBasedLockProvider.java           | 180 ++--------------
 .../TestZookeeperBasedLockProvider.java            | 106 +++++++--
 .../org/apache/hudi/common/util/hash/HashID.java   |  22 +-
 .../apache/hudi/common/util/hash/TestHashID.java   |  40 +++-
 .../src/test/resources/hash/magic_input.txt        |  10 +
 .../hash/xxhash_BITS_128_for_magic_input.txt       |  10 +
 .../hash/xxhash_BITS_32_for_magic_input.txt        |  10 +
 .../hash/xxhash_BITS_64_for_magic_input.txt        |  10 +
 .../org/apache/hudi/common/util/StringUtils.java   |  79 +++++++
 .../apache/hudi/common/util/TestStringUtils.java   |  92 +++++++-
 .../lock/HiveMetastoreBasedLockProvider.java       |   9 +-
 scripts/release/validate_source_copyright.sh       |   4 +-
 20 files changed, 885 insertions(+), 563 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
new file mode 100644
index 000000000000..0a5ff98fcabd
--- /dev/null
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.lock;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.util.StringUtils;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.hudi.common.fs.FSUtils.s3aToS3;
+
+/**
+ * A DynamoDB based lock.
+ * It implicitly derives the partition key from the hudi table name and hudi 
table base path
+ * available in the lock configuration.
+ */
+@NotThreadSafe
+public class DynamoDBBasedImplicitPartitionKeyLockProvider extends 
DynamoDBBasedLockProviderBase {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBBasedImplicitPartitionKeyLockProvider.class);
+
+  private final String hudiTableBasePath;
+
+  public DynamoDBBasedImplicitPartitionKeyLockProvider(final LockConfiguration 
lockConfiguration, final StorageConfiguration<?> conf) {
+    this(lockConfiguration, conf, null);
+  }
+
+  public DynamoDBBasedImplicitPartitionKeyLockProvider(
+      final LockConfiguration lockConfiguration, final StorageConfiguration<?> 
conf, DynamoDbClient dynamoDB) {
+    super(lockConfiguration, conf, dynamoDB);
+    hudiTableBasePath = 
s3aToS3(lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key()));
+  }
+
+  @Override
+  public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
+    // Ensure consistent format for S3 URI.
+    String hudiTableBasePathNormalized = 
s3aToS3(lockConfiguration.getConfig().getString(
+        HoodieCommonConfig.BASE_PATH.key()));
+    String partitionKey = 
HashID.generateXXHashAsString(hudiTableBasePathNormalized, HashID.Size.BITS_64);
+    LOG.info(String.format("The DynamoDB partition key of the lock provider 
for the base path %s is %s",
+        hudiTableBasePathNormalized, partitionKey));
+    return partitionKey;
+  }
+
+  @Override
+  protected String generateLogSuffixString() {
+    return StringUtils.join("DynamoDb table = ", tableName,
+        ", partition key = ", dynamoDBPartitionKey,
+        ", hudi table base path = ", hudiTableBasePath);
+  }
+}
diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
index 2b67a483f383..d7a72ffc54e6 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
@@ -18,199 +18,38 @@
 
 package org.apache.hudi.aws.transaction.lock;
 
-import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
-import org.apache.hudi.aws.utils.DynamoTableUtils;
 import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.lock.LockProvider;
-import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.DynamoDbBasedLockConfig;
-import org.apache.hudi.exception.HoodieLockException;
 import org.apache.hudi.storage.StorageConfiguration;
 
-import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
-import com.amazonaws.services.dynamodbv2.LockItem;
-import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
-import software.amazon.awssdk.services.dynamodb.model.BillingMode;
-import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
-import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
-import software.amazon.awssdk.services.dynamodb.model.KeyType;
-import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
-import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY;
 
 /**
- * A DynamoDB based lock. This {@link LockProvider} implementation allows to 
lock table operations
- * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use 
this lock.
+ * A DynamoDB based lock.
+ * It expects partition key is explicitly available in DynamoDbBasedLockConfig.
  */
 @NotThreadSafe
-public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBBasedLockProvider.class);
-
-  private static final String DYNAMODB_ATTRIBUTE_NAME = "key";
-
-  private final AmazonDynamoDBLockClient client;
-  private final String tableName;
-  private final String dynamoDBPartitionKey;
-  protected final DynamoDbBasedLockConfig dynamoDBLockConfiguration;
-  private volatile LockItem lock;
+public class DynamoDBBasedLockProvider extends DynamoDBBasedLockProviderBase {
 
   public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
     this(lockConfiguration, conf, null);
   }
 
   public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) {
-    this.dynamoDBLockConfiguration = DynamoDbBasedLockConfig.newBuilder()
-        .fromProperties(lockConfiguration.getConfig())
-        .build();
-    this.tableName = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
-    this.dynamoDBPartitionKey = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY);
-    long leaseDuration = 
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
-    if (dynamoDB == null) {
-      dynamoDB = getDynamoDBClient();
-    }
-    // build the dynamoDb lock client
-    this.client = new AmazonDynamoDBLockClient(
-        AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
-            .withTimeUnit(TimeUnit.MILLISECONDS)
-            .withLeaseDuration(leaseDuration)
-            .withHeartbeatPeriod(leaseDuration / 3)
-            .withCreateHeartbeatBackgroundThread(true)
-            .build());
-
-    if (!this.client.lockTableExists()) {
-      createLockTableInDynamoDB(dynamoDB, tableName);
-    }
-  }
-
-  @Override
-  public boolean tryLock(long time, TimeUnit unit) {
-    LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
-    try {
-      lock = 
client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey)
-              .withAdditionalTimeToWaitForLock(time)
-              .withTimeUnit(TimeUnit.MILLISECONDS)
-              .build());
-      LOG.info(generateLogStatement(LockState.ACQUIRED, 
generateLogSuffixString()));
-    } catch (InterruptedException e) {
-      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()), e);
-    } catch (LockNotGrantedException e) {
-      return false;
-    }
-    return lock != null && !lock.isExpired();
+    super(lockConfiguration, conf, dynamoDB);
   }
 
   @Override
-  public void unlock() {
-    try {
-      LOG.info(generateLogStatement(LockState.RELEASING, 
generateLogSuffixString()));
-      if (lock == null) {
-        return;
-      }
-      if (!client.releaseLock(lock)) {
-        LOG.warn("The lock has already been stolen");
-      }
-      lock = null;
-      LOG.info(generateLogStatement(LockState.RELEASED, 
generateLogSuffixString()));
-    } catch (Exception e) {
-      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE, 
generateLogSuffixString()), e);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (lock != null) {
-        if (!client.releaseLock(lock)) {
-          LOG.warn("The lock has already been stolen");
-        }
-        lock = null;
-      }
-      this.client.close();
-    } catch (Exception e) {
-      LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE, 
generateLogSuffixString()));
-    }
-  }
-
-  @Override
-  public LockItem getLock() {
-    return lock;
-  }
-
-  private DynamoDbClient getDynamoDBClient() {
-    String region = 
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION);
-    String endpointURL = 
this.dynamoDBLockConfiguration.contains(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key())
-            ? 
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL)
-            : 
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
-
-    if (!endpointURL.startsWith("https://";) && 
!endpointURL.startsWith("http://";)) {
-      endpointURL = "https://"; + endpointURL;
-    }
-
-    return DynamoDbClient.builder()
-            .endpointOverride(URI.create(endpointURL))
-            
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDBLockConfiguration.getProps()))
-            .build();
-  }
-
-  private void createLockTableInDynamoDB(DynamoDbClient dynamoDB, String 
tableName) {
-    String billingMode = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE);
-    KeySchemaElement partitionKeyElement = KeySchemaElement
-            .builder()
-            .attributeName(DYNAMODB_ATTRIBUTE_NAME)
-            .keyType(KeyType.HASH)
-            .build();
-
-    List<KeySchemaElement> keySchema = new ArrayList<>();
-    keySchema.add(partitionKeyElement);
-
-    Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
-    
attributeDefinitions.add(AttributeDefinition.builder().attributeName(DYNAMODB_ATTRIBUTE_NAME).attributeType(ScalarAttributeType.S).build());
-    CreateTableRequest.Builder createTableRequestBuilder = 
CreateTableRequest.builder();
-    if (billingMode.equals(BillingMode.PROVISIONED.name())) {
-      
createTableRequestBuilder.provisionedThroughput(ProvisionedThroughput.builder()
-              
.readCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY))
-              
.writeCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY))
-              .build());
-    }
-    createTableRequestBuilder.tableName(tableName)
-            .keySchema(keySchema)
-            .attributeDefinitions(attributeDefinitions)
-            .billingMode(billingMode);
-    dynamoDB.createTable(createTableRequestBuilder.build());
-
-    LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to 
be active");
-    try {
-
-      DynamoTableUtils.waitUntilActive(dynamoDB, tableName, 
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT),
 20 * 1000);
-    } catch (DynamoTableUtils.TableNeverTransitionedToStateException e) {
-      throw new HoodieLockException("Created dynamoDB table never transits to 
active", e);
-    } catch (InterruptedException e) {
-      throw new HoodieLockException("Thread interrupted while waiting for 
dynamoDB table to turn active", e);
-    }
-    LOG.info("Created dynamoDB table " + tableName);
-  }
-
-  private String generateLogSuffixString() {
-    return StringUtils.join("DynamoDb table = ", tableName, ", partition key = 
", dynamoDBPartitionKey);
-  }
-
-  protected String generateLogStatement(LockState state, String suffix) {
-    return StringUtils.join(state.name(), " lock at ", suffix);
+  public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
+    DynamoDbBasedLockConfig config = 
DynamoDbBasedLockConfig.from(lockConfiguration.getConfig());
+    ValidationUtils.checkArgument(
+        config.contains(DYNAMODB_LOCK_PARTITION_KEY),
+        "Config key is not found: " + DYNAMODB_LOCK_PARTITION_KEY.key());
+    return config.getString(DYNAMODB_LOCK_PARTITION_KEY);
   }
 }
diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
similarity index 66%
copy from 
hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
copy to 
hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
index 2b67a483f383..3a1677d51749 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
@@ -47,42 +47,44 @@ import 
software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
+import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY;
+
 /**
  * A DynamoDB based lock. This {@link LockProvider} implementation allows to 
lock table operations
  * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use 
this lock.
  */
 @NotThreadSafe
-public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBBasedLockProvider.class);
+public abstract class DynamoDBBasedLockProviderBase implements 
LockProvider<LockItem>, Serializable {
 
-  private static final String DYNAMODB_ATTRIBUTE_NAME = "key";
+  protected static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBBasedLockProviderBase.class);
 
-  private final AmazonDynamoDBLockClient client;
-  private final String tableName;
-  private final String dynamoDBPartitionKey;
-  protected final DynamoDbBasedLockConfig dynamoDBLockConfiguration;
-  private volatile LockItem lock;
+  protected static final String DYNAMODB_ATTRIBUTE_NAME = "key";
 
-  public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
-    this(lockConfiguration, conf, null);
-  }
+  protected final DynamoDbBasedLockConfig dynamoDbBasedLockConfig;
+  protected final transient AmazonDynamoDBLockClient client;
+  protected final String tableName;
+  protected final String dynamoDBPartitionKey;
+  protected volatile LockItem lock;
 
-  public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) {
-    this.dynamoDBLockConfiguration = DynamoDbBasedLockConfig.newBuilder()
-        .fromProperties(lockConfiguration.getConfig())
-        .build();
-    this.tableName = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
-    this.dynamoDBPartitionKey = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY);
-    long leaseDuration = 
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
+  protected DynamoDBBasedLockProviderBase(final LockConfiguration 
lockConfiguration, final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) 
{
+    this.dynamoDbBasedLockConfig = 
DynamoDbBasedLockConfig.from(lockConfiguration.getConfig());
+    this.tableName = 
dynamoDbBasedLockConfig.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
+    long leaseDuration = 
dynamoDbBasedLockConfig.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
+    dynamoDBPartitionKey = getDynamoDBPartitionKey(lockConfiguration);
     if (dynamoDB == null) {
-      dynamoDB = getDynamoDBClient();
+      dynamoDB = getDynamoDBClient(dynamoDbBasedLockConfig);
     }
     // build the dynamoDb lock client
     this.client = new AmazonDynamoDBLockClient(
@@ -98,14 +100,20 @@ public class DynamoDBBasedLockProvider implements 
LockProvider<LockItem> {
     }
   }
 
+  public abstract String getDynamoDBPartitionKey(LockConfiguration 
lockConfiguration);
+
+  public String getPartitionKey() {
+    return dynamoDBPartitionKey;
+  }
+
   @Override
   public boolean tryLock(long time, TimeUnit unit) {
     LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
     try {
       lock = 
client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey)
-              .withAdditionalTimeToWaitForLock(time)
-              .withTimeUnit(TimeUnit.MILLISECONDS)
-              .build());
+          .withAdditionalTimeToWaitForLock(time)
+          .withTimeUnit(TimeUnit.MILLISECONDS)
+          .build());
       LOG.info(generateLogStatement(LockState.ACQUIRED, 
generateLogSuffixString()));
     } catch (InterruptedException e) {
       throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()), e);
@@ -152,29 +160,29 @@ public class DynamoDBBasedLockProvider implements 
LockProvider<LockItem> {
     return lock;
   }
 
-  private DynamoDbClient getDynamoDBClient() {
-    String region = 
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION);
-    String endpointURL = 
this.dynamoDBLockConfiguration.contains(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key())
-            ? 
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL)
-            : 
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
+  private static DynamoDbClient getDynamoDBClient(DynamoDbBasedLockConfig 
dynamoDbBasedLockConfig) {
+    String region = dynamoDbBasedLockConfig.getString(DYNAMODB_LOCK_REGION);
+    String endpointURL = 
dynamoDbBasedLockConfig.contains(DYNAMODB_ENDPOINT_URL.key())
+        ? dynamoDbBasedLockConfig.getString(DYNAMODB_ENDPOINT_URL)
+        : 
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
 
-    if (!endpointURL.startsWith("https://";) && 
!endpointURL.startsWith("http://";)) {
+    if (!endpointURL.startsWith("https://";) || 
!endpointURL.startsWith("http://";)) {
       endpointURL = "https://"; + endpointURL;
     }
 
     return DynamoDbClient.builder()
-            .endpointOverride(URI.create(endpointURL))
-            
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDBLockConfiguration.getProps()))
-            .build();
+        .endpointOverride(URI.create(endpointURL))
+        
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDbBasedLockConfig.getProps()))
+        .build();
   }
 
   private void createLockTableInDynamoDB(DynamoDbClient dynamoDB, String 
tableName) {
-    String billingMode = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE);
+    String billingMode = 
dynamoDbBasedLockConfig.getString(DYNAMODB_LOCK_BILLING_MODE);
     KeySchemaElement partitionKeyElement = KeySchemaElement
-            .builder()
-            .attributeName(DYNAMODB_ATTRIBUTE_NAME)
-            .keyType(KeyType.HASH)
-            .build();
+        .builder()
+        .attributeName(DYNAMODB_ATTRIBUTE_NAME)
+        .keyType(KeyType.HASH)
+        .build();
 
     List<KeySchemaElement> keySchema = new ArrayList<>();
     keySchema.add(partitionKeyElement);
@@ -184,20 +192,20 @@ public class DynamoDBBasedLockProvider implements 
LockProvider<LockItem> {
     CreateTableRequest.Builder createTableRequestBuilder = 
CreateTableRequest.builder();
     if (billingMode.equals(BillingMode.PROVISIONED.name())) {
       
createTableRequestBuilder.provisionedThroughput(ProvisionedThroughput.builder()
-              
.readCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY))
-              
.writeCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY))
-              .build());
+          
.readCapacityUnits(dynamoDbBasedLockConfig.getLong(DYNAMODB_LOCK_READ_CAPACITY))
+          
.writeCapacityUnits(dynamoDbBasedLockConfig.getLong(DYNAMODB_LOCK_WRITE_CAPACITY))
+          .build());
     }
     createTableRequestBuilder.tableName(tableName)
-            .keySchema(keySchema)
-            .attributeDefinitions(attributeDefinitions)
-            .billingMode(billingMode);
+        .keySchema(keySchema)
+        .attributeDefinitions(attributeDefinitions)
+        .billingMode(billingMode);
     dynamoDB.createTable(createTableRequestBuilder.build());
 
     LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to 
be active");
     try {
 
-      DynamoTableUtils.waitUntilActive(dynamoDB, tableName, 
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT),
 20 * 1000);
+      DynamoTableUtils.waitUntilActive(dynamoDB, tableName, 
dynamoDbBasedLockConfig.getInt(DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT), 20 * 
1000);
     } catch (DynamoTableUtils.TableNeverTransitionedToStateException e) {
       throw new HoodieLockException("Created dynamoDB table never transits to 
active", e);
     } catch (InterruptedException e) {
@@ -206,7 +214,7 @@ public class DynamoDBBasedLockProvider implements 
LockProvider<LockItem> {
     LOG.info("Created dynamoDB table " + tableName);
   }
 
-  private String generateLogSuffixString() {
+  protected String generateLogSuffixString() {
     return StringUtils.join("DynamoDb table = ", tableName, ", partition key = 
", dynamoDBPartitionKey);
   }
 
diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java 
b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
index 5639db025825..7e90c36f2cdb 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
@@ -43,10 +43,6 @@ import 
software.amazon.awssdk.services.dynamodb.model.BillingMode;
         + " are auto managed internally.")
 public class DynamoDbBasedLockConfig extends HoodieConfig {
 
-  public static DynamoDbBasedLockConfig.Builder newBuilder() {
-    return new DynamoDbBasedLockConfig.Builder();
-  }
-
   // configs for DynamoDb based locks
   public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = 
LockConfiguration.LOCK_PREFIX + "dynamodb.";
 
@@ -132,34 +128,12 @@ public class DynamoDbBasedLockConfig extends HoodieConfig 
{
       .sinceVersion("0.10.0")
       .withDocumentation("Lock Acquire Wait Timeout in milliseconds");
 
-  /**
-   * Builder for {@link DynamoDbBasedLockConfig}.
-   */
-  public static class Builder {
-    private final DynamoDbBasedLockConfig lockConfig = new 
DynamoDbBasedLockConfig();
-
-    public DynamoDbBasedLockConfig build() {
-      lockConfig.setDefaults(DynamoDbBasedLockConfig.class.getName());
-      checkRequiredProps(lockConfig);
-      return lockConfig;
-    }
-
-    public Builder fromProperties(TypedProperties props) {
-      lockConfig.getProps().putAll(props);
-      return this;
-    }
-
-    private void checkRequiredProps(final DynamoDbBasedLockConfig config) {
-      String errorMsg = "Config key is not found: ";
-      ValidationUtils.checkArgument(
-          config.contains(DYNAMODB_LOCK_TABLE_NAME.key()),
-          errorMsg + DYNAMODB_LOCK_TABLE_NAME.key());
-      ValidationUtils.checkArgument(
-          config.contains(DYNAMODB_LOCK_REGION.key()),
-          errorMsg + DYNAMODB_LOCK_REGION.key());
-      ValidationUtils.checkArgument(
-          config.contains(DYNAMODB_LOCK_PARTITION_KEY.key()),
-          errorMsg + DYNAMODB_LOCK_PARTITION_KEY.key());
-    }
+  public static DynamoDbBasedLockConfig from(TypedProperties properties) {
+    DynamoDbBasedLockConfig config = new DynamoDbBasedLockConfig();
+    config.getProps().putAll(properties);
+    config.setDefaults(DynamoDbBasedLockConfig.class.getName());
+    
ValidationUtils.checkArgument(config.contains(DYNAMODB_LOCK_TABLE_NAME.key()), 
"Config key is not found: " + DYNAMODB_LOCK_TABLE_NAME.key());
+    ValidationUtils.checkArgument(config.contains(DYNAMODB_LOCK_REGION.key()), 
"Config key is not found: " + DYNAMODB_LOCK_REGION.key());
+    return config;
   }
 }
diff --git 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
index b874f4f3c3cc..6a7d9ef78aab 100644
--- 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
+++ 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
@@ -18,7 +18,24 @@
 
 package org.apache.hudi.aws.transaction.integ;
 
+import 
org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider;
+import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
+import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProviderBase;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.config.DynamoDbBasedLockConfig;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -26,18 +43,11 @@ import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.dynamodb.model.BillingMode;
 
-import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
-import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.config.DynamoDbBasedLockConfig;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
 import java.net.URI;
 import java.util.UUID;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
 
@@ -48,62 +58,70 @@ import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_
 @Disabled("HUDI-7475 The tests do not work. Disabling them to unblock Azure 
CI")
 public class ITTestDynamoDBBasedLockProvider {
 
-  private static LockConfiguration lockConfiguration;
-  private static DynamoDbClient dynamoDb;
-
+  private static final LockConfiguration LOCK_CONFIGURATION;
+  private static final LockConfiguration 
IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH;
+  private static final LockConfiguration 
IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY;
+  private static final LockConfiguration 
IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME;
+  private static final LockConfiguration IMPLICIT_PART_KEY_LOCK_CONFIG;
   private static final String TABLE_NAME_PREFIX = "testDDBTable-";
   private static final String REGION = "us-east-2";
+  private static DynamoDbClient dynamoDb;
+  private static String SCHEME_S3 = "s3";
+  private static String SCHEME_S3A = "s3a";
+  private static String URI_NO_CLOUD_PROVIDER_PREFIX = 
"://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table";
+
+  static {
+    Properties dynamoDblpProps = new Properties();
+    Properties implicitPartKeyLpProps;
+    // Common properties shared by both lock providers.
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(),
 BillingMode.PAY_PER_REQUEST.name());
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(),
 Integer.toString(20 * 1000 * 5));
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), 
REGION);
+    dynamoDblpProps.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(),
 "0");
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(),
 "0");
+
+    implicitPartKeyLpProps = new TypedProperties(dynamoDblpProps);
+    dynamoDblpProps = new TypedProperties(dynamoDblpProps);
+
+    // For the day-1 DynamoDbBasedLockProvider, it requires an optional 
partition key
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
 "testKey");
+    LOCK_CONFIGURATION = new LockConfiguration(dynamoDblpProps);
+
+    // For the newly added implicit partition key DDB lock provider, it can 
derive the partition key from
+    // hudi table base path and hudi table name. These properties are 
available in lockConfig as of today.
+    implicitPartKeyLpProps.setProperty(
+        HoodieCommonConfig.BASE_PATH.key(),
+        "gs://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
+    implicitPartKeyLpProps.setProperty(
+        HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "ma_po_tofu_is_awesome");
+    IMPLICIT_PART_KEY_LOCK_CONFIG = new 
LockConfiguration(implicitPartKeyLpProps);
+    // With partition key nothing should break, the field should be simply 
ignored.
+    TypedProperties withPartKey = new TypedProperties(implicitPartKeyLpProps);
+    
withPartKey.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
 "testKey");
+    IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY = new 
LockConfiguration(withPartKey);
+
+    // Missing either base path or hoodie table name is a bad config for 
implicit partition key lock provider.
+    TypedProperties missingBasePath = new 
TypedProperties(implicitPartKeyLpProps);
+    missingBasePath.remove(HoodieCommonConfig.BASE_PATH.key());
+    IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH = new 
LockConfiguration(missingBasePath);
+
+    TypedProperties missingTableName = new 
TypedProperties(implicitPartKeyLpProps);
+    missingTableName.remove(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+    IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME = new 
LockConfiguration(missingTableName);
+  }
 
   @BeforeAll
   public static void setup() throws InterruptedException {
-    Properties properties = new Properties();
-    
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(),
 BillingMode.PAY_PER_REQUEST.name());
-    // properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), 
TABLE_NAME_PREFIX);
-    
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
 "testKey");
-    properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), 
REGION);
-    properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
-    
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(),
 "0");
-    
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(),
 "0");
-    lockConfiguration = new LockConfiguration(properties);
     dynamoDb = getDynamoClientWithLocalEndpoint();
   }
 
-  @Test
-  public void testAcquireLock() {
-    
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
-    DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new 
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    dynamoDbBasedLockProvider.unlock();
-  }
-
-  @Test
-  public void testUnlock() {
-    
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
-    DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new 
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    dynamoDbBasedLockProvider.unlock();
-    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-  }
-
-  @Test
-  public void testReentrantLock() {
-    
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
-    DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new 
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    
Assertions.assertFalse(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    dynamoDbBasedLockProvider.unlock();
-  }
-
-  @Test
-  public void testUnlockWithoutLock() {
-    
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
-    DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new 
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-    dynamoDbBasedLockProvider.unlock();
+  public static Stream<Object> badTestDimensions() {
+    return Stream.of(
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME, 
DynamoDBBasedLockProvider.class),
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME, 
DynamoDBBasedImplicitPartitionKeyLockProvider.class),
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH, 
DynamoDBBasedImplicitPartitionKeyLockProvider.class)
+    );
   }
 
   private static DynamoDbClient getDynamoClientWithLocalEndpoint() {
@@ -112,13 +130,112 @@ public class ITTestDynamoDBBasedLockProvider {
       throw new IllegalStateException("dynamodb-local.endpoint system property 
not set");
     }
     return DynamoDbClient.builder()
-            .region(Region.of(REGION))
-            .endpointOverride(URI.create(endpoint))
-            .credentialsProvider(getCredentials())
-            .build();
+        .region(Region.of(REGION))
+        .endpointOverride(URI.create(endpoint))
+        .credentialsProvider(getCredentials())
+        .build();
   }
 
   private static AwsCredentialsProvider getCredentials() {
     return 
StaticCredentialsProvider.create(AwsBasicCredentials.create("random-access-key",
 "random-secret-key"));
   }
+
+  @ParameterizedTest
+  @MethodSource("badTestDimensions")
+  void testBadConfig(LockConfiguration lockConfig, Class<?> lockProviderClass) 
{
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    Exception e = new Exception();
+    try {
+      ReflectionUtils.loadClass(
+          lockProviderClass.getName(),
+          new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+          lockConfig, null, dynamoDb);
+    } catch (Exception ex) {
+      e = ex;
+    }
+    Assertions.assertEquals(IllegalArgumentException.class, 
e.getCause().getCause().getClass());
+  }
+
+  public static Stream<Arguments> testDimensions() {
+    return Stream.of(
+        // Without partition key, only table name is used.
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, 
DynamoDBBasedLockProvider.class),
+        // Even if we have partition key set, nothing would break.
+        Arguments.of(LOCK_CONFIGURATION, DynamoDBBasedLockProvider.class),
+        // Even if we have partition key set, nothing would break.
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY, 
DynamoDBBasedImplicitPartitionKeyLockProvider.class),
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, 
DynamoDBBasedImplicitPartitionKeyLockProvider.class)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testAcquireLock(LockConfiguration lockConfig, Class<?> 
lockProviderClass) {
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = 
(DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+
+    // Also validate the partition key is properly constructed.
+    if (lockProviderClass.equals(DynamoDBBasedLockProvider.class)) {
+      String tableName = (String) 
lockConfig.getConfig().get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+      String partitionKey = (String) 
lockConfig.getConfig().get(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key());
+      if (partitionKey != null) {
+        Assertions.assertEquals(partitionKey, 
dynamoDbBasedLockProvider.getPartitionKey());
+      } else {
+        Assertions.assertEquals(tableName, 
dynamoDbBasedLockProvider.getPartitionKey());
+      }
+    } else if 
(lockProviderClass.equals(DynamoDBBasedImplicitPartitionKeyLockProvider.class)) 
{
+      String tableName = (String) 
lockConfig.getConfig().get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+      String basePath = (String) 
lockConfig.getConfig().get(HoodieCommonConfig.BASE_PATH.key());
+      // Base path is constructed with prefix s3a, verify that for partition 
key calculation, s3a is replaced with s3
+      Assertions.assertTrue(basePath.startsWith(SCHEME_S3A));
+      // Verify base path only scheme partition key
+      Assertions.assertEquals(
+          HashID.generateXXHashAsString(SCHEME_S3 + 
URI_NO_CLOUD_PROVIDER_PREFIX, HashID.Size.BITS_64),
+          dynamoDbBasedLockProvider.getPartitionKey());
+    }
+
+    // Test lock acquisition and release
+    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+    dynamoDbBasedLockProvider.unlock();
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testUnlock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = 
(DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+    dynamoDbBasedLockProvider.unlock();
+    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testReentrantLock(LockConfiguration lockConfig, Class<?> 
lockProviderClass) {
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = 
(DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+    
Assertions.assertFalse(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+    dynamoDbBasedLockProvider.unlock();
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testUnlockWithoutLock(LockConfiguration lockConfig, Class<?> 
lockProviderClass) {
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = 
(DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+    dynamoDbBasedLockProvider.unlock();
+  }
 }
diff --git 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBaseTest.java
 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBaseTest.java
new file mode 100644
index 000000000000..9e7256c3c84f
--- /dev/null
+++ 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBaseTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.DynamoDbBasedLockConfig;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+
+import java.util.Properties;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+class DynamoDBBasedLockProviderBaseTest {
+  private static final LockConfiguration LOCK_CONFIGURATION;
+  @Mock
+  private static DynamoDbClient mockClient = new DynamoDbClient() {
+    @Override
+    public String serviceName() {
+      return "";
+    }
+
+    @Override
+    public void close() {
+
+    }
+  };
+
+  static {
+    Properties dynamoDblpProps = new TypedProperties();
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(),
 BillingMode.PAY_PER_REQUEST.name());
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(),
 Integer.toString(20 * 1000 * 5));
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), 
"us-east-2");
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 "my-table");
+    dynamoDblpProps.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(),
 "0");
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(),
 "0");
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
 "testKey");
+    LOCK_CONFIGURATION = new LockConfiguration(dynamoDblpProps);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testLockProviderBaseInitialization(boolean isNull) {
+    Exception e = null;
+    try {
+      new DynamoDBBasedLockProvider(LOCK_CONFIGURATION, new 
HadoopStorageConfiguration(true), isNull ? null : mockClient);
+    } catch (Exception ex) {
+      e = ex;
+    }
+    Assertions.assertNotNull(e);
+    if (isNull) {
+      // Initialization should fail on AWS API call due to invalid setup.
+      
Assertions.assertEquals(software.amazon.awssdk.core.exception.SdkClientException.class,
 e.getClass());
+    } else {
+      // Otherwise it should be anything but NPE.
+      Assertions.assertNotEquals(java.lang.NullPointerException.class, 
e.getClass());
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
similarity index 64%
copy from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
copy to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
index 02f137b509a6..807226460a4a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.transaction.lock;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieLockException;
@@ -28,7 +29,6 @@ import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.retry.BoundedExponentialBackoffRetry;
 import org.apache.zookeeper.KeeperException;
@@ -40,47 +40,57 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
-import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
-import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
+import static org.apache.hudi.config.HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES;
+import static 
org.apache.hudi.config.HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS;
+import static 
org.apache.hudi.config.HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_CONNECT_URL;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_SESSION_TIMEOUT_MS;
 
 /**
  * A zookeeper based lock. This {@link LockProvider} implementation allows to 
lock table operations
  * using zookeeper. Users need to have a Zookeeper cluster deployed to be able 
to use this lock.
  */
 @NotThreadSafe
-public class ZookeeperBasedLockProvider implements 
LockProvider<InterProcessMutex>, Serializable {
+public abstract class BaseZookeeperBasedLockProvider implements 
LockProvider<InterProcessMutex>, Serializable {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperBasedLockProvider.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseZookeeperBasedLockProvider.class);
 
   private final transient CuratorFramework curatorFrameworkClient;
   private volatile InterProcessMutex lock = null;
-  protected LockConfiguration lockConfiguration;
+  protected final LockConfiguration lockConfiguration;
+  protected final String zkBasePath;
+  protected final String lockKey;
 
-  public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
+  public BaseZookeeperBasedLockProvider(final LockConfiguration 
lockConfiguration, final StorageConfiguration<?> conf) {
     checkRequiredProps(lockConfiguration);
     this.lockConfiguration = lockConfiguration;
+    zkBasePath = getZkBasePath(lockConfiguration);
+    lockKey = getLockKey(lockConfiguration);
     this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
-        
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP_KEY))
-        .retryPolicy(new 
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY),
-            
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY),
 lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY)))
-        
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP_KEY,
 DEFAULT_ZK_SESSION_TIMEOUT_MS))
-        
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY,
 DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+        
.connectString(ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(), 
ZK_CONNECT_URL))
+        .retryPolicy(new BoundedExponentialBackoffRetry(
+            ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(), 
LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS),
+            ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(), 
LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS),
+            ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(), 
LOCK_ACQUIRE_NUM_RETRIES)))
+        
.sessionTimeoutMs(ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(), 
ZK_SESSION_TIMEOUT_MS))
+        
.connectionTimeoutMs(ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
 ZK_CONNECTION_TIMEOUT_MS))
         .build();
     this.curatorFrameworkClient.start();
     createPathIfNotExists();
   }
 
-  private String getLockPath() {
-    return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
-        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
+  protected abstract String getZkBasePath(LockConfiguration lockConfiguration);
+
+  protected abstract String getLockKey(LockConfiguration lockConfiguration);
+
+  protected String generateLogSuffixString() {
+    return StringUtils.join("ZkBasePath = ", zkBasePath, ", lock key = ", 
lockKey);
+  }
+
+  protected String getLockPath() {
+    return zkBasePath + '/' + lockKey;
   }
 
   private void createPathIfNotExists() {
@@ -116,21 +126,6 @@ public class ZookeeperBasedLockProvider implements 
LockProvider<InterProcessMute
     }
   }
 
-
-  // Only used for testing
-  public ZookeeperBasedLockProvider(
-      final LockConfiguration lockConfiguration, final CuratorFramework 
curatorFrameworkClient) {
-    checkRequiredProps(lockConfiguration);
-    this.lockConfiguration = lockConfiguration;
-    this.curatorFrameworkClient = curatorFrameworkClient;
-    synchronized (this.curatorFrameworkClient) {
-      if (this.curatorFrameworkClient.getState() != 
CuratorFrameworkState.STARTED) {
-        this.curatorFrameworkClient.start();
-        createPathIfNotExists();
-      }
-    }
-  }
-
   @Override
   public boolean tryLock(long time, TimeUnit unit) {
     LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
@@ -181,8 +176,7 @@ public class ZookeeperBasedLockProvider implements 
LockProvider<InterProcessMute
   private void acquireLock(long time, TimeUnit unit) throws Exception {
     ValidationUtils.checkArgument(this.lock == null, 
generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
     InterProcessMutex newLock = new InterProcessMutex(
-        this.curatorFrameworkClient, 
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
-        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY));
+        this.curatorFrameworkClient, getLockPath());
     boolean acquired = newLock.acquire(time, unit);
     if (!acquired) {
       throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()));
@@ -196,14 +190,6 @@ public class ZookeeperBasedLockProvider implements 
LockProvider<InterProcessMute
 
   private void checkRequiredProps(final LockConfiguration config) {
     
ValidationUtils.checkArgument(config.getConfig().getString(ZK_CONNECT_URL_PROP_KEY)
 != null);
-    
ValidationUtils.checkArgument(config.getConfig().getString(ZK_BASE_PATH_PROP_KEY)
 != null);
-    
ValidationUtils.checkArgument(config.getConfig().getString(ZK_LOCK_KEY_PROP_KEY)
 != null);
-  }
-
-  private String generateLogSuffixString() {
-    String zkBasePath = 
this.lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY);
-    String lockKey = 
this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
-    return StringUtils.join("ZkBasePath = ", zkBasePath, ", lock key = ", 
lockKey);
   }
 
   protected String generateLogStatement(LockState state, String suffix) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
new file mode 100644
index 000000000000..9c4a57bc271f
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.hudi.common.fs.FSUtils.s3aToS3;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to 
lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able 
to use this lock.
+ *
+ * This class derives the zookeeper base path from the hudi table base path 
(hoodie.base.path) and
+ * table name (hoodie.table.name), with lock key set to a hard-coded value.
+ */
+@NotThreadSafe
+public class ZookeeperBasedImplicitBasePathLockProvider extends 
BaseZookeeperBasedLockProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperBasedImplicitBasePathLockProvider.class);
+
+  public static final String LOCK_KEY = "lock_key";
+  private final String hudiTableBasePath;
+
+  public static String getLockBasePath(String hudiTableBasePath) {
+    // Ensure consistent format for S3 URI.
+    String lockBasePath = "/tmp/" + 
HashID.generateXXHashAsString(s3aToS3(hudiTableBasePath), HashID.Size.BITS_64);
+    LOG.info(String.format("The Zookeeper lock key for the base path %s is 
%s", hudiTableBasePath, lockBasePath));
+    return lockBasePath;
+  }
+
+  public ZookeeperBasedImplicitBasePathLockProvider(final LockConfiguration 
lockConfiguration, final StorageConfiguration<?> conf) {
+    super(lockConfiguration, conf);
+    hudiTableBasePath = 
s3aToS3(lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key()));
+  }
+
+  @Override
+  protected String getZkBasePath(LockConfiguration lockConfiguration) {
+    String hudiTableBasePath = 
lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key());
+    ValidationUtils.checkArgument(hudiTableBasePath != null);
+    return getLockBasePath(hudiTableBasePath);
+  }
+
+  @Override
+  protected String getLockKey(LockConfiguration lockConfiguration) {
+    return LOCK_KEY;
+  }
+
+  @Override
+  protected String generateLogSuffixString() {
+    return StringUtils.join("ZkBasePath = ", zkBasePath,
+        ", lock key = ", lockKey, ", hudi table base path = ", 
hudiTableBasePath);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
index 02f137b509a6..e1eacd7a870e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
@@ -20,193 +20,39 @@ package org.apache.hudi.client.transaction.lock;
 
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.lock.LockProvider;
-import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieLockException;
 import org.apache.hudi.storage.StorageConfiguration;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.retry.BoundedExponentialBackoffRetry;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
-import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
-import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP_KEY;
-import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_BASE_PATH;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_LOCK_KEY;
 
 /**
  * A zookeeper based lock. This {@link LockProvider} implementation allows to 
lock table operations
  * using zookeeper. Users need to have a Zookeeper cluster deployed to be able 
to use this lock.
+ * The lock provider requires mandatory config 
"hoodie.write.lock.zookeeper.base_path" and
+ * "hoodie.write.lock.zookeeper.lock_key" to be set.
  */
 @NotThreadSafe
-public class ZookeeperBasedLockProvider implements 
LockProvider<InterProcessMutex>, Serializable {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperBasedLockProvider.class);
-
-  private final transient CuratorFramework curatorFrameworkClient;
-  private volatile InterProcessMutex lock = null;
-  protected LockConfiguration lockConfiguration;
+public class ZookeeperBasedLockProvider extends BaseZookeeperBasedLockProvider 
{
 
   public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
-    checkRequiredProps(lockConfiguration);
-    this.lockConfiguration = lockConfiguration;
-    this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
-        
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP_KEY))
-        .retryPolicy(new 
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY),
-            
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY),
 lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY)))
-        
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP_KEY,
 DEFAULT_ZK_SESSION_TIMEOUT_MS))
-        
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY,
 DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
-        .build();
-    this.curatorFrameworkClient.start();
-    createPathIfNotExists();
-  }
-
-  private String getLockPath() {
-    return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
-        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
-  }
-
-  private void createPathIfNotExists() {
-    try {
-      String lockPath = getLockPath();
-      LOG.info(String.format("Creating zookeeper path %s if not exists", 
lockPath));
-      String[] parts = lockPath.split("/");
-      StringBuilder currentPath = new StringBuilder();
-      for (String part : parts) {
-        if (!part.isEmpty()) {
-          currentPath.append("/").append(part);
-          createNodeIfNotExists(currentPath.toString());
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Failed to create ZooKeeper path: " + e.getMessage());
-      throw new HoodieLockException("Failed to initialize ZooKeeper path", e);
-    }
-  }
-
-  private void createNodeIfNotExists(String path) throws Exception {
-    if (this.curatorFrameworkClient.checkExists().forPath(path) == null) {
-      try {
-        this.curatorFrameworkClient.create().forPath(path);
-        // to avoid failure due to synchronous calls.
-      } catch (KeeperException e) {
-        if (e.code() == KeeperException.Code.NODEEXISTS) {
-          LOG.debug(String.format("Node already exist for path = %s", path));
-        } else {
-          throw new HoodieLockException("Failed to create zookeeper node", e);
-        }
-      }
-    }
-  }
-
-
-  // Only used for testing
-  public ZookeeperBasedLockProvider(
-      final LockConfiguration lockConfiguration, final CuratorFramework 
curatorFrameworkClient) {
-    checkRequiredProps(lockConfiguration);
-    this.lockConfiguration = lockConfiguration;
-    this.curatorFrameworkClient = curatorFrameworkClient;
-    synchronized (this.curatorFrameworkClient) {
-      if (this.curatorFrameworkClient.getState() != 
CuratorFrameworkState.STARTED) {
-        this.curatorFrameworkClient.start();
-        createPathIfNotExists();
-      }
-    }
+    super(lockConfiguration, conf);
   }
 
   @Override
-  public boolean tryLock(long time, TimeUnit unit) {
-    LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
-    try {
-      acquireLock(time, unit);
-      LOG.info(generateLogStatement(LockState.ACQUIRED, 
generateLogSuffixString()));
-    } catch (HoodieLockException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()), e);
-    }
-    return lock != null && lock.isAcquiredInThisProcess();
+  protected String getZkBasePath(LockConfiguration lockConfiguration) {
+    
ValidationUtils.checkArgument(ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(),
 ZK_BASE_PATH) != null);
+    return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY);
   }
 
   @Override
-  public void unlock() {
-    try {
-      LOG.info(generateLogStatement(LockState.RELEASING, 
generateLogSuffixString()));
-      if (lock == null || !lock.isAcquiredInThisProcess()) {
-        return;
-      }
-      lock.release();
-      lock = null;
-      LOG.info(generateLogStatement(LockState.RELEASED, 
generateLogSuffixString()));
-    } catch (Exception e) {
-      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE, 
generateLogSuffixString()), e);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (lock != null) {
-        lock.release();
-        lock = null;
-      }
-      this.curatorFrameworkClient.close();
-    } catch (Exception e) {
-      LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE, 
generateLogSuffixString()));
-    }
-  }
-
-  @Override
-  public InterProcessMutex getLock() {
-    return this.lock;
-  }
-
-  private void acquireLock(long time, TimeUnit unit) throws Exception {
-    ValidationUtils.checkArgument(this.lock == null, 
generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
-    InterProcessMutex newLock = new InterProcessMutex(
-        this.curatorFrameworkClient, 
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
-        + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY));
-    boolean acquired = newLock.acquire(time, unit);
-    if (!acquired) {
-      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()));
-    }
-    if (newLock.isAcquiredInThisProcess()) {
-      lock = newLock;
-    } else {
-      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()));
-    }
-  }
-
-  private void checkRequiredProps(final LockConfiguration config) {
-    
ValidationUtils.checkArgument(config.getConfig().getString(ZK_CONNECT_URL_PROP_KEY)
 != null);
-    
ValidationUtils.checkArgument(config.getConfig().getString(ZK_BASE_PATH_PROP_KEY)
 != null);
-    
ValidationUtils.checkArgument(config.getConfig().getString(ZK_LOCK_KEY_PROP_KEY)
 != null);
-  }
-
-  private String generateLogSuffixString() {
-    String zkBasePath = 
this.lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY);
-    String lockKey = 
this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
-    return StringUtils.join("ZkBasePath = ", zkBasePath, ", lock key = ", 
lockKey);
-  }
-
-  protected String generateLogStatement(LockState state, String suffix) {
-    return StringUtils.join(state.name(), " lock at", suffix);
+  protected String getLockKey(LockConfiguration lockConfiguration) {
+    
ValidationUtils.checkArgument(ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(),
 ZK_LOCK_KEY) != null);
+    return this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
index 616ba0d228f3..d5a2c9ae28c5 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
@@ -18,9 +18,15 @@
 
 package org.apache.hudi.client.transaction;
 
+import 
org.apache.hudi.client.transaction.lock.ZookeeperBasedImplicitBasePathLockProvider;
 import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.client.transaction.lock.BaseZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -30,13 +36,21 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
@@ -52,7 +66,10 @@ public class TestZookeeperBasedLockProvider {
   private static CuratorFramework client;
   private static String basePath = "/hudi/test/lock";
   private static String key = "table1";
-  private static LockConfiguration lockConfiguration;
+  private static LockConfiguration zkConfWithZkBasePathAndLockKeyLock;
+  private static LockConfiguration zkConfNoTableBasePathTableName;
+  private static LockConfiguration zkConfWithTableBasePathTableName;
+  private static LockConfiguration zkConfWithZkBasePathLockKeyTableInfo;
 
   @BeforeAll
   public static void setup() {
@@ -67,15 +84,33 @@ public class TestZookeeperBasedLockProvider {
       }
     }
     Properties properties = new Properties();
-    properties.setProperty(ZK_BASE_PATH_PROP_KEY, basePath);
-    properties.setProperty(ZK_LOCK_KEY_PROP_KEY, key);
+
     properties.setProperty(ZK_CONNECT_URL_PROP_KEY, server.getConnectString());
-    properties.setProperty(ZK_BASE_PATH_PROP_KEY, 
server.getTempDirectory().getAbsolutePath());
+    properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, 
"1000");
+    
properties.setProperty(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY, 
"3000");
+    properties.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
+    properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
     properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP_KEY, "10000");
     properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, "10000");
-    properties.setProperty(ZK_LOCK_KEY_PROP_KEY, "key");
     properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
-    lockConfiguration = new LockConfiguration(properties);
+    zkConfNoTableBasePathTableName = new LockConfiguration(properties);
+
+    Properties propsWithTableInfo = (Properties) properties.clone();
+    propsWithTableInfo.setProperty(
+        HoodieCommonConfig.BASE_PATH.key(), 
"s3://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
+    propsWithTableInfo.setProperty(
+        HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "ma_po_tofu_is_awesome");
+    zkConfWithTableBasePathTableName = new 
LockConfiguration(propsWithTableInfo);
+
+    properties.setProperty(ZK_BASE_PATH_PROP_KEY, basePath);
+    properties.setProperty(ZK_LOCK_KEY_PROP_KEY, key);
+    zkConfWithZkBasePathAndLockKeyLock = new LockConfiguration(properties);
+
+    properties.setProperty(
+        HoodieCommonConfig.BASE_PATH.key(), 
"s3://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
+    properties.setProperty(
+        HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "ma_po_tofu_is_awesome");
+    zkConfWithZkBasePathLockKeyTableInfo = new LockConfiguration(properties);
   }
 
   @AfterAll
@@ -88,31 +123,66 @@ public class TestZookeeperBasedLockProvider {
     }
   }
 
-  @Test
-  public void testAcquireLock() {
-    ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(lockConfiguration, client);
-    
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+  public static Stream<Object> testDimensions() {
+    return Stream.of(
+        Arguments.of(zkConfWithTableBasePathTableName, 
ZookeeperBasedImplicitBasePathLockProvider.class),
+        Arguments.of(zkConfWithZkBasePathAndLockKeyLock, 
ZookeeperBasedLockProvider.class),
+        // Even if we have base path set, nothing would break.
+        Arguments.of(zkConfWithZkBasePathLockKeyTableInfo, 
ZookeeperBasedImplicitBasePathLockProvider.class)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testAcquireLock(LockConfiguration lockConfig, Class<?> 
lockProviderClass) {
+    BaseZookeeperBasedLockProvider zookeeperLP = 
(BaseZookeeperBasedLockProvider) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class},
+        new Object[] {lockConfig, null});
+    
Assertions.assertTrue(zookeeperLP.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
         .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    zookeeperBasedLockProvider.unlock();
+    zookeeperLP.unlock();
+  }
+
+  public static Stream<Object> testBadDimensions() {
+    return Stream.of(
+        Arguments.of(zkConfNoTableBasePathTableName, 
ZookeeperBasedImplicitBasePathLockProvider.class),
+        Arguments.of(zkConfWithTableBasePathTableName, 
ZookeeperBasedLockProvider.class)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("testBadDimensions")
+  void testBadLockConfig(LockConfiguration lockConfig, Class<?> 
lockProviderClass) {
+    Exception ex = null;
+    try {
+      ReflectionUtils.loadClass(
+          lockProviderClass.getName(),
+          new Class<?>[] {LockConfiguration.class, StorageConfiguration.class},
+          new Object[] {lockConfig, null});
+    } catch (Exception e) {
+      ex = e;
+    }
+    Assertions.assertEquals(IllegalArgumentException.class, 
ex.getCause().getCause().getClass());
   }
 
   @Test
   public void testUnLock() {
-    ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(lockConfiguration, client);
-    
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+    ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
+    
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
         .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
     zookeeperBasedLockProvider.unlock();
-    zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+    
zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
         .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS);
   }
 
   @Test
   public void testReentrantLock() {
-    ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(lockConfiguration, client);
-    
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+    ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
+    
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
         .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
     try {
-      zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+      
zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
           .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS);
       Assertions.fail();
     } catch (HoodieLockException e) {
@@ -123,7 +193,7 @@ public class TestZookeeperBasedLockProvider {
 
   @Test
   public void testUnlockWithoutLock() {
-    ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(lockConfiguration, client);
+    ZookeeperBasedLockProvider zookeeperBasedLockProvider = new 
ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
     zookeeperBasedLockProvider.unlock();
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
index 4df8c3852892..e76c5a8d0739 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
@@ -79,10 +79,10 @@ public class HashID implements Serializable {
   }
 
   /**
-   * Get the hash value for a string message and for the desired @{@link Size}.
+   * Get the hash value for a string message and for the desired @{@link 
HashID.Size}.
    *
    * @param message - String message to get the hash value for
-   * @param bits    - @{@link Size} of the hash value
+   * @param bits    - @{@link HashID.Size} of the hash value
    * @return Hash value for the message as byte array
    */
   public static byte[] hash(final String message, final Size bits) {
@@ -108,6 +108,24 @@ public class HashID implements Serializable {
     }
   }
 
+  /**
+   * Get the hash value as string for a given string and for the desired 
@{@link Size}.
+   *
+   * @param input - String message to get the hash value for.
+   * @param size  - @{@link Size} of the hash value
+   * @return Hash value for the message as string.
+   */
+  public static String generateXXHashAsString(String input, Size size) {
+    // Compute the hash
+    byte[] hashBytes = hash(input, size);
+    // Convert the hash value to a hexadecimal string
+    StringBuilder hexString = new StringBuilder();
+    for (byte hashByte : hashBytes) {
+      hexString.append(String.format("%02X", hashByte));
+    }
+    return hexString.toString();
+  }
+
   public static int getXXHash32(final String message, int hashSeed) {
     return getXXHash32(getUTF8Bytes(message), hashSeed);
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
index 1ab9d82b2b92..319e216ff4f5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
@@ -21,24 +21,40 @@ package org.apache.hudi.common.util.hash;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.xml.bind.DatatypeConverter;
 
-import java.util.Arrays;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 
+import static java.util.Arrays.asList;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests {@link HashID}.
  */
 public class TestHashID {
 
+  public static List<Arguments> hashIdSizes() {
+    return asList(
+        Arguments.of(HashID.Size.BITS_32),
+        Arguments.of(HashID.Size.BITS_64),
+        Arguments.of(HashID.Size.BITS_128)
+    );
+  }
+
   /**
    * Test HashID of all sizes for ByteArray type input message.
    */
@@ -124,8 +140,26 @@ public class TestHashID {
       for (Map.Entry<String, String> sizeEntry : 
allSizeEntries.getValue().entrySet()) {
         final byte[] actualHashBytes = HashID.hash(sizeEntry.getKey(), 
allSizeEntries.getKey());
         final byte[] expectedHashBytes = 
DatatypeConverter.parseHexBinary(sizeEntry.getValue());
-        assertTrue(Arrays.equals(expectedHashBytes, actualHashBytes));
+        assertArrayEquals(expectedHashBytes, actualHashBytes);
       }
     }
   }
+
+  @ParameterizedTest
+  @MethodSource("hashIdSizes")
+  void testGenerateXXHashMagicNumber(HashID.Size size) throws IOException, 
URISyntaxException {
+    // We need to make sure we always generate the same hash value for the 
same input. This test
+    // guards against unexpected change of hash value due to accidents like 
library version upgrade.
+    // Load inputs and expected hash values from files.
+    List<String> inputs = Files.readAllLines(Paths.get(Objects.requireNonNull(
+        
getClass().getClassLoader().getResource("hash/magic_input.txt")).toURI()));
+    List<String> expectedHash = 
Files.readAllLines(Paths.get(Objects.requireNonNull(
+        
getClass().getClassLoader().getResource(String.format("hash/xxhash_%s_for_magic_input.txt",
 size.name()))).toURI()));
+
+    for (int i = 0; i < expectedHash.size(); ++i) {
+      String hash = HashID.generateXXHashAsString(inputs.get(i), size);
+      // Magic number test to guard against accidental upgrade that changes 
the hash value
+      assertEquals(expectedHash.get(i), hash);
+    }
+  }
 }
diff --git a/hudi-common/src/test/resources/hash/magic_input.txt 
b/hudi-common/src/test/resources/hash/magic_input.txt
new file mode 100644
index 000000000000..ef6468b087ab
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/magic_input.txt
@@ -0,0 +1,10 @@
+vB1xYjlEfL8tN9Ap6NMCwraFQlpQ6QU2z9XCPCv9lfIxzRfs6GfxKir2FuU:Xbl9uMZqjNRhKLqZY2MIH5vPCKZbV1zjoHVaVBnyRfzJLZ0jwWZQjJqjpdg/PsrIq3YdyAsDHcb__oXfbZUpMFX00g6ALt/uRZ2LOFFdQA9Ofuu_BLhhPl4gIfGNJIt1ATuCGIFXFG4zg1qn96MsLy8Nr2v/6pxzBXM5Rhzn6o_HhiFo1RzooJcm1tJlObZroxBNKSDbDcdYkm4KnAVUAVcyVfO2DRx_KQfphLYbG1rNRTV03RNN7jQakMf8Vq38Q1Joz:STilSqalcvrDmdPTY1RETh8eVP0ELqsKevQ22OKKl5ypy4I2Y6nqRfTfoepo_l0zc_QEZNawdif5l:SpvaJUaWU9rpUpqxiz2T_40ekLVXDM_B9oGgS4e9AMT50OASHp/cVue2qFpgrYSptp4EZZ9yfEqPXX3Ij7hSUGL29zar
 [...]
+EXDDWHY_7:5oYXsoCWM1LVD2mxXAilEXwsA8J_btuAAPUm6ODVlM/njpnAyULYvBKrAiwjY4xMWkJLCGS09qqmvu8ea5:ib/WYg1nHAELdy8fiREkQ/QXIByXL1xJ40ZfPYiMzUlNoUMXJNVqK59bipG7NSOhEZ0KSAW9uxv77S4G:8B1ybZNgWbYx_f1/DqYgPD7PcHfr3c_NMYvHVOCEOB3oYZwU2p_/BRWfaY/aJfydpG_VivzSqYM7qCghR8LXEL8955iXjPVMpXHu03BdAqE/uXsrlvTm8sdzum/PNsrwHYH:2YCHRjJ7B9yawP2mYZmmCTA9FSAqhizLrs7JvqUQNfF0DW8Imy_ld__eTq0hM55H3QEIzDs:u45zd6Y/1hEeBMFEGmgXHJDOGfSt1boX_lO6q/M33U5n0LEv1o_4_tQeqnsqODq509y7rbgfOVg7whAtZ81QlXNQ3VITG0kadegE3EFcD6OwC1I3BE
 [...]
+qbl3zptxyhBdGZzXYSuz/dc_W79I3nnZIg41:_ujQnA8NnzrsENtSvxUX5Hj5nHyjmlzS9vH2zawvXaoeTtwXPqEVp0Vc2p7Ho93iRUq:Zj/bjAxIuxBXEJMPYTE5PBVPSR2yPP4ci0ChrVtIRCXTD7v9gluqs4ouz_G8sNuXaT8uCP7ypOMRUG:VQuQ69yY/KIS1_AffBHL8n7jgT7nGfzra5dZZUBe4oqm9BI3nyHpCa5ookke:703QP:8sDMg3UdCg8uw6ccqAcco7xBbOgdvS6qpz1LZdku0:As2LOmX42B0aEReh52BdtZtBFnk5t_cm0m5ZAuEH_Bii8:reXocImMCSsnL2hVpbs8gn8jI4QAGW2rDOvj1N/byJbjrJxhlsZ6Y9dDZc7Z8B8aCx8kRoRQVkskAO4lULO1pNaIINreWaij:tJhHeF_kkhSevooTRkGXoWouI/1a4ADi5ijTESgsrBleqpU2e4yGmBKw
 [...]
+kha:j60RU2k4jsMcyoK/gJ7F4IEkoWzrtVN3kUEPvP/9M4NfaoPtruIe0reUxlMhoTKJbLTLFJ0pIVfqGrx/VAcW1ZWdjret19cb7TAP08wpOP_t5ljP/pUYR5KsEn5zUNtKWiNHJxPsT7u9FYNupiIRIe_VGG_J:1wPQJ2xBtWCbiCOGk15zGoOHhAjTE8V5JwyyJHrxSfZQpHXMqOYICII9/rXoKpTsV/uUASF58e/OuQ9pg0IA4KENs3_XnptctzloUYr0AI4zTLIRL5N:CvFRujqaOmKgiMGYmC1V9DdYkUTAiv_3NYvUnJuV54mXMEQlAoyrILPimonjdpjLx7fjep9vPjImfsyIXK3EoStTNJoWcAWsOzSn:wb84sBDg4mfDYvqz3pL13SFbIxtlsAl66/11xBVIUI0TRK6KWX5yX806JnbFSG:oi6X4_OdwucUDSx:ac_RqzZiA9Ig:wFg2dg6JYrVt3iLj/KDaS0
 [...]
+XViqCa5XfqAB0kski4A4N82FE8i0U/WjPvcgLbi:X9LhXRV0qfusVp19Xr_5pCQUDFRr34z_foGnpbWC5:pOn:jQHwVJIer:hfh5uErwLimX6OC1nLPXdPVrhKJ4/XN2YBfF5luB_7:dEGJ1E22iCTgto6sQW63f/c:wUtZdRioJ3JiA14KMVRKI8KD79zrCF1p61XSjQUvzrBh4VOjxGervgu2kZarv4HvvzvTuzThk4yrE1YvQse5iKeZWHbjCYBS7Tr0eTAjZLIqZRZ4/t/zuI9ZgnIerWJ8z3kfYv:maAiYP:Fjc4k32CXdXVhfiP9OLMqu4XWA2U1i7htnAotjF3G1ECwGLo9faRUxp5XwK85DhoQqrld/9e4BX4FxI61omgU_sa9SlSI:oU4/GrmsejIJn8adPcxwUyYf5uetKOVbcuBesp9l7pVmc4O97sojQ6:ohnhcvGssks0UpzdUideM8Hfo0yIWF1XPSJjJV
 [...]
+uYeJoV7jMpUbSfo0PZfVqxapg8VlFDxp7t6x3M:r/hMj3EdSrXNmtnAJOg/DubMz6uGvkZM3equxs8LflyyjZYFypahe90QXYM6mngjaMpXGl/ngy6_wQ512E7tqMnaVaz52xbKAdCCgastCJ5xM2jikxmIOK/28J77VXwkL6T43JGKXrFzoqm/eelXf:G5oihZt/jBKTMg/wXgC0:BvTVM9A1e4nmXw_ANjnBxCWe7QD7O/A/k4hrVt0MKpH2glsJKAnz_ZAY_qZp33vN5v975ZFwhAJtkYwtsXl4UFT_06Uy9BHyWnMcvHaPgEgslv/_P2FLzlJWfsSI8UjWUdMVhYAff4NE7UQF2Oye6ciXsW1ga_TS0VN8tugmqE:xs:5/SlyG3puDh:mzA60V7EpM38_rsKO/v9l1ZG9ICucglBv0hzHuVz/_vUR_5B_scT4nEIFmLOB67RsoGXMDVI7oJzETHzA3DLk:nX1yF4g_VT
 [...]
+1/dFjdWmwjcDBD6QBMjRqWtTJEdCZ2:2X2:NiPutlDhlxkp7k2MBqiAMDffvA5bbe0Yxel0IF94is_h0cMtgXKwYgCPKgFQgH0O2hnFrWp2fmETy4DCHnUV54mNUF4_xMESiZwJaUQFCPUxK8j28qlPppbt2M:zsJ64LUVIoCZpJsc1GRVY9lImikNRoEiOXKOQIMLOHlDfVEYyJubQy14zWQsqSoHK6Du_S76_MdZi3DEHJmInbDHvykJRoKke2QQL7wae/7D4FVg/zKbfnOSrgDTWbdDewy2lJKMti5MnrMnevTCQSSFhueoekPYsLiyoDIDWDoqAzdiWP2vkzisR1CbZ74DQeOyyk4rvF85TRTfjPWjdv6PZX40W0K2VOPWUiys5224nSnfT886LVBkuKwjImo8NqZskfp1LYJtMRYlad9/fRLVwo65h:l4FFLEuPH1OzlFaz6wsMqqh4GGtQeScwfzTWA78A9iosPNyn
 [...]
+twVaFB_AiBxWsfHRYHdhnwMtrtp/PzyU6lJivvSGGRfGmLUu0EcaW5dzO0vx/Hy/E33JY:2oeb1p:Hl35zPFj:98DQRRyodl5JJrBiNeyavBKlQPoYglYIpWK3uy47lsTHXe2SYrEEscKtwH/:BJiQn4XPQejfVBPNPI35Othp64j0tf2GBiqrAETQT0xnSrN6vnQnNWAj:RDR9IbUSZISaJ/GrQoT651W7iwQQvBo5L/XcdCrvWGbmw/3Yp5S2HeoA/:qJ:6gmrOQ9eoFGCe8X9yERKKbPzcmSB7SXKuP3UUbT1L0_vvFOP2mIsr46k:77G31gPJv6zzdDZLR1Ewxaz38vR6MeSjTtKR53yw44cWgWPD2wS/GHJDI4PhHxnsgUamNi8_PVmdFceTXouehmnrdrM_byV_iQOF:3Jq7czDhcAwMF2eYqZkdlPdG:kr:WQoIBu2UTgZB/uKI8kf7l4qosy:aTz_QLmouh5ilZ1
 [...]
+wM51f5wQY1p4N_J4hC/oN9BryO/ZUwUisXuYaGHIJGDZmTE52hPf7vJdPTR:NiJiPCNMG:cnWGoJwNRiKIQY3u/Yppx6Ig6wdvScLqPhXI5Fxq/W4XUgBkCwEVkhfd3FCv0/LmaNvg6RJelbz:/ympzvRLOpAgsYmxtnqh17rEqCkxKp4nC4o_j7VLJgyHqXYP9RnKw2sTpMP7seq1zHe00drKb_k:_iS59NZ2NnAJBjjKCo2kbNmT4LvAVa9COquUyCF4Q:Qp39Rxs30mNtTW30x_0LuL8TH4pg65Z0OjBoQtwo4:GBF_2rE2WbkRDeNeGGB:DUU4p0MqkkYTT7jraK10pmKi2C:wH6HXN8SX6UwCJoY2m24sIaU3f:k2vBzqYu4Mq5E9slvsx:vNjlA4KtSiy5XLt01s0bRuXhXXrjQuSsFXvGUdlppWtQcR8Xbg6W0Bk5Qdx267BBbsQ5bk6JOg8xHfGqcy2Kde_u_ZFz
 [...]
+AXipEHIS1WnAkrLS_TZT8AEBe2KPR:0tgOLRhUIEiN33Ml4_pOtN6TuDNvfSa4MSPexRluckSpCnl6le__MucXEilapxfqXY/MW3ebuICROpXQr9rn4fvJxxR25eeyDDOJfVkXu9gTwPJD:CAqb:byah1PXndGFR3L4NNgzofPKVvyh/liGhvcU54qF3:U8B4Ed4K5VVZKlDC_LIjyPpO:fzwhCY:a7plQHp3G8rwqZlVWll8gZt1BRTJS3nvxCLNe2IPjq5aj5QD/P3O4QkLPDW/bdXTFxfILaQ3xrBO:kqPMyqowiLcm6gAwY5fdkPgvv6xh7yC5t8d98io1UxEOoVdWdGUXgztIKs2gxjzVUqj6qsbLaIg2ft/j4txWJMQ3z3anRSmCZ1Th6V7LFEKaX2PBlK//eC0Sm1f45XPf5MWpHuRMxBzxFqY60IXZVJ1tx/Daa86UC5A0jyyJyPxmdUlt1FTqDqZ_TQK4qTWGrz
 [...]
diff --git 
a/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt 
b/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt
new file mode 100644
index 000000000000..ee2a4f5fbea0
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt
@@ -0,0 +1,10 @@
+AF26454CFF3D72BBDB7A6CD6BAF23770
+080D42690A45825782F4CD4DB31AA449
+E14D364EF059C966A7F50708E9CF2E7B
+68F4BCB6F281DD101D5B7375652E574E
+1F2A77EB57C4091E9F89887DA88169B2
+1A57459BE2F7ECD655C090BADAA3ED1A
+E79D2F90D440296F94419C7093518DE4
+86C4CFD114B59C0CB29AF95425767B63
+EE70F0B8607E339D92E6823DA2538113
+0E6F75EC00B024201B91FBCEBDB9F971
\ No newline at end of file
diff --git 
a/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt 
b/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt
new file mode 100644
index 000000000000..4cba48b9f4e8
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt
@@ -0,0 +1,10 @@
+0304CA31
+71EC1ABE
+4F003C35
+79A69D14
+62CAF229
+0136BCE2
+1D68C842
+0B416E0E
+CF7373EB
+F5933F12
\ No newline at end of file
diff --git 
a/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt 
b/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt
new file mode 100644
index 000000000000..bc06cc5614e4
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt
@@ -0,0 +1,10 @@
+F0E17DF624BF7B6A
+07EC6A7186973385
+24A8A6F1FEA11BA2
+BFE00C8A367BAF03
+C323770B790400C8
+9F96033C96E80D39
+5B99015437BBEA94
+C6C54FCA5141979D
+3580238C2761459E
+D9DBB1D9BF7174BC
\ No newline at end of file
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java 
b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
index eb8f19987484..6cb3544db491 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -302,4 +302,83 @@ public class StringUtils {
     buf.append(text.substring(start));
     return buf.toString();
   }
+
+  /**
+   * Strips given characters from end of the string and returns the result
+   * @param str - string to be stripped
+   * @param stripChars - characters to strip
+   */
+  public static String stripEnd(final String str, final String stripChars) {
+    int end;
+    if (str == null || (end = str.length()) == 0) {
+      return str;
+    }
+
+    if (stripChars == null) {
+      while (end != 0 && Character.isWhitespace(str.charAt(end - 1))) {
+        end--;
+      }
+    } else if (stripChars.isEmpty()) {
+      return str;
+    } else {
+      while (end != 0 && stripChars.indexOf(str.charAt(end - 1)) != 
INDEX_NOT_FOUND) {
+        end--;
+      }
+    }
+    return str.substring(0, end);
+  }
+
+  /**
+   * Concatenates two strings such that the total byte length does not exceed 
the threshold.
+   * If the total byte length exceeds the threshold, the function will find 
the maximum length of the first string
+   * that fits within the threshold and concatenate that with the second 
string.
+   *
+   * @param a         The first string
+   * @param b         The second string
+   * @param byteLengthThreshold The maximum byte length
+   */
+  public static String concatenateWithThreshold(String a, String b, int 
byteLengthThreshold) {
+    // Convert both strings to byte arrays in UTF-8 encoding
+    byte[] bytesA = getUTF8Bytes(a);
+    byte[] bytesB = getUTF8Bytes(b);
+    if (bytesB.length > byteLengthThreshold) {
+      throw new IllegalArgumentException(String.format(
+          "Length of the Second string to concatenate exceeds the threshold 
(%d > %d)",
+          bytesB.length, byteLengthThreshold));
+    }
+
+    // Calculate total bytes
+    int totalBytes = bytesA.length + bytesB.length;
+
+    // If total bytes is within the threshold, return concatenated string
+    if (totalBytes <= byteLengthThreshold) {
+      return a + b;
+    }
+
+    // Calculate the maximum bytes 'a' can take
+    int bestLength = getBestLength(a, byteLengthThreshold - bytesB.length);
+
+    // Concatenate the valid substring of 'a' with 'b'
+    return a.substring(0, bestLength) + b;
+  }
+
+  private static int getBestLength(String a, int threshold) {
+    // Binary search to find the maximum length of substring of 'a' that fits 
within maxBytesForA
+    int low = 0;
+    int high = Math.min(a.length(), threshold);
+    int bestLength = 0;
+
+    while (low <= high) {
+      int mid = (low + high) / 2;
+      byte[] subABytes = getUTF8Bytes(a.substring(0, mid));
+
+      if (subABytes.length <= threshold) {
+        bestLength = mid;
+        low = mid + 1;
+      } else {
+        high = mid - 1;
+      }
+    }
+    return bestLength;
+  }
 }
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java 
b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index a4bee6bc6be7..bcb0a9f65b97 100644
--- a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -22,14 +22,19 @@ package org.apache.hudi.common.util;
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Random;
 
+import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -39,6 +44,29 @@ public class TestStringUtils {
 
   private static final String[] STRINGS = {"This", "is", "a", "test"};
 
+  private static final String CHARACTERS_FOR_RANDOM_GEN = 
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_/:";
+  private static final Random RANDOM = new SecureRandom();
+
+  private static String toHexString(byte[] bytes) {
+    StringBuilder sb = new StringBuilder(bytes.length * 2);
+    for (byte b : bytes) {
+      sb.append(String.format("%02x", b));
+    }
+    return sb.toString();
+  }
+
+  private static String generateRandomString(int length) {
+    if (length < 1) {
+      throw new IllegalArgumentException("Length must be greater than 0");
+    }
+    StringBuilder builder = new StringBuilder(length);
+    for (int i = 0; i < length; i++) {
+      int randomIndex = RANDOM.nextInt(CHARACTERS_FOR_RANDOM_GEN.length());
+      builder.append(CHARACTERS_FOR_RANDOM_GEN.charAt(randomIndex));
+    }
+    return new String(getUTF8Bytes(builder.toString()), 
StandardCharsets.UTF_8);
+  }
+
   @Test
   public void testStringJoinWithDelim() {
     String joinedString = StringUtils.joinUsingDelim("-", STRINGS);
@@ -108,14 +136,6 @@ public class TestStringUtils {
     assertEquals(StringUtils.toHexString(getUTF8Bytes(str)), 
toHexString(getUTF8Bytes(str)));
   }
 
-  private static String toHexString(byte[] bytes) {
-    StringBuilder sb = new StringBuilder(bytes.length * 2);
-    for (byte b : bytes) {
-      sb.append(String.format("%02x", b));
-    }
-    return sb.toString();
-  }
-
   @Test
   public void testTruncate() {
     assertNull(StringUtils.truncate(null, 10, 10));
@@ -129,6 +149,60 @@ public class TestStringUtils {
     assertTrue(StringUtils.compareVersions("1.9", "1.10") < 0);
     assertTrue(StringUtils.compareVersions("1.100.1", "1.10") > 0);
     assertTrue(StringUtils.compareVersions("1.10.1", "1.10") > 0);
-    assertTrue(StringUtils.compareVersions("1.10", "1.10") == 0);
+    assertEquals(0, StringUtils.compareVersions("1.10", "1.10"));
+  }
+
+  @Test
+  void testConcatenateWithinThreshold() {
+    String a = generateRandomString(1000); // 1000 bytes in UTF-8
+    String b = generateRandomString(1048); // 1048 bytes in UTF-8
+    int threshold = 2048;
+
+    // The total length of bytes of `a` + `b` exceeds the threshold
+    String result = StringUtils.concatenateWithThreshold(a, b, threshold);
+
+    // The resulting string should be exactly `threshold` bytes long
+    assertEquals(threshold, getUTF8Bytes(result).length);
+    assertEquals(a + b, result);
+
+    // Test case when a + b is within the threshold
+    String a2 = generateRandomString(900);
+    String b2 = generateRandomString(1000);
+    String result2 = concatenateWithThreshold(a2, b2, threshold);
+
+    // The resulting string should be `a2 + b2`
+    assertEquals(a2 + b2, result2);
+  }
+
+  @Test
+  void testConcatenateInvalidInput() {
+    // Test case when b alone exceeds the threshold
+    String a = generateRandomString(900);
+    String b = generateRandomString(3000); // 3000 bytes in UTF-8
+    Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+      concatenateWithThreshold(a, b, 2048);
+    });
+
+    String expectedMessage = "Length of the Second string to concatenate 
exceeds the threshold (3000 > 2048)";
+    String actualMessage = exception.getMessage();
+
+    assertTrue(actualMessage.contains(expectedMessage));
+  }
+
+  @Test
+  void testConcatenateTruncateCase() {
+    // 'é' is 2 bytes
+    assertEquals("ad", concatenateWithThreshold("aé", "d", 3));
+    // Chinese chars are 3 bytes
+    assertEquals("世d", concatenateWithThreshold("世界", "d", 4));
+    assertEquals("ad", concatenateWithThreshold("ab", "d", 2));
+  }
+
+  @Test
+  void testGenerateInvalidRandomString() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> generateRandomString(-1)
+    );
   }
 }
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
index 6e484a142164..1d001b331e77 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
@@ -46,6 +46,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -77,18 +78,18 @@ import static 
org.apache.hudi.common.lock.LockState.RELEASING;
  * using hive metastore APIs. Users need to have a HiveMetastore & Zookeeper 
cluster deployed to be able to use this lock.
  *
  */
-public class HiveMetastoreBasedLockProvider implements 
LockProvider<LockResponse> {
+public class HiveMetastoreBasedLockProvider implements 
LockProvider<LockResponse>, Serializable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveMetastoreBasedLockProvider.class);
 
   private final String databaseName;
   private final String tableName;
   private final String hiveMetastoreUris;
-  private IMetaStoreClient hiveClient;
+  private transient IMetaStoreClient hiveClient;
   private volatile LockResponse lock = null;
   protected LockConfiguration lockConfiguration;
-  private ScheduledFuture<?> future = null;
-  private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(2);
+  private transient ScheduledFuture<?> future = null;
+  private final transient ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(2);
 
   public HiveMetastoreBasedLockProvider(final LockConfiguration 
lockConfiguration, final StorageConfiguration<?> conf) {
     this(lockConfiguration);
diff --git a/scripts/release/validate_source_copyright.sh 
b/scripts/release/validate_source_copyright.sh
index d44864135be8..52aa15bb3d70 100755
--- a/scripts/release/validate_source_copyright.sh
+++ b/scripts/release/validate_source_copyright.sh
@@ -46,10 +46,10 @@ echo -e "\t\tNotice file exists ? [OK]\n"
 
 ### Licensing Check
 echo "Performing custom Licensing Check "
-numfilesWithNoLicense=`find . -iname '*' -type f | grep -v NOTICE | grep -v 
LICENSE | grep -v '.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data' 
| grep -v '.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v KEYS | 
grep -v '.mailmap' | grep -v '.sqltemplate' | grep -v 'banner.txt' | grep -v 
"fixtures" | xargs grep -L "Licensed to the Apache Software Foundation (ASF)" | 
wc -l`
+numfilesWithNoLicense=`find . -iname '*' -type f | grep -v NOTICE | grep -v 
LICENSE | grep -v '.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data' 
| grep -v '.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v KEYS | 
grep -v '.mailmap' | grep -v '.sqltemplate' | grep -v 'banner.txt' | grep -v 
'.txt' | grep -v "fixtures" | xargs grep -L "Licensed to the Apache Software 
Foundation (ASF)" | wc -l`
 if [ "$numfilesWithNoLicense" -gt  "0" ]; then
   echo "There were some source files that did not have Apache License [ERROR]"
-  find . -iname '*' -type f | grep -v NOTICE | grep -v LICENSE | grep -v 
'.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data' | grep -v 
'.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v '.sqltemplate' | 
grep -v KEYS | grep -v '.mailmap' | grep -v 'banner.txt' | grep -v "fixtures" | 
xargs grep -L "Licensed to the Apache Software Foundation (ASF)"
+  find . -iname '*' -type f | grep -v NOTICE | grep -v LICENSE | grep -v 
'.jpg' | grep -v '.json' | grep -v '.zip' | grep -v '.hfile' | grep -v '.data' 
| grep -v '.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v 
'.sqltemplate' | grep -v KEYS | grep -v '.mailmap' | grep -v 'banner.txt' | 
grep -v '.txt' | grep -v "fixtures" | xargs grep -L "Licensed to the Apache 
Software Foundation (ASF)"
   exit 1
 fi
 echo -e "\t\tLicensing Check Passed [OK]\n"

Reply via email to