This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fa5878d9c46 [ENG-17094] Change implicit lock provider lock key scheme (#12220)
fa5878d9c46 is described below
commit fa5878d9c46f5c824ae56a9ad56ef90b0bc37a19
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Tue Nov 19 10:10:41 2024 -0800
[ENG-17094] Change implicit lock provider lock key scheme (#12220)
---
...amoDBBasedImplicitPartitionKeyLockProvider.java | 29 ++++++++++++---------
.../lock/DynamoDBBasedLockProviderBase.java | 4 +--
.../hudi/config/DynamoDbBasedLockConfig.java | 3 ---
.../integ/ITTestDynamoDBBasedLockProvider.java | 30 ++++++++++++----------
.../lock/BaseZookeeperBasedLockProvider.java | 2 --
...ZookeeperBasedImplicitBasePathLockProvider.java | 30 ++++++++++++++--------
6 files changed, 55 insertions(+), 43 deletions(-)
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
index 8018c5e2761..d9a131cd096 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
@@ -20,19 +20,18 @@ package org.apache.hudi.aws.transaction.lock;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.hash.HashID;
import org.apache.hudi.storage.StorageConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.util.StringUtils;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import javax.annotation.concurrent.NotThreadSafe;
import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
-import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
-import static
org.apache.hudi.config.DynamoDbBasedLockConfig.MAX_PARTITION_KEY_SIZE_BYTE;
/**
* A DynamoDB based lock.
@@ -43,6 +42,8 @@ import static org.apache.hudi.config.DynamoDbBasedLockConfig.MAX_PARTITION_KEY_S
public class DynamoDBBasedImplicitPartitionKeyLockProvider extends
DynamoDBBasedLockProviderBase {
protected static final Logger LOG =
LoggerFactory.getLogger(DynamoDBBasedImplicitPartitionKeyLockProvider.class);
+ private final String hudiTableBasePath;
+
public DynamoDBBasedImplicitPartitionKeyLockProvider(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf) {
this(lockConfiguration, conf, null);
}
@@ -50,20 +51,24 @@ public class DynamoDBBasedImplicitPartitionKeyLockProvider extends DynamoDBBased
public DynamoDBBasedImplicitPartitionKeyLockProvider(
final LockConfiguration lockConfiguration, final StorageConfiguration<?>
conf, DynamoDbClient dynamoDB) {
super(lockConfiguration, conf, dynamoDB);
+ hudiTableBasePath =
s3aToS3(lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key()));
}
- public static String generatePartitionKey(String basePath, String tableName)
{
- String hashPart = '-' + HashID.generateXXHashAsString(basePath,
HashID.Size.BITS_64);
- String partitionKey = concatenateWithThreshold(tableName, hashPart,
MAX_PARTITION_KEY_SIZE_BYTE);
- LOG.info(String.format("The DynamoDB partition key of the lock provider
for the base path %s is %s", basePath, partitionKey));
+ @Override
+ public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
+ // Ensure consistent format for S3 URI.
+ String hudiTableBasePathNormalized =
s3aToS3(lockConfiguration.getConfig().getString(
+ HoodieCommonConfig.BASE_PATH.key()));
+ String partitionKey =
HashID.generateXXHashAsString(hudiTableBasePathNormalized, HashID.Size.BITS_64);
+ LOG.info(String.format("The DynamoDB partition key of the lock provider
for the base path %s is %s",
+ hudiTableBasePathNormalized, partitionKey));
return partitionKey;
}
@Override
- public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
- String hudiTableBasePath =
lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key());
- String hudiTableName =
lockConfiguration.getConfig().getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
- // Ensure consistent format for S3 URI.
- return generatePartitionKey(s3aToS3(hudiTableBasePath), hudiTableName);
+ protected String generateLogSuffixString() {
+ return StringUtils.join("DynamoDb table = ", tableName,
+ ", partition key = ", dynamoDBPartitionKey,
+ ", hudi table base path = ", hudiTableBasePath);
}
}
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
index d9e705f19a9..12a3cd89647 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
@@ -74,7 +74,7 @@ public abstract class DynamoDBBasedLockProviderBase implements LockProvider<Lock
protected final DynamoDbBasedLockConfig dynamoDbBasedLockConfig;
protected final AmazonDynamoDBLockClient client;
protected final String tableName;
- private final String dynamoDBPartitionKey;
+ protected final String dynamoDBPartitionKey;
protected volatile LockItem lock;
protected DynamoDBBasedLockProviderBase(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf, DynamoDbClient dynamoDB)
{
@@ -215,7 +215,7 @@ public abstract class DynamoDBBasedLockProviderBase implements LockProvider<Lock
LOG.info("Created dynamoDB table " + tableName);
}
- private String generateLogSuffixString() {
+ protected String generateLogSuffixString() {
return StringUtils.join("DynamoDb table = ", tableName, ", partition key =
", dynamoDBPartitionKey);
}
diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
index c268717c4c4..bb89ade4cb9 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
@@ -47,9 +47,6 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
return new DynamoDbBasedLockConfig.Builder();
}
- // The max length of DDB partition key allowed.
- public static final int MAX_PARTITION_KEY_SIZE_BYTE = 2048;
-
// configs for DynamoDb based locks
public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX =
LockConfiguration.LOCK_PREFIX + "dynamodb.";
diff --git a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
index b1bd552e396..bda1b3c656f 100644
--- a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
+++ b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
@@ -114,17 +114,6 @@ public class ITTestDynamoDBBasedLockProvider {
dynamoDb = getDynamoClientWithLocalEndpoint();
}
- public static Stream<Object> testDimensions() {
- return Stream.of(
- // Without parititon key, only table name is used.
- Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG,
DynamoDBBasedLockProvider.class),
- Arguments.of(LOCK_CONFIGURATION, DynamoDBBasedLockProvider.class),
- // Even if we have partition key set, nothing would break.
- Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY,
DynamoDBBasedImplicitPartitionKeyLockProvider.class),
- Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG,
DynamoDBBasedImplicitPartitionKeyLockProvider.class)
- );
- }
-
public static Stream<Object> badTestDimensions() {
return Stream.of(
Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME,
DynamoDBBasedLockProvider.class),
@@ -165,6 +154,18 @@ public class ITTestDynamoDBBasedLockProvider {
Assertions.assertEquals(IllegalArgumentException.class,
e.getCause().getCause().getClass());
}
+ public static Stream<Arguments> testDimensions() {
+ return Stream.of(
+ // Without partition key, only table name is used.
+ Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG,
DynamoDBBasedLockProvider.class),
+ // Even if we have partition key set, nothing would break.
+ Arguments.of(LOCK_CONFIGURATION, DynamoDBBasedLockProvider.class),
+ // Even if we have partition key set, nothing would break.
+ Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY,
DynamoDBBasedImplicitPartitionKeyLockProvider.class),
+ Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG,
DynamoDBBasedImplicitPartitionKeyLockProvider.class)
+ );
+ }
+
@ParameterizedTest
@MethodSource("testDimensions")
void testAcquireLock(LockConfiguration lockConfig, Class<?>
lockProviderClass) {
@@ -187,11 +188,14 @@ public class ITTestDynamoDBBasedLockProvider {
String tableName = (String)
lockConfig.getConfig().get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
String basePath = (String)
lockConfig.getConfig().get(HoodieCommonConfig.BASE_PATH.key());
// Base path is constructed with prefix s3a, verify that for partition
key calculation, s3a is replaced with s3
+ Assertions.assertTrue(basePath.startsWith(SCHEME_S3A));
+ // Verify base path only scheme partition key
Assertions.assertEquals(
- tableName + '-' + HashID.generateXXHashAsString(SCHEME_S3 +
URI_NO_CLOUD_PROVIDER_PREFIX, HashID.Size.BITS_64),
+ HashID.generateXXHashAsString(SCHEME_S3 +
URI_NO_CLOUD_PROVIDER_PREFIX, HashID.Size.BITS_64),
dynamoDbBasedLockProvider.getPartitionKey());
- Assertions.assertTrue(basePath.startsWith(SCHEME_S3A));
}
+
+ // Test lock acquisition and release
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
dynamoDbBasedLockProvider.unlock();
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
index f0159f2dab8..496d754f025 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
@@ -63,8 +63,6 @@ public abstract class BaseZookeeperBasedLockProvider implements LockProvider<Int
protected final String zkBasePath;
protected final String lockKey;
- public static final int MAX_ZK_BASE_PATH_NUM_BYTES = 4096;
-
public BaseZookeeperBasedLockProvider(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf) {
checkRequiredProps(lockConfiguration);
this.lockConfiguration = lockConfiguration;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
index fc834c32eff..199a9b6309d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
@@ -21,16 +21,17 @@ package org.apache.hudi.client.transaction.lock;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.lock.LockProvider;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.hash.HashID;
import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
-import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
/**
* A zookeeper based lock. This {@link LockProvider} implementation allows to
lock table operations
@@ -41,31 +42,38 @@ import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
*/
@NotThreadSafe
public class ZookeeperBasedImplicitBasePathLockProvider extends
BaseZookeeperBasedLockProvider {
+ private static final Logger LOG =
LoggerFactory.getLogger(ZookeeperBasedImplicitBasePathLockProvider.class);
public static final String LOCK_KEY = "lock_key";
+ private final String hudiTableBasePath;
- public static String getLockBasePath(String hudiTableBasePath, String
hudiTableName) {
+ public static String getLockBasePath(String hudiTableBasePath) {
// Ensure consistent format for S3 URI.
- String hashPart = '-' +
HashID.generateXXHashAsString(s3aToS3(hudiTableBasePath), HashID.Size.BITS_64);
- String folderName = concatenateWithThreshold(hudiTableName, hashPart,
MAX_ZK_BASE_PATH_NUM_BYTES);
- return "/tmp/" + folderName;
+ String lockBasePath = "/tmp/" +
HashID.generateXXHashAsString(s3aToS3(hudiTableBasePath), HashID.Size.BITS_64);
+ LOG.info(String.format("The Zookeeper lock key for the base path %s is
%s", hudiTableBasePath, lockBasePath));
+ return lockBasePath;
}
public ZookeeperBasedImplicitBasePathLockProvider(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf) {
super(lockConfiguration, conf);
+ hudiTableBasePath =
s3aToS3(lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key()));
}
@Override
protected String getZkBasePath(LockConfiguration lockConfiguration) {
- String hudiTableBasePath =
ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(),
HoodieCommonConfig.BASE_PATH);
- String hudiTableName =
ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(),
HoodieTableConfig.NAME);
+ String hudiTableBasePath =
lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key());
ValidationUtils.checkArgument(hudiTableBasePath != null);
- ValidationUtils.checkArgument(hudiTableName != null);
- return getLockBasePath(hudiTableBasePath, hudiTableName);
+ return getLockBasePath(hudiTableBasePath);
}
@Override
protected String getLockKey(LockConfiguration lockConfiguration) {
return LOCK_KEY;
}
+
+ @Override
+ protected String generateLogSuffixString() {
+ return StringUtils.join("ZkBasePath = ", zkBasePath,
+ ", lock key = ", lockKey, ", hudi table base path = ",
hudiTableBasePath);
+ }
}