This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 34dbfefe316d [HUDI-8490] Adding DynamoDB Implicit Lock Provider
support (#17843)
34dbfefe316d is described below
commit 34dbfefe316dedd20e25617d2989f72d5d52780f
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Feb 12 08:29:35 2026 +0530
[HUDI-8490] Adding DynamoDB Implicit Lock Provider support (#17843)
---------
Co-authored-by: Davis-Zhang-Onehouse
<[email protected]>
Co-authored-by: Nicolas Paris <[email protected]>
Co-authored-by: Vova Kolmakov <[email protected]>
Co-authored-by: Vova Kolmakov <[email protected]>
Co-authored-by: Tim Brown <[email protected]>
Co-authored-by: voonhous <[email protected]>
Co-authored-by: Lokesh Jain <[email protected]>
---
...amoDBBasedImplicitPartitionKeyLockProvider.java | 74 +++++++
.../lock/DynamoDBBasedLockProvider.java | 185 ++--------------
...der.java => DynamoDBBasedLockProviderBase.java} | 98 +++++----
.../hudi/config/DynamoDbBasedLockConfig.java | 40 +---
.../integ/ITTestDynamoDBBasedLockProvider.java | 237 +++++++++++++++------
.../lock/DynamoDBBasedLockProviderBaseTest.java | 83 ++++++++
...er.java => BaseZookeeperBasedLockProvider.java} | 80 +++----
...ZookeeperBasedImplicitBasePathLockProvider.java | 79 +++++++
.../lock/ZookeeperBasedLockProvider.java | 180 ++--------------
.../TestZookeeperBasedLockProvider.java | 106 +++++++--
.../org/apache/hudi/common/util/hash/HashID.java | 22 +-
.../apache/hudi/common/util/hash/TestHashID.java | 40 +++-
.../src/test/resources/hash/magic_input.txt | 10 +
.../hash/xxhash_BITS_128_for_magic_input.txt | 10 +
.../hash/xxhash_BITS_32_for_magic_input.txt | 10 +
.../hash/xxhash_BITS_64_for_magic_input.txt | 10 +
.../org/apache/hudi/common/util/StringUtils.java | 79 +++++++
.../apache/hudi/common/util/TestStringUtils.java | 92 +++++++-
.../lock/HiveMetastoreBasedLockProvider.java | 9 +-
scripts/release/validate_source_copyright.sh | 4 +-
20 files changed, 885 insertions(+), 563 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
new file mode 100644
index 000000000000..0a5ff98fcabd
--- /dev/null
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.lock;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hudi.common.util.StringUtils;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.hudi.common.fs.FSUtils.s3aToS3;
+
+/**
+ * A DynamoDB based lock.
+ * It implicitly derives the partition key by hashing the hudi table base path
+ * available in the lock configuration.
+ */
+@NotThreadSafe
+public class DynamoDBBasedImplicitPartitionKeyLockProvider extends
DynamoDBBasedLockProviderBase {
+ protected static final Logger LOG =
LoggerFactory.getLogger(DynamoDBBasedImplicitPartitionKeyLockProvider.class);
+
+ private final String hudiTableBasePath;
+
+ public DynamoDBBasedImplicitPartitionKeyLockProvider(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf) {
+ this(lockConfiguration, conf, null);
+ }
+
+ public DynamoDBBasedImplicitPartitionKeyLockProvider(
+ final LockConfiguration lockConfiguration, final StorageConfiguration<?>
conf, DynamoDbClient dynamoDB) {
+ super(lockConfiguration, conf, dynamoDB);
+ hudiTableBasePath =
s3aToS3(lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key()));
+ }
+
+ @Override
+ public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
+ // Ensure consistent format for S3 URI.
+ String hudiTableBasePathNormalized =
s3aToS3(lockConfiguration.getConfig().getString(
+ HoodieCommonConfig.BASE_PATH.key()));
+ String partitionKey =
HashID.generateXXHashAsString(hudiTableBasePathNormalized, HashID.Size.BITS_64);
+ LOG.info(String.format("The DynamoDB partition key of the lock provider
for the base path %s is %s",
+ hudiTableBasePathNormalized, partitionKey));
+ return partitionKey;
+ }
+
+ @Override
+ protected String generateLogSuffixString() {
+ return StringUtils.join("DynamoDb table = ", tableName,
+ ", partition key = ", dynamoDBPartitionKey,
+ ", hudi table base path = ", hudiTableBasePath);
+ }
+}
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
index 2b67a483f383..d7a72ffc54e6 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
@@ -18,199 +18,38 @@
package org.apache.hudi.aws.transaction.lock;
-import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
-import org.apache.hudi.aws.utils.DynamoTableUtils;
import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.lock.LockProvider;
-import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.DynamoDbBasedLockConfig;
-import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.storage.StorageConfiguration;
-import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
-import com.amazonaws.services.dynamodbv2.LockItem;
-import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
-import software.amazon.awssdk.services.dynamodb.model.BillingMode;
-import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
-import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
-import software.amazon.awssdk.services.dynamodb.model.KeyType;
-import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
-import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import javax.annotation.concurrent.NotThreadSafe;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import static
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY;
/**
- * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
- * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ * A DynamoDB based lock.
+ * It expects the partition key to be explicitly set in DynamoDbBasedLockConfig.
*/
@NotThreadSafe
-public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(DynamoDBBasedLockProvider.class);
-
- private static final String DYNAMODB_ATTRIBUTE_NAME = "key";
-
- private final AmazonDynamoDBLockClient client;
- private final String tableName;
- private final String dynamoDBPartitionKey;
- protected final DynamoDbBasedLockConfig dynamoDBLockConfiguration;
- private volatile LockItem lock;
+public class DynamoDBBasedLockProvider extends DynamoDBBasedLockProviderBase {
public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf) {
this(lockConfiguration, conf, null);
}
public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) {
- this.dynamoDBLockConfiguration = DynamoDbBasedLockConfig.newBuilder()
- .fromProperties(lockConfiguration.getConfig())
- .build();
- this.tableName =
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
- this.dynamoDBPartitionKey =
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY);
- long leaseDuration =
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
- if (dynamoDB == null) {
- dynamoDB = getDynamoDBClient();
- }
- // build the dynamoDb lock client
- this.client = new AmazonDynamoDBLockClient(
- AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
- .withTimeUnit(TimeUnit.MILLISECONDS)
- .withLeaseDuration(leaseDuration)
- .withHeartbeatPeriod(leaseDuration / 3)
- .withCreateHeartbeatBackgroundThread(true)
- .build());
-
- if (!this.client.lockTableExists()) {
- createLockTableInDynamoDB(dynamoDB, tableName);
- }
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) {
- LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
- try {
- lock =
client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey)
- .withAdditionalTimeToWaitForLock(time)
- .withTimeUnit(TimeUnit.MILLISECONDS)
- .build());
- LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
- } catch (InterruptedException e) {
- throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
- } catch (LockNotGrantedException e) {
- return false;
- }
- return lock != null && !lock.isExpired();
+ super(lockConfiguration, conf, dynamoDB);
}
@Override
- public void unlock() {
- try {
- LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
- if (lock == null) {
- return;
- }
- if (!client.releaseLock(lock)) {
- LOG.warn("The lock has already been stolen");
- }
- lock = null;
- LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
- } catch (Exception e) {
- throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
- }
- }
-
- @Override
- public void close() {
- try {
- if (lock != null) {
- if (!client.releaseLock(lock)) {
- LOG.warn("The lock has already been stolen");
- }
- lock = null;
- }
- this.client.close();
- } catch (Exception e) {
- LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
- }
- }
-
- @Override
- public LockItem getLock() {
- return lock;
- }
-
- private DynamoDbClient getDynamoDBClient() {
- String region =
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION);
- String endpointURL =
this.dynamoDBLockConfiguration.contains(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key())
- ?
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL)
- :
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
-
- if (!endpointURL.startsWith("https://") &&
!endpointURL.startsWith("http://")) {
- endpointURL = "https://" + endpointURL;
- }
-
- return DynamoDbClient.builder()
- .endpointOverride(URI.create(endpointURL))
-
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDBLockConfiguration.getProps()))
- .build();
- }
-
- private void createLockTableInDynamoDB(DynamoDbClient dynamoDB, String
tableName) {
- String billingMode =
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE);
- KeySchemaElement partitionKeyElement = KeySchemaElement
- .builder()
- .attributeName(DYNAMODB_ATTRIBUTE_NAME)
- .keyType(KeyType.HASH)
- .build();
-
- List<KeySchemaElement> keySchema = new ArrayList<>();
- keySchema.add(partitionKeyElement);
-
- Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
-
attributeDefinitions.add(AttributeDefinition.builder().attributeName(DYNAMODB_ATTRIBUTE_NAME).attributeType(ScalarAttributeType.S).build());
- CreateTableRequest.Builder createTableRequestBuilder =
CreateTableRequest.builder();
- if (billingMode.equals(BillingMode.PROVISIONED.name())) {
-
createTableRequestBuilder.provisionedThroughput(ProvisionedThroughput.builder()
-
.readCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY))
-
.writeCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY))
- .build());
- }
- createTableRequestBuilder.tableName(tableName)
- .keySchema(keySchema)
- .attributeDefinitions(attributeDefinitions)
- .billingMode(billingMode);
- dynamoDB.createTable(createTableRequestBuilder.build());
-
- LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to
be active");
- try {
-
- DynamoTableUtils.waitUntilActive(dynamoDB, tableName,
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT),
20 * 1000);
- } catch (DynamoTableUtils.TableNeverTransitionedToStateException e) {
- throw new HoodieLockException("Created dynamoDB table never transits to
active", e);
- } catch (InterruptedException e) {
- throw new HoodieLockException("Thread interrupted while waiting for
dynamoDB table to turn active", e);
- }
- LOG.info("Created dynamoDB table " + tableName);
- }
-
- private String generateLogSuffixString() {
- return StringUtils.join("DynamoDb table = ", tableName, ", partition key =
", dynamoDBPartitionKey);
- }
-
- protected String generateLogStatement(LockState state, String suffix) {
- return StringUtils.join(state.name(), " lock at ", suffix);
+ public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
+ DynamoDbBasedLockConfig config =
DynamoDbBasedLockConfig.from(lockConfiguration.getConfig());
+ ValidationUtils.checkArgument(
+ config.contains(DYNAMODB_LOCK_PARTITION_KEY),
+ "Config key is not found: " + DYNAMODB_LOCK_PARTITION_KEY.key());
+ return config.getString(DYNAMODB_LOCK_PARTITION_KEY);
}
}
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
similarity index 66%
copy from
hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
copy to
hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
index 2b67a483f383..3a1677d51749 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
@@ -47,42 +47,44 @@ import
software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL;
+import static
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE;
+import static
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY;
+import static
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION;
+import static
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT;
+import static
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY;
+
/**
* A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
* using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
*/
@NotThreadSafe
-public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(DynamoDBBasedLockProvider.class);
+public abstract class DynamoDBBasedLockProviderBase implements
LockProvider<LockItem>, Serializable {
- private static final String DYNAMODB_ATTRIBUTE_NAME = "key";
+ protected static final Logger LOG =
LoggerFactory.getLogger(DynamoDBBasedLockProviderBase.class);
- private final AmazonDynamoDBLockClient client;
- private final String tableName;
- private final String dynamoDBPartitionKey;
- protected final DynamoDbBasedLockConfig dynamoDBLockConfiguration;
- private volatile LockItem lock;
+ protected static final String DYNAMODB_ATTRIBUTE_NAME = "key";
- public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf) {
- this(lockConfiguration, conf, null);
- }
+ protected final DynamoDbBasedLockConfig dynamoDbBasedLockConfig;
+ protected final transient AmazonDynamoDBLockClient client;
+ protected final String tableName;
+ protected final String dynamoDBPartitionKey;
+ protected volatile LockItem lock;
- public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) {
- this.dynamoDBLockConfiguration = DynamoDbBasedLockConfig.newBuilder()
- .fromProperties(lockConfiguration.getConfig())
- .build();
- this.tableName =
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
- this.dynamoDBPartitionKey =
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY);
- long leaseDuration =
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
+ protected DynamoDBBasedLockProviderBase(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf, DynamoDbClient dynamoDB)
{
+ this.dynamoDbBasedLockConfig =
DynamoDbBasedLockConfig.from(lockConfiguration.getConfig());
+ this.tableName =
dynamoDbBasedLockConfig.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
+ long leaseDuration =
dynamoDbBasedLockConfig.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
+ dynamoDBPartitionKey = getDynamoDBPartitionKey(lockConfiguration);
if (dynamoDB == null) {
- dynamoDB = getDynamoDBClient();
+ dynamoDB = getDynamoDBClient(dynamoDbBasedLockConfig);
}
// build the dynamoDb lock client
this.client = new AmazonDynamoDBLockClient(
@@ -98,14 +100,20 @@ public class DynamoDBBasedLockProvider implements
LockProvider<LockItem> {
}
}
+ public abstract String getDynamoDBPartitionKey(LockConfiguration
lockConfiguration);
+
+ public String getPartitionKey() {
+ return dynamoDBPartitionKey;
+ }
+
@Override
public boolean tryLock(long time, TimeUnit unit) {
LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
try {
lock =
client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey)
- .withAdditionalTimeToWaitForLock(time)
- .withTimeUnit(TimeUnit.MILLISECONDS)
- .build());
+ .withAdditionalTimeToWaitForLock(time)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
} catch (InterruptedException e) {
throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
@@ -152,29 +160,29 @@ public class DynamoDBBasedLockProvider implements
LockProvider<LockItem> {
return lock;
}
- private DynamoDbClient getDynamoDBClient() {
- String region =
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION);
- String endpointURL =
this.dynamoDBLockConfiguration.contains(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key())
- ?
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL)
- :
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
+ private static DynamoDbClient getDynamoDBClient(DynamoDbBasedLockConfig
dynamoDbBasedLockConfig) {
+ String region = dynamoDbBasedLockConfig.getString(DYNAMODB_LOCK_REGION);
+ String endpointURL =
dynamoDbBasedLockConfig.contains(DYNAMODB_ENDPOINT_URL.key())
+ ? dynamoDbBasedLockConfig.getString(DYNAMODB_ENDPOINT_URL)
+ :
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
- if (!endpointURL.startsWith("https://") &&
!endpointURL.startsWith("http://")) {
+ if (!endpointURL.startsWith("https://") &&
!endpointURL.startsWith("http://")) {
endpointURL = "https://" + endpointURL;
}
return DynamoDbClient.builder()
- .endpointOverride(URI.create(endpointURL))
-
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDBLockConfiguration.getProps()))
- .build();
+ .endpointOverride(URI.create(endpointURL))
+
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDbBasedLockConfig.getProps()))
+ .build();
}
private void createLockTableInDynamoDB(DynamoDbClient dynamoDB, String
tableName) {
- String billingMode =
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE);
+ String billingMode =
dynamoDbBasedLockConfig.getString(DYNAMODB_LOCK_BILLING_MODE);
KeySchemaElement partitionKeyElement = KeySchemaElement
- .builder()
- .attributeName(DYNAMODB_ATTRIBUTE_NAME)
- .keyType(KeyType.HASH)
- .build();
+ .builder()
+ .attributeName(DYNAMODB_ATTRIBUTE_NAME)
+ .keyType(KeyType.HASH)
+ .build();
List<KeySchemaElement> keySchema = new ArrayList<>();
keySchema.add(partitionKeyElement);
@@ -184,20 +192,20 @@ public class DynamoDBBasedLockProvider implements
LockProvider<LockItem> {
CreateTableRequest.Builder createTableRequestBuilder =
CreateTableRequest.builder();
if (billingMode.equals(BillingMode.PROVISIONED.name())) {
createTableRequestBuilder.provisionedThroughput(ProvisionedThroughput.builder()
-
.readCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY))
-
.writeCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY))
- .build());
+
.readCapacityUnits(dynamoDbBasedLockConfig.getLong(DYNAMODB_LOCK_READ_CAPACITY))
+
.writeCapacityUnits(dynamoDbBasedLockConfig.getLong(DYNAMODB_LOCK_WRITE_CAPACITY))
+ .build());
}
createTableRequestBuilder.tableName(tableName)
- .keySchema(keySchema)
- .attributeDefinitions(attributeDefinitions)
- .billingMode(billingMode);
+ .keySchema(keySchema)
+ .attributeDefinitions(attributeDefinitions)
+ .billingMode(billingMode);
dynamoDB.createTable(createTableRequestBuilder.build());
LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to
be active");
try {
- DynamoTableUtils.waitUntilActive(dynamoDB, tableName,
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT),
20 * 1000);
+ DynamoTableUtils.waitUntilActive(dynamoDB, tableName,
dynamoDbBasedLockConfig.getInt(DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT), 20 *
1000);
} catch (DynamoTableUtils.TableNeverTransitionedToStateException e) {
throw new HoodieLockException("Created dynamoDB table never transits to
active", e);
} catch (InterruptedException e) {
@@ -206,7 +214,7 @@ public class DynamoDBBasedLockProvider implements
LockProvider<LockItem> {
LOG.info("Created dynamoDB table " + tableName);
}
- private String generateLogSuffixString() {
+ protected String generateLogSuffixString() {
return StringUtils.join("DynamoDb table = ", tableName, ", partition key =
", dynamoDBPartitionKey);
}
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
index 5639db025825..7e90c36f2cdb 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
@@ -43,10 +43,6 @@ import
software.amazon.awssdk.services.dynamodb.model.BillingMode;
+ " are auto managed internally.")
public class DynamoDbBasedLockConfig extends HoodieConfig {
- public static DynamoDbBasedLockConfig.Builder newBuilder() {
- return new DynamoDbBasedLockConfig.Builder();
- }
-
// configs for DynamoDb based locks
public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX =
LockConfiguration.LOCK_PREFIX + "dynamodb.";
@@ -132,34 +128,12 @@ public class DynamoDbBasedLockConfig extends HoodieConfig
{
.sinceVersion("0.10.0")
.withDocumentation("Lock Acquire Wait Timeout in milliseconds");
- /**
- * Builder for {@link DynamoDbBasedLockConfig}.
- */
- public static class Builder {
- private final DynamoDbBasedLockConfig lockConfig = new
DynamoDbBasedLockConfig();
-
- public DynamoDbBasedLockConfig build() {
- lockConfig.setDefaults(DynamoDbBasedLockConfig.class.getName());
- checkRequiredProps(lockConfig);
- return lockConfig;
- }
-
- public Builder fromProperties(TypedProperties props) {
- lockConfig.getProps().putAll(props);
- return this;
- }
-
- private void checkRequiredProps(final DynamoDbBasedLockConfig config) {
- String errorMsg = "Config key is not found: ";
- ValidationUtils.checkArgument(
- config.contains(DYNAMODB_LOCK_TABLE_NAME.key()),
- errorMsg + DYNAMODB_LOCK_TABLE_NAME.key());
- ValidationUtils.checkArgument(
- config.contains(DYNAMODB_LOCK_REGION.key()),
- errorMsg + DYNAMODB_LOCK_REGION.key());
- ValidationUtils.checkArgument(
- config.contains(DYNAMODB_LOCK_PARTITION_KEY.key()),
- errorMsg + DYNAMODB_LOCK_PARTITION_KEY.key());
- }
+ public static DynamoDbBasedLockConfig from(TypedProperties properties) {
+ DynamoDbBasedLockConfig config = new DynamoDbBasedLockConfig();
+ config.getProps().putAll(properties);
+ config.setDefaults(DynamoDbBasedLockConfig.class.getName());
+
ValidationUtils.checkArgument(config.contains(DYNAMODB_LOCK_TABLE_NAME.key()),
"Config key is not found: " + DYNAMODB_LOCK_TABLE_NAME.key());
+ ValidationUtils.checkArgument(config.contains(DYNAMODB_LOCK_REGION.key()),
"Config key is not found: " + DYNAMODB_LOCK_REGION.key());
+ return config;
}
}
diff --git
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
index b874f4f3c3cc..6a7d9ef78aab 100644
---
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
+++
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
@@ -18,7 +18,24 @@
package org.apache.hudi.aws.transaction.integ;
+import
org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider;
+import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
+import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProviderBase;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.config.DynamoDbBasedLockConfig;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -26,18 +43,11 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
-import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
-import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.config.DynamoDbBasedLockConfig;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
import java.net.URI;
import java.util.UUID;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
@@ -48,62 +58,70 @@ import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_
@Disabled("HUDI-7475 The tests do not work. Disabling them to unblock Azure
CI")
public class ITTestDynamoDBBasedLockProvider {
- private static LockConfiguration lockConfiguration;
- private static DynamoDbClient dynamoDb;
-
+ private static final LockConfiguration LOCK_CONFIGURATION;
+ private static final LockConfiguration
IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH;
+ private static final LockConfiguration
IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY;
+ private static final LockConfiguration
IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME;
+ private static final LockConfiguration IMPLICIT_PART_KEY_LOCK_CONFIG;
private static final String TABLE_NAME_PREFIX = "testDDBTable-";
private static final String REGION = "us-east-2";
+ private static DynamoDbClient dynamoDb;
+ private static String SCHEME_S3 = "s3";
+ private static String SCHEME_S3A = "s3a";
+ private static String URI_NO_CLOUD_PROVIDER_PREFIX =
"://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table";
+
+ static {
+ Properties dynamoDblpProps = new Properties();
+ Properties implicitPartKeyLpProps;
+ // Common properties shared by both lock providers.
+
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(),
BillingMode.PAY_PER_REQUEST.name());
+
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(),
Integer.toString(20 * 1000 * 5));
+
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(),
REGION);
+ dynamoDblpProps.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(),
"0");
+
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(),
"0");
+
+ implicitPartKeyLpProps = new TypedProperties(dynamoDblpProps);
+ dynamoDblpProps = new TypedProperties(dynamoDblpProps);
+
+ // The original (day-1) DynamoDBBasedLockProvider requires an explicit partition key.
+
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
"testKey");
+ LOCK_CONFIGURATION = new LockConfiguration(dynamoDblpProps);
+
+ // For the newly added implicit partition key DDB lock provider, it can
derive the partition key from
+ // hudi table base path and hudi table name. These properties are
available in lockConfig as of today.
+ implicitPartKeyLpProps.setProperty(
+ HoodieCommonConfig.BASE_PATH.key(),
+ "gs://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
+ implicitPartKeyLpProps.setProperty(
+ HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "ma_po_tofu_is_awesome");
+ IMPLICIT_PART_KEY_LOCK_CONFIG = new
LockConfiguration(implicitPartKeyLpProps);
+ // With partition key nothing should break, the field should be simply
ignored.
+ TypedProperties withPartKey = new TypedProperties(implicitPartKeyLpProps);
+
withPartKey.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
"testKey");
+ IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY = new
LockConfiguration(withPartKey);
+
+ // Missing either base path or hoodie table name is a bad config for
implicit partition key lock provider.
+ TypedProperties missingBasePath = new
TypedProperties(implicitPartKeyLpProps);
+ missingBasePath.remove(HoodieCommonConfig.BASE_PATH.key());
+ IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH = new
LockConfiguration(missingBasePath);
+
+ TypedProperties missingTableName = new
TypedProperties(implicitPartKeyLpProps);
+ missingTableName.remove(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+ IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME = new
LockConfiguration(missingTableName);
+ }
@BeforeAll
public static void setup() throws InterruptedException {
- Properties properties = new Properties();
-
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(),
BillingMode.PAY_PER_REQUEST.name());
- // properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
TABLE_NAME_PREFIX);
-
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
"testKey");
- properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(),
REGION);
- properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
-
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(),
"0");
-
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(),
"0");
- lockConfiguration = new LockConfiguration(properties);
dynamoDb = getDynamoClientWithLocalEndpoint();
}
- @Test
- public void testAcquireLock() {
-
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
TABLE_NAME_PREFIX + UUID.randomUUID());
- DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
- .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
- dynamoDbBasedLockProvider.unlock();
- }
-
- @Test
- public void testUnlock() {
-
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
TABLE_NAME_PREFIX + UUID.randomUUID());
- DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
- .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
- dynamoDbBasedLockProvider.unlock();
-
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
- .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
- }
-
- @Test
- public void testReentrantLock() {
-
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
TABLE_NAME_PREFIX + UUID.randomUUID());
- DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
- .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
-
Assertions.assertFalse(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
- .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
- dynamoDbBasedLockProvider.unlock();
- }
-
- @Test
- public void testUnlockWithoutLock() {
-
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
TABLE_NAME_PREFIX + UUID.randomUUID());
- DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
- dynamoDbBasedLockProvider.unlock();
+  /**
+   * Supplies (lock configuration, lock provider class) pairs for which constructing the
+   * provider is expected to be rejected; consumed by {@code testBadConfig}.
+   */
+  public static Stream<Object> badTestDimensions() {
+    // Configuration without a hoodie table name, tried against both provider flavours.
+    Arguments explicitProviderNoTableName =
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME, DynamoDBBasedLockProvider.class);
+    Arguments implicitProviderNoTableName =
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME, DynamoDBBasedImplicitPartitionKeyLockProvider.class);
+    // Configuration without a hudi table base path, which the implicit provider needs.
+    Arguments implicitProviderNoBasePath =
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH, DynamoDBBasedImplicitPartitionKeyLockProvider.class);
+    return Stream.of(explicitProviderNoTableName, implicitProviderNoTableName, implicitProviderNoBasePath);
   }
private static DynamoDbClient getDynamoClientWithLocalEndpoint() {
@@ -112,13 +130,112 @@ public class ITTestDynamoDBBasedLockProvider {
throw new IllegalStateException("dynamodb-local.endpoint system property
not set");
}
return DynamoDbClient.builder()
- .region(Region.of(REGION))
- .endpointOverride(URI.create(endpoint))
- .credentialsProvider(getCredentials())
- .build();
+ .region(Region.of(REGION))
+ .endpointOverride(URI.create(endpoint))
+ .credentialsProvider(getCredentials())
+ .build();
}
private static AwsCredentialsProvider getCredentials() {
return
StaticCredentialsProvider.create(AwsBasicCredentials.create("random-access-key",
"random-secret-key"));
}
+
+  /**
+   * Verifies that constructing a lock provider from an incomplete configuration is rejected.
+   *
+   * <p>The provider is instantiated reflectively, so the constructor's failure is expected two
+   * levels down the cause chain of whatever {@code ReflectionUtils.loadClass} throws.
+   *
+   * @param lockConfig        configuration missing a required property
+   * @param lockProviderClass provider class to instantiate
+   */
+  @ParameterizedTest
+  @MethodSource("badTestDimensions")
+  void testBadConfig(LockConfiguration lockConfig, Class<?> lockProviderClass) {
+    lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
+    // assertThrows fails the test cleanly when construction unexpectedly succeeds, instead of
+    // tripping a NullPointerException while walking the cause chain of a placeholder exception.
+    Exception thrown = Assertions.assertThrows(Exception.class, () -> ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, DynamoDbClient.class},
+        lockConfig, null, dynamoDb));
+    Throwable wrapper = thrown.getCause();
+    Throwable rootCause = wrapper == null ? null : wrapper.getCause();
+    Assertions.assertNotNull(rootCause, "Expected a nested cause but got: " + thrown);
+    Assertions.assertEquals(IllegalArgumentException.class, rootCause.getClass());
+  }
+
+  /**
+   * Supplies the valid (lock configuration, lock provider class) pairs shared by the
+   * parameterized lock/unlock tests in this class.
+   */
+  public static Stream<Arguments> testDimensions() {
+    return Stream.of(
+        // Without partition key, only table name is used.
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, DynamoDBBasedLockProvider.class),
+        // With an explicit partition key configured, that key is used as the partition key.
+        Arguments.of(LOCK_CONFIGURATION, DynamoDBBasedLockProvider.class),
+        // Even if we have partition key set, nothing would break: the implicit provider ignores it.
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY, DynamoDBBasedImplicitPartitionKeyLockProvider.class),
+        // Plain implicit configuration: base path and table name only.
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, DynamoDBBasedImplicitPartitionKeyLockProvider.class)
+    );
+  }
+
+  /**
+   * Builds the given provider reflectively, checks the partition key it derived, and then
+   * verifies that the lock can be acquired and released.
+   *
+   * @param lockConfig        lock configuration under test
+   * @param lockProviderClass provider class to instantiate
+   */
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testAcquireLock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
+    lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = (DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+
+    // Also validate the partition key is properly constructed.
+    if (lockProviderClass.equals(DynamoDBBasedLockProvider.class)) {
+      // Explicit provider: the configured partition key wins, otherwise the table name is used.
+      String tableName = (String) lockConfig.getConfig().get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+      String partitionKey = (String) lockConfig.getConfig().get(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key());
+      if (partitionKey != null) {
+        Assertions.assertEquals(partitionKey, dynamoDbBasedLockProvider.getPartitionKey());
+      } else {
+        Assertions.assertEquals(tableName, dynamoDbBasedLockProvider.getPartitionKey());
+      }
+    } else if (lockProviderClass.equals(DynamoDBBasedImplicitPartitionKeyLockProvider.class)) {
+      String basePath = (String) lockConfig.getConfig().get(HoodieCommonConfig.BASE_PATH.key());
+      Assertions.assertNotNull(basePath, "Base path must be configured for the implicit partition key provider");
+      // Base path is expected to carry the s3a scheme; verify that for partition key calculation, s3a is replaced with s3.
+      // NOTE(review): the static initializer of this class sets the base path to a literal "gs://..." URI;
+      // confirm the fixture is meant to be SCHEME_S3A + URI_NO_CLOUD_PROVIDER_PREFIX, otherwise this check cannot pass.
+      Assertions.assertTrue(basePath.startsWith(SCHEME_S3A), "Unexpected base path scheme: " + basePath);
+      // Verify the partition key is the hash of the s3-normalized base path.
+      Assertions.assertEquals(
+          HashID.generateXXHashAsString(SCHEME_S3 + URI_NO_CLOUD_PROVIDER_PREFIX, HashID.Size.BITS_64),
+          dynamoDbBasedLockProvider.getPartitionKey());
+    }
+
+    // Test lock acquisition and release
+    Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
+    dynamoDbBasedLockProvider.unlock();
+  }
+
+  /**
+   * Verifies that a lock released via {@code unlock()} can be acquired again by the same provider.
+   */
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testUnlock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
+    TypedProperties props = lockConfig.getConfig();
+    props.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
+
+    Class<?>[] constructorArgTypes = {LockConfiguration.class, StorageConfiguration.class, DynamoDbClient.class};
+    Object[] constructorArgs = {lockConfig, null, dynamoDb};
+    DynamoDBBasedLockProviderBase lockProvider = (DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(), constructorArgTypes, constructorArgs);
+
+    // First acquisition, then release.
+    long firstWaitMs = props.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
+    Assertions.assertTrue(lockProvider.tryLock(firstWaitMs, TimeUnit.MILLISECONDS));
+    lockProvider.unlock();
+
+    // The released lock must be obtainable again.
+    long secondWaitMs = props.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
+    Assertions.assertTrue(lockProvider.tryLock(secondWaitMs, TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Verifies that a second {@code tryLock} on a provider that already holds the lock
+   * reports failure, i.e. the lock is not re-entrant.
+   */
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testReentrantLock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
+    TypedProperties props = lockConfig.getConfig();
+    props.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
+
+    Class<?>[] constructorArgTypes = {LockConfiguration.class, StorageConfiguration.class, DynamoDbClient.class};
+    Object[] constructorArgs = {lockConfig, null, dynamoDb};
+    DynamoDBBasedLockProviderBase lockProvider = (DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(), constructorArgTypes, constructorArgs);
+
+    boolean firstAttempt = lockProvider.tryLock(props.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS);
+    Assertions.assertTrue(firstAttempt);
+    boolean secondAttempt = lockProvider.tryLock(props.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS);
+    Assertions.assertFalse(secondAttempt);
+    lockProvider.unlock();
+  }
+
+  /**
+   * Verifies that calling {@code unlock()} on a provider that never acquired the lock
+   * completes without throwing.
+   */
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testUnlockWithoutLock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
+    String lockTableName = TABLE_NAME_PREFIX + UUID.randomUUID();
+    lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), lockTableName);
+
+    Class<?>[] constructorArgTypes = {LockConfiguration.class, StorageConfiguration.class, DynamoDbClient.class};
+    Object[] constructorArgs = {lockConfig, null, dynamoDb};
+    Object instance = ReflectionUtils.loadClass(lockProviderClass.getName(), constructorArgTypes, constructorArgs);
+    DynamoDBBasedLockProviderBase lockProvider = (DynamoDBBasedLockProviderBase) instance;
+    lockProvider.unlock();
+  }
}
diff --git
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBaseTest.java
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBaseTest.java
new file mode 100644
index 000000000000..9e7256c3c84f
--- /dev/null
+++
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBaseTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.DynamoDbBasedLockConfig;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.BillingMode;
+
+import java.util.Properties;
+
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * Checks how {@link DynamoDBBasedLockProvider} construction fails when it is given either no
+ * DynamoDB client or a stub client that implements nothing beyond the mandatory methods.
+ */
+class DynamoDBBasedLockProviderBaseTest {
+  private static final LockConfiguration LOCK_CONFIGURATION;
+  // Hand-written stub client. NOTE(review): the @Mock annotation appears to have no effect here, since
+  // the field is initialized explicitly and no Mockito extension is visible on this class -- confirm.
+  @Mock
+  private static DynamoDbClient mockClient = new DynamoDbClient() {
+    @Override
+    public String serviceName() {
+      return "";
+    }
+
+    @Override
+    public void close() {
+      // Nothing to release for the stub.
+    }
+  };
+
+  static {
+    Properties lockProps = new TypedProperties();
+    // DynamoDB table settings.
+    lockProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
+    lockProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), Integer.toString(20 * 1000 * 5));
+    lockProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), "us-east-2");
+    lockProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), "my-table");
+    // Lock acquisition settings.
+    lockProps.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+    lockProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "0");
+    lockProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0");
+    lockProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey");
+    LOCK_CONFIGURATION = new LockConfiguration(lockProps);
+  }
+
+  /**
+   * Construction must fail in both cases; only the kind of failure differs.
+   *
+   * @param isNull whether a {@code null} client is passed instead of the stub client
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testLockProviderBaseInitialization(boolean isNull) {
+    DynamoDbClient client = isNull ? null : mockClient;
+    Exception thrown = Assertions.assertThrows(Exception.class,
+        () -> new DynamoDBBasedLockProvider(LOCK_CONFIGURATION, new HadoopStorageConfiguration(true), client));
+    Class<?> thrownType = thrown.getClass();
+    if (isNull) {
+      // Initialization should fail on AWS API call due to invalid setup.
+      Assertions.assertEquals(software.amazon.awssdk.core.exception.SdkClientException.class, thrownType);
+    } else {
+      // Otherwise it should be anything but NPE.
+      Assertions.assertNotEquals(java.lang.NullPointerException.class, thrownType);
+    }
+  }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
similarity index 64%
copy from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
copy to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
index 02f137b509a6..807226460a4a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/BaseZookeeperBasedLockProvider.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.transaction.lock;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieLockException;
@@ -28,7 +29,6 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;
@@ -40,47 +40,57 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
-import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
-import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
-import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
+import static org.apache.hudi.config.HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES;
+import static
org.apache.hudi.config.HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS;
+import static
org.apache.hudi.config.HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_CONNECT_URL;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_SESSION_TIMEOUT_MS;
/**
* A zookeeper based lock. This {@link LockProvider} implementation allows to
lock table operations
* using zookeeper. Users need to have a Zookeeper cluster deployed to be able
to use this lock.
*/
@NotThreadSafe
-public class ZookeeperBasedLockProvider implements
LockProvider<InterProcessMutex>, Serializable {
+public abstract class BaseZookeeperBasedLockProvider implements
LockProvider<InterProcessMutex>, Serializable {
- private static final Logger LOG =
LoggerFactory.getLogger(ZookeeperBasedLockProvider.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseZookeeperBasedLockProvider.class);
private final transient CuratorFramework curatorFrameworkClient;
private volatile InterProcessMutex lock = null;
- protected LockConfiguration lockConfiguration;
+ protected final LockConfiguration lockConfiguration;
+ protected final String zkBasePath;
+ protected final String lockKey;
- public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf) {
+ public BaseZookeeperBasedLockProvider(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf) {
checkRequiredProps(lockConfiguration);
this.lockConfiguration = lockConfiguration;
+ zkBasePath = getZkBasePath(lockConfiguration);
+ lockKey = getLockKey(lockConfiguration);
this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
-
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP_KEY))
- .retryPolicy(new
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY),
-
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY),
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY)))
-
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP_KEY,
DEFAULT_ZK_SESSION_TIMEOUT_MS))
-
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
+
.connectString(ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(),
ZK_CONNECT_URL))
+ .retryPolicy(new BoundedExponentialBackoffRetry(
+ ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS),
+ ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS),
+ ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
LOCK_ACQUIRE_NUM_RETRIES)))
+
.sessionTimeoutMs(ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
ZK_SESSION_TIMEOUT_MS))
+
.connectionTimeoutMs(ConfigUtils.getIntWithAltKeys(lockConfiguration.getConfig(),
ZK_CONNECTION_TIMEOUT_MS))
.build();
this.curatorFrameworkClient.start();
createPathIfNotExists();
}
- private String getLockPath() {
- return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
- + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
+  /**
+   * Resolves the zookeeper base path under which the lock node lives.
+   *
+   * <p>Invoked from the constructor of this base class, so implementations must not depend on
+   * instance fields of the subclass. The result is stored in {@code zkBasePath}.
+   *
+   * @param lockConfiguration lock configuration passed to the constructor
+   * @return the zookeeper base path
+   */
+  protected abstract String getZkBasePath(LockConfiguration lockConfiguration);
+
+  /**
+   * Resolves the lock key, i.e. the last segment of the zookeeper lock path.
+   *
+   * <p>Invoked from the constructor of this base class, so implementations must not depend on
+   * instance fields of the subclass. The result is stored in {@code lockKey}.
+   *
+   * @param lockConfiguration lock configuration passed to the constructor
+   * @return the lock key
+   */
+  protected abstract String getLockKey(LockConfiguration lockConfiguration);
+
+  /**
+   * Builds the suffix appended to lock-state log statements, naming the zookeeper base path
+   * and lock key this provider operates on. Subclasses may override it to add more context.
+   *
+   * @return a string of the form {@code ZkBasePath = <base path>, lock key = <key>}
+   */
+  protected String generateLogSuffixString() {
+    return StringUtils.join("ZkBasePath = ", zkBasePath, ", lock key = ", lockKey);
+  }
+
+  /**
+   * Returns the full zookeeper path of the lock node: the base path and the lock key joined by '/'.
+   */
+  protected String getLockPath() {
+    return zkBasePath + '/' + lockKey;
   }
private void createPathIfNotExists() {
@@ -116,21 +126,6 @@ public class ZookeeperBasedLockProvider implements
LockProvider<InterProcessMute
}
}
-
- // Only used for testing
- public ZookeeperBasedLockProvider(
- final LockConfiguration lockConfiguration, final CuratorFramework
curatorFrameworkClient) {
- checkRequiredProps(lockConfiguration);
- this.lockConfiguration = lockConfiguration;
- this.curatorFrameworkClient = curatorFrameworkClient;
- synchronized (this.curatorFrameworkClient) {
- if (this.curatorFrameworkClient.getState() !=
CuratorFrameworkState.STARTED) {
- this.curatorFrameworkClient.start();
- createPathIfNotExists();
- }
- }
- }
-
@Override
public boolean tryLock(long time, TimeUnit unit) {
LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
@@ -181,8 +176,7 @@ public class ZookeeperBasedLockProvider implements
LockProvider<InterProcessMute
private void acquireLock(long time, TimeUnit unit) throws Exception {
ValidationUtils.checkArgument(this.lock == null,
generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
InterProcessMutex newLock = new InterProcessMutex(
- this.curatorFrameworkClient,
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
- + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY));
+ this.curatorFrameworkClient, getLockPath());
boolean acquired = newLock.acquire(time, unit);
if (!acquired) {
throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()));
@@ -196,14 +190,6 @@ public class ZookeeperBasedLockProvider implements
LockProvider<InterProcessMute
private void checkRequiredProps(final LockConfiguration config) {
ValidationUtils.checkArgument(config.getConfig().getString(ZK_CONNECT_URL_PROP_KEY)
!= null);
-
ValidationUtils.checkArgument(config.getConfig().getString(ZK_BASE_PATH_PROP_KEY)
!= null);
-
ValidationUtils.checkArgument(config.getConfig().getString(ZK_LOCK_KEY_PROP_KEY)
!= null);
- }
-
- private String generateLogSuffixString() {
- String zkBasePath =
this.lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY);
- String lockKey =
this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
- return StringUtils.join("ZkBasePath = ", zkBasePath, ", lock key = ",
lockKey);
}
protected String generateLogStatement(LockState state, String suffix) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
new file mode 100644
index 000000000000..9c4a57bc271f
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedImplicitBasePathLockProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.common.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.hudi.common.fs.FSUtils.s3aToS3;
+
+/**
+ * A zookeeper based lock. This {@link LockProvider} implementation allows to lock table operations
+ * using zookeeper. Users need to have a Zookeeper cluster deployed to be able to use this lock.
+ *
+ * <p>This class derives the zookeeper base path from the hudi table base path
+ * ({@code HoodieCommonConfig.BASE_PATH}): the path is {@code "/tmp/"} followed by the 64-bit
+ * xxHash of the table base path after {@code s3aToS3} normalization. The lock key is set to the
+ * hard-coded value {@link #LOCK_KEY}.
+ */
+@NotThreadSafe
+public class ZookeeperBasedImplicitBasePathLockProvider extends BaseZookeeperBasedLockProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(ZookeeperBasedImplicitBasePathLockProvider.class);
+
+  /** Fixed lock key used by every instance of this provider. */
+  public static final String LOCK_KEY = "lock_key";
+  // Normalized table base path, kept only for log messages.
+  private final String hudiTableBasePath;
+
+  /**
+   * Computes the zookeeper base path for a hudi table base path.
+   *
+   * @param hudiTableBasePath base path of the hudi table
+   * @return {@code "/tmp/"} followed by the 64-bit xxHash of the normalized base path
+   */
+  public static String getLockBasePath(String hudiTableBasePath) {
+    // Ensure consistent format for S3 URI.
+    String lockBasePath = "/tmp/" + HashID.generateXXHashAsString(s3aToS3(hudiTableBasePath), HashID.Size.BITS_64);
+    // The value logged here is the zookeeper base path, not the lock key.
+    LOG.info("The Zookeeper lock base path for the base path {} is {}", hudiTableBasePath, lockBasePath);
+    return lockBasePath;
+  }
+
+  /**
+   * Creates the provider.
+   *
+   * @param lockConfiguration lock configuration; must contain the hudi table base path
+   * @param conf              storage configuration, forwarded to the base class
+   */
+  public ZookeeperBasedImplicitBasePathLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration<?> conf) {
+    // The super constructor already calls getZkBasePath/getLockKey, before the field below is assigned.
+    super(lockConfiguration, conf);
+    hudiTableBasePath = s3aToS3(lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key()));
+  }
+
+  @Override
+  protected String getZkBasePath(LockConfiguration lockConfiguration) {
+    String hudiTableBasePath = lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key());
+    ValidationUtils.checkArgument(hudiTableBasePath != null,
+        HoodieCommonConfig.BASE_PATH.key() + " must be set to derive the zookeeper lock base path");
+    return getLockBasePath(hudiTableBasePath);
+  }
+
+  @Override
+  protected String getLockKey(LockConfiguration lockConfiguration) {
+    return LOCK_KEY;
+  }
+
+  @Override
+  protected String generateLogSuffixString() {
+    return StringUtils.join("ZkBasePath = ", zkBasePath,
+        ", lock key = ", lockKey, ", hudi table base path = ", hudiTableBasePath);
+  }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
index 02f137b509a6..e1eacd7a870e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java
@@ -20,193 +20,39 @@ package org.apache.hudi.client.transaction.lock;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.lock.LockProvider;
-import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.retry.BoundedExponentialBackoffRetry;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
-import static
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
-import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_LOCK_KEY_PROP_KEY;
-import static
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_BASE_PATH;
+import static org.apache.hudi.config.HoodieLockConfig.ZK_LOCK_KEY;
/**
* A zookeeper based lock. This {@link LockProvider} implementation allows to
lock table operations
* using zookeeper. Users need to have a Zookeeper cluster deployed to be able
to use this lock.
+ * The lock provider requires mandatory config
"hoodie.write.lock.zookeeper.base_path" and
+ * "hoodie.write.lock.zookeeper.lock_key" to be set.
*/
@NotThreadSafe
-public class ZookeeperBasedLockProvider implements
LockProvider<InterProcessMutex>, Serializable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ZookeeperBasedLockProvider.class);
-
- private final transient CuratorFramework curatorFrameworkClient;
- private volatile InterProcessMutex lock = null;
- protected LockConfiguration lockConfiguration;
+public class ZookeeperBasedLockProvider extends BaseZookeeperBasedLockProvider
{
public ZookeeperBasedLockProvider(final LockConfiguration lockConfiguration,
final StorageConfiguration<?> conf) {
- checkRequiredProps(lockConfiguration);
- this.lockConfiguration = lockConfiguration;
- this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
-
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP_KEY))
- .retryPolicy(new
BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY),
-
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY),
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY)))
-
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP_KEY,
DEFAULT_ZK_SESSION_TIMEOUT_MS))
-
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY,
DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
- .build();
- this.curatorFrameworkClient.start();
- createPathIfNotExists();
- }
-
- private String getLockPath() {
- return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
- + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
- }
-
- private void createPathIfNotExists() {
- try {
- String lockPath = getLockPath();
- LOG.info(String.format("Creating zookeeper path %s if not exists",
lockPath));
- String[] parts = lockPath.split("/");
- StringBuilder currentPath = new StringBuilder();
- for (String part : parts) {
- if (!part.isEmpty()) {
- currentPath.append("/").append(part);
- createNodeIfNotExists(currentPath.toString());
- }
- }
- } catch (Exception e) {
- LOG.error("Failed to create ZooKeeper path: " + e.getMessage());
- throw new HoodieLockException("Failed to initialize ZooKeeper path", e);
- }
- }
-
- private void createNodeIfNotExists(String path) throws Exception {
- if (this.curatorFrameworkClient.checkExists().forPath(path) == null) {
- try {
- this.curatorFrameworkClient.create().forPath(path);
- // to avoid failure due to synchronous calls.
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.NODEEXISTS) {
- LOG.debug(String.format("Node already exist for path = %s", path));
- } else {
- throw new HoodieLockException("Failed to create zookeeper node", e);
- }
- }
- }
- }
-
-
- // Only used for testing
- public ZookeeperBasedLockProvider(
- final LockConfiguration lockConfiguration, final CuratorFramework
curatorFrameworkClient) {
- checkRequiredProps(lockConfiguration);
- this.lockConfiguration = lockConfiguration;
- this.curatorFrameworkClient = curatorFrameworkClient;
- synchronized (this.curatorFrameworkClient) {
- if (this.curatorFrameworkClient.getState() !=
CuratorFrameworkState.STARTED) {
- this.curatorFrameworkClient.start();
- createPathIfNotExists();
- }
- }
+ super(lockConfiguration, conf);
}
@Override
- public boolean tryLock(long time, TimeUnit unit) {
- LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
- try {
- acquireLock(time, unit);
- LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
- } catch (HoodieLockException e) {
- throw e;
- } catch (Exception e) {
- throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
- }
- return lock != null && lock.isAcquiredInThisProcess();
+ protected String getZkBasePath(LockConfiguration lockConfiguration) {
+
ValidationUtils.checkArgument(ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(),
ZK_BASE_PATH) != null);
+ return lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY);
}
@Override
- public void unlock() {
- try {
- LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
- if (lock == null || !lock.isAcquiredInThisProcess()) {
- return;
- }
- lock.release();
- lock = null;
- LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
- } catch (Exception e) {
- throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
- }
- }
-
- @Override
- public void close() {
- try {
- if (lock != null) {
- lock.release();
- lock = null;
- }
- this.curatorFrameworkClient.close();
- } catch (Exception e) {
- LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
- }
- }
-
- @Override
- public InterProcessMutex getLock() {
- return this.lock;
- }
-
- private void acquireLock(long time, TimeUnit unit) throws Exception {
- ValidationUtils.checkArgument(this.lock == null,
generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
- InterProcessMutex newLock = new InterProcessMutex(
- this.curatorFrameworkClient,
lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY) + "/"
- + this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY));
- boolean acquired = newLock.acquire(time, unit);
- if (!acquired) {
- throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()));
- }
- if (newLock.isAcquiredInThisProcess()) {
- lock = newLock;
- } else {
- throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()));
- }
- }
-
- private void checkRequiredProps(final LockConfiguration config) {
-
ValidationUtils.checkArgument(config.getConfig().getString(ZK_CONNECT_URL_PROP_KEY)
!= null);
-
ValidationUtils.checkArgument(config.getConfig().getString(ZK_BASE_PATH_PROP_KEY)
!= null);
-
ValidationUtils.checkArgument(config.getConfig().getString(ZK_LOCK_KEY_PROP_KEY)
!= null);
- }
-
- private String generateLogSuffixString() {
- String zkBasePath =
this.lockConfiguration.getConfig().getString(ZK_BASE_PATH_PROP_KEY);
- String lockKey =
this.lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
- return StringUtils.join("ZkBasePath = ", zkBasePath, ", lock key = ",
lockKey);
- }
-
- protected String generateLogStatement(LockState state, String suffix) {
- return StringUtils.join(state.name(), " lock at", suffix);
+ protected String getLockKey(LockConfiguration lockConfiguration) {
+ // Reject configurations in which getStringWithAltKeys finds no value for the lock key.
+ ValidationUtils.checkArgument(ConfigUtils.getStringWithAltKeys(lockConfiguration.getConfig(), ZK_LOCK_KEY) != null);
+ // Read from the argument (as getZkBasePath does) rather than from the instance field, so the
+ // value returned is the one that was just validated and does not depend on field assignment order.
+ return lockConfiguration.getConfig().getString(ZK_LOCK_KEY_PROP_KEY);
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
index 616ba0d228f3..d5a2c9ae28c5 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestZookeeperBasedLockProvider.java
@@ -18,9 +18,15 @@
package org.apache.hudi.client.transaction;
+import
org.apache.hudi.client.transaction.lock.ZookeeperBasedImplicitBasePathLockProvider;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
+import org.apache.hudi.client.transaction.lock.BaseZookeeperBasedLockProvider;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -30,13 +36,21 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
import static
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
@@ -52,7 +66,10 @@ public class TestZookeeperBasedLockProvider {
private static CuratorFramework client;
private static String basePath = "/hudi/test/lock";
private static String key = "table1";
- private static LockConfiguration lockConfiguration;
+ private static LockConfiguration zkConfWithZkBasePathAndLockKeyLock;
+ private static LockConfiguration zkConfNoTableBasePathTableName;
+ private static LockConfiguration zkConfWithTableBasePathTableName;
+ private static LockConfiguration zkConfWithZkBasePathLockKeyTableInfo;
@BeforeAll
public static void setup() {
@@ -67,15 +84,33 @@ public class TestZookeeperBasedLockProvider {
}
}
Properties properties = new Properties();
- properties.setProperty(ZK_BASE_PATH_PROP_KEY, basePath);
- properties.setProperty(ZK_LOCK_KEY_PROP_KEY, key);
+
properties.setProperty(ZK_CONNECT_URL_PROP_KEY, server.getConnectString());
- properties.setProperty(ZK_BASE_PATH_PROP_KEY,
server.getTempDirectory().getAbsolutePath());
+ properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
"1000");
+
properties.setProperty(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY,
"3000");
+ properties.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
+ properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3");
properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP_KEY, "10000");
properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY, "10000");
- properties.setProperty(ZK_LOCK_KEY_PROP_KEY, "key");
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
- lockConfiguration = new LockConfiguration(properties);
+ zkConfNoTableBasePathTableName = new LockConfiguration(properties);
+
+ Properties propsWithTableInfo = (Properties) properties.clone();
+ propsWithTableInfo.setProperty(
+ HoodieCommonConfig.BASE_PATH.key(),
"s3://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
+ propsWithTableInfo.setProperty(
+ HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "ma_po_tofu_is_awesome");
+ zkConfWithTableBasePathTableName = new
LockConfiguration(propsWithTableInfo);
+
+ properties.setProperty(ZK_BASE_PATH_PROP_KEY, basePath);
+ properties.setProperty(ZK_LOCK_KEY_PROP_KEY, key);
+ zkConfWithZkBasePathAndLockKeyLock = new LockConfiguration(properties);
+
+ properties.setProperty(
+ HoodieCommonConfig.BASE_PATH.key(),
"s3://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
+ properties.setProperty(
+ HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "ma_po_tofu_is_awesome");
+ zkConfWithZkBasePathLockKeyTableInfo = new LockConfiguration(properties);
}
@AfterAll
@@ -88,31 +123,66 @@ public class TestZookeeperBasedLockProvider {
}
}
- @Test
- public void testAcquireLock() {
- ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(lockConfiguration, client);
-
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+ public static Stream<Object> testDimensions() {
+ return Stream.of(
+ Arguments.of(zkConfWithTableBasePathTableName,
ZookeeperBasedImplicitBasePathLockProvider.class),
+ Arguments.of(zkConfWithZkBasePathAndLockKeyLock,
ZookeeperBasedLockProvider.class),
+ // Even if we have base path set, nothing would break.
+ Arguments.of(zkConfWithZkBasePathLockKeyTableInfo,
ZookeeperBasedImplicitBasePathLockProvider.class)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("testDimensions")
+ void testAcquireLock(LockConfiguration lockConfig, Class<?>
lockProviderClass) {
+ BaseZookeeperBasedLockProvider zookeeperLP =
(BaseZookeeperBasedLockProvider) ReflectionUtils.loadClass(
+ lockProviderClass.getName(),
+ new Class<?>[] {LockConfiguration.class, StorageConfiguration.class},
+ new Object[] {lockConfig, null});
+
Assertions.assertTrue(zookeeperLP.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
- zookeeperBasedLockProvider.unlock();
+ zookeeperLP.unlock();
+ }
+
+ public static Stream<Object> testBadDimensions() {
+ return Stream.of(
+ Arguments.of(zkConfNoTableBasePathTableName,
ZookeeperBasedImplicitBasePathLockProvider.class),
+ Arguments.of(zkConfWithTableBasePathTableName,
ZookeeperBasedLockProvider.class)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("testBadDimensions")
+ void testBadLockConfig(LockConfiguration lockConfig, Class<?> lockProviderClass) {
+ // Building the provider reflectively must fail. assertThrows reports a proper test failure if the
+ // constructor unexpectedly succeeds, instead of a NullPointerException on the cause lookup below.
+ Exception ex = Assertions.assertThrows(Exception.class, () -> ReflectionUtils.loadClass(
+ lockProviderClass.getName(),
+ new Class<?>[] {LockConfiguration.class, StorageConfiguration.class},
+ new Object[] {lockConfig, null}));
+ // The validation failure is expected two causes deep inside the exception raised by loadClass.
+ Assertions.assertEquals(IllegalArgumentException.class, ex.getCause().getCause().getClass());
}
@Test
public void testUnLock() {
- ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(lockConfiguration, client);
-
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+ ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
+
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
zookeeperBasedLockProvider.unlock();
- zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+
zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS);
}
@Test
public void testReentrantLock() {
- ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(lockConfiguration, client);
-
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+ ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
+
Assertions.assertTrue(zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS));
try {
- zookeeperBasedLockProvider.tryLock(lockConfiguration.getConfig()
+
zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
TimeUnit.MILLISECONDS);
Assertions.fail();
} catch (HoodieLockException e) {
@@ -123,7 +193,7 @@ public class TestZookeeperBasedLockProvider {
@Test
public void testUnlockWithoutLock() {
- ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(lockConfiguration, client);
+ ZookeeperBasedLockProvider zookeeperBasedLockProvider = new
ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
zookeeperBasedLockProvider.unlock();
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
index 4df8c3852892..e76c5a8d0739 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
@@ -79,10 +79,10 @@ public class HashID implements Serializable {
}
/**
- * Get the hash value for a string message and for the desired @{@link Size}.
+ * Get the hash value for a string message and for the desired @{@link
HashID.Size}.
*
* @param message - String message to get the hash value for
- * @param bits - @{@link Size} of the hash value
+ * @param bits - @{@link HashID.Size} of the hash value
* @return Hash value for the message as byte array
*/
public static byte[] hash(final String message, final Size bits) {
@@ -108,6 +108,24 @@ public class HashID implements Serializable {
}
}
+ /**
+ * Get the hash value for a given string and for the desired {@link Size}, rendered as an
+ * upper-case hexadecimal string with two hex digits per hash byte.
+ *
+ * @param input - String message to get the hash value for.
+ * @param size - {@link Size} of the hash value
+ * @return Hash value for the message as an upper-case hexadecimal string.
+ */
+ public static String generateXXHashAsString(String input, Size size) {
+ // Compute the hash bytes via hash(String, Size)
+ byte[] hashBytes = hash(input, size);
+ // Convert each hash byte to two upper-case hexadecimal digits
+ StringBuilder hexString = new StringBuilder();
+ for (byte hashByte : hashBytes) {
+ hexString.append(String.format("%02X", hashByte));
+ }
+ return hexString.toString();
+ }
+
public static int getXXHash32(final String message, int hashSeed) {
return getXXHash32(getUTF8Bytes(message), hashSeed);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
index 1ab9d82b2b92..319e216ff4f5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
@@ -21,24 +21,40 @@ package org.apache.hudi.common.util.hash;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
import javax.xml.bind.DatatypeConverter;
-import java.util.Arrays;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
+import static java.util.Arrays.asList;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests {@link HashID}.
*/
public class TestHashID {
+ public static List<Arguments> hashIdSizes() {
+ return asList(
+ Arguments.of(HashID.Size.BITS_32),
+ Arguments.of(HashID.Size.BITS_64),
+ Arguments.of(HashID.Size.BITS_128)
+ );
+ }
+
/**
* Test HashID of all sizes for ByteArray type input message.
*/
@@ -124,8 +140,26 @@ public class TestHashID {
for (Map.Entry<String, String> sizeEntry :
allSizeEntries.getValue().entrySet()) {
final byte[] actualHashBytes = HashID.hash(sizeEntry.getKey(),
allSizeEntries.getKey());
final byte[] expectedHashBytes =
DatatypeConverter.parseHexBinary(sizeEntry.getValue());
- assertTrue(Arrays.equals(expectedHashBytes, actualHashBytes));
+ assertArrayEquals(expectedHashBytes, actualHashBytes);
}
}
}
+
+ @ParameterizedTest
+ @MethodSource("hashIdSizes")
+ void testGenerateXXHashMagicNumber(HashID.Size size) throws IOException,
URISyntaxException {
+ // We need to make sure we always generate the same hash value for the
same input. This test
+ // guards against unexpected change of hash value due to accidents like
library version upgrade.
+ // Load inputs and expected hash values from files.
+ List<String> inputs = Files.readAllLines(Paths.get(Objects.requireNonNull(
+
getClass().getClassLoader().getResource("hash/magic_input.txt")).toURI()));
+ List<String> expectedHash =
Files.readAllLines(Paths.get(Objects.requireNonNull(
+
getClass().getClassLoader().getResource(String.format("hash/xxhash_%s_for_magic_input.txt",
size.name()))).toURI()));
+
+ for (int i = 0; i < expectedHash.size(); ++i) {
+ String hash = HashID.generateXXHashAsString(inputs.get(i), size);
+ // Magic number test to guard against accidental upgrade that changes
the hash value
+ assertEquals(expectedHash.get(i), hash);
+ }
+ }
}
diff --git a/hudi-common/src/test/resources/hash/magic_input.txt
b/hudi-common/src/test/resources/hash/magic_input.txt
new file mode 100644
index 000000000000..ef6468b087ab
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/magic_input.txt
@@ -0,0 +1,10 @@
+vB1xYjlEfL8tN9Ap6NMCwraFQlpQ6QU2z9XCPCv9lfIxzRfs6GfxKir2FuU:Xbl9uMZqjNRhKLqZY2MIH5vPCKZbV1zjoHVaVBnyRfzJLZ0jwWZQjJqjpdg/PsrIq3YdyAsDHcb__oXfbZUpMFX00g6ALt/uRZ2LOFFdQA9Ofuu_BLhhPl4gIfGNJIt1ATuCGIFXFG4zg1qn96MsLy8Nr2v/6pxzBXM5Rhzn6o_HhiFo1RzooJcm1tJlObZroxBNKSDbDcdYkm4KnAVUAVcyVfO2DRx_KQfphLYbG1rNRTV03RNN7jQakMf8Vq38Q1Joz:STilSqalcvrDmdPTY1RETh8eVP0ELqsKevQ22OKKl5ypy4I2Y6nqRfTfoepo_l0zc_QEZNawdif5l:SpvaJUaWU9rpUpqxiz2T_40ekLVXDM_B9oGgS4e9AMT50OASHp/cVue2qFpgrYSptp4EZZ9yfEqPXX3Ij7hSUGL29zar
[...]
+EXDDWHY_7:5oYXsoCWM1LVD2mxXAilEXwsA8J_btuAAPUm6ODVlM/njpnAyULYvBKrAiwjY4xMWkJLCGS09qqmvu8ea5:ib/WYg1nHAELdy8fiREkQ/QXIByXL1xJ40ZfPYiMzUlNoUMXJNVqK59bipG7NSOhEZ0KSAW9uxv77S4G:8B1ybZNgWbYx_f1/DqYgPD7PcHfr3c_NMYvHVOCEOB3oYZwU2p_/BRWfaY/aJfydpG_VivzSqYM7qCghR8LXEL8955iXjPVMpXHu03BdAqE/uXsrlvTm8sdzum/PNsrwHYH:2YCHRjJ7B9yawP2mYZmmCTA9FSAqhizLrs7JvqUQNfF0DW8Imy_ld__eTq0hM55H3QEIzDs:u45zd6Y/1hEeBMFEGmgXHJDOGfSt1boX_lO6q/M33U5n0LEv1o_4_tQeqnsqODq509y7rbgfOVg7whAtZ81QlXNQ3VITG0kadegE3EFcD6OwC1I3BE
[...]
+qbl3zptxyhBdGZzXYSuz/dc_W79I3nnZIg41:_ujQnA8NnzrsENtSvxUX5Hj5nHyjmlzS9vH2zawvXaoeTtwXPqEVp0Vc2p7Ho93iRUq:Zj/bjAxIuxBXEJMPYTE5PBVPSR2yPP4ci0ChrVtIRCXTD7v9gluqs4ouz_G8sNuXaT8uCP7ypOMRUG:VQuQ69yY/KIS1_AffBHL8n7jgT7nGfzra5dZZUBe4oqm9BI3nyHpCa5ookke:703QP:8sDMg3UdCg8uw6ccqAcco7xBbOgdvS6qpz1LZdku0:As2LOmX42B0aEReh52BdtZtBFnk5t_cm0m5ZAuEH_Bii8:reXocImMCSsnL2hVpbs8gn8jI4QAGW2rDOvj1N/byJbjrJxhlsZ6Y9dDZc7Z8B8aCx8kRoRQVkskAO4lULO1pNaIINreWaij:tJhHeF_kkhSevooTRkGXoWouI/1a4ADi5ijTESgsrBleqpU2e4yGmBKw
[...]
+kha:j60RU2k4jsMcyoK/gJ7F4IEkoWzrtVN3kUEPvP/9M4NfaoPtruIe0reUxlMhoTKJbLTLFJ0pIVfqGrx/VAcW1ZWdjret19cb7TAP08wpOP_t5ljP/pUYR5KsEn5zUNtKWiNHJxPsT7u9FYNupiIRIe_VGG_J:1wPQJ2xBtWCbiCOGk15zGoOHhAjTE8V5JwyyJHrxSfZQpHXMqOYICII9/rXoKpTsV/uUASF58e/OuQ9pg0IA4KENs3_XnptctzloUYr0AI4zTLIRL5N:CvFRujqaOmKgiMGYmC1V9DdYkUTAiv_3NYvUnJuV54mXMEQlAoyrILPimonjdpjLx7fjep9vPjImfsyIXK3EoStTNJoWcAWsOzSn:wb84sBDg4mfDYvqz3pL13SFbIxtlsAl66/11xBVIUI0TRK6KWX5yX806JnbFSG:oi6X4_OdwucUDSx:ac_RqzZiA9Ig:wFg2dg6JYrVt3iLj/KDaS0
[...]
+XViqCa5XfqAB0kski4A4N82FE8i0U/WjPvcgLbi:X9LhXRV0qfusVp19Xr_5pCQUDFRr34z_foGnpbWC5:pOn:jQHwVJIer:hfh5uErwLimX6OC1nLPXdPVrhKJ4/XN2YBfF5luB_7:dEGJ1E22iCTgto6sQW63f/c:wUtZdRioJ3JiA14KMVRKI8KD79zrCF1p61XSjQUvzrBh4VOjxGervgu2kZarv4HvvzvTuzThk4yrE1YvQse5iKeZWHbjCYBS7Tr0eTAjZLIqZRZ4/t/zuI9ZgnIerWJ8z3kfYv:maAiYP:Fjc4k32CXdXVhfiP9OLMqu4XWA2U1i7htnAotjF3G1ECwGLo9faRUxp5XwK85DhoQqrld/9e4BX4FxI61omgU_sa9SlSI:oU4/GrmsejIJn8adPcxwUyYf5uetKOVbcuBesp9l7pVmc4O97sojQ6:ohnhcvGssks0UpzdUideM8Hfo0yIWF1XPSJjJV
[...]
+uYeJoV7jMpUbSfo0PZfVqxapg8VlFDxp7t6x3M:r/hMj3EdSrXNmtnAJOg/DubMz6uGvkZM3equxs8LflyyjZYFypahe90QXYM6mngjaMpXGl/ngy6_wQ512E7tqMnaVaz52xbKAdCCgastCJ5xM2jikxmIOK/28J77VXwkL6T43JGKXrFzoqm/eelXf:G5oihZt/jBKTMg/wXgC0:BvTVM9A1e4nmXw_ANjnBxCWe7QD7O/A/k4hrVt0MKpH2glsJKAnz_ZAY_qZp33vN5v975ZFwhAJtkYwtsXl4UFT_06Uy9BHyWnMcvHaPgEgslv/_P2FLzlJWfsSI8UjWUdMVhYAff4NE7UQF2Oye6ciXsW1ga_TS0VN8tugmqE:xs:5/SlyG3puDh:mzA60V7EpM38_rsKO/v9l1ZG9ICucglBv0hzHuVz/_vUR_5B_scT4nEIFmLOB67RsoGXMDVI7oJzETHzA3DLk:nX1yF4g_VT
[...]
+1/dFjdWmwjcDBD6QBMjRqWtTJEdCZ2:2X2:NiPutlDhlxkp7k2MBqiAMDffvA5bbe0Yxel0IF94is_h0cMtgXKwYgCPKgFQgH0O2hnFrWp2fmETy4DCHnUV54mNUF4_xMESiZwJaUQFCPUxK8j28qlPppbt2M:zsJ64LUVIoCZpJsc1GRVY9lImikNRoEiOXKOQIMLOHlDfVEYyJubQy14zWQsqSoHK6Du_S76_MdZi3DEHJmInbDHvykJRoKke2QQL7wae/7D4FVg/zKbfnOSrgDTWbdDewy2lJKMti5MnrMnevTCQSSFhueoekPYsLiyoDIDWDoqAzdiWP2vkzisR1CbZ74DQeOyyk4rvF85TRTfjPWjdv6PZX40W0K2VOPWUiys5224nSnfT886LVBkuKwjImo8NqZskfp1LYJtMRYlad9/fRLVwo65h:l4FFLEuPH1OzlFaz6wsMqqh4GGtQeScwfzTWA78A9iosPNyn
[...]
+twVaFB_AiBxWsfHRYHdhnwMtrtp/PzyU6lJivvSGGRfGmLUu0EcaW5dzO0vx/Hy/E33JY:2oeb1p:Hl35zPFj:98DQRRyodl5JJrBiNeyavBKlQPoYglYIpWK3uy47lsTHXe2SYrEEscKtwH/:BJiQn4XPQejfVBPNPI35Othp64j0tf2GBiqrAETQT0xnSrN6vnQnNWAj:RDR9IbUSZISaJ/GrQoT651W7iwQQvBo5L/XcdCrvWGbmw/3Yp5S2HeoA/:qJ:6gmrOQ9eoFGCe8X9yERKKbPzcmSB7SXKuP3UUbT1L0_vvFOP2mIsr46k:77G31gPJv6zzdDZLR1Ewxaz38vR6MeSjTtKR53yw44cWgWPD2wS/GHJDI4PhHxnsgUamNi8_PVmdFceTXouehmnrdrM_byV_iQOF:3Jq7czDhcAwMF2eYqZkdlPdG:kr:WQoIBu2UTgZB/uKI8kf7l4qosy:aTz_QLmouh5ilZ1
[...]
+wM51f5wQY1p4N_J4hC/oN9BryO/ZUwUisXuYaGHIJGDZmTE52hPf7vJdPTR:NiJiPCNMG:cnWGoJwNRiKIQY3u/Yppx6Ig6wdvScLqPhXI5Fxq/W4XUgBkCwEVkhfd3FCv0/LmaNvg6RJelbz:/ympzvRLOpAgsYmxtnqh17rEqCkxKp4nC4o_j7VLJgyHqXYP9RnKw2sTpMP7seq1zHe00drKb_k:_iS59NZ2NnAJBjjKCo2kbNmT4LvAVa9COquUyCF4Q:Qp39Rxs30mNtTW30x_0LuL8TH4pg65Z0OjBoQtwo4:GBF_2rE2WbkRDeNeGGB:DUU4p0MqkkYTT7jraK10pmKi2C:wH6HXN8SX6UwCJoY2m24sIaU3f:k2vBzqYu4Mq5E9slvsx:vNjlA4KtSiy5XLt01s0bRuXhXXrjQuSsFXvGUdlppWtQcR8Xbg6W0Bk5Qdx267BBbsQ5bk6JOg8xHfGqcy2Kde_u_ZFz
[...]
+AXipEHIS1WnAkrLS_TZT8AEBe2KPR:0tgOLRhUIEiN33Ml4_pOtN6TuDNvfSa4MSPexRluckSpCnl6le__MucXEilapxfqXY/MW3ebuICROpXQr9rn4fvJxxR25eeyDDOJfVkXu9gTwPJD:CAqb:byah1PXndGFR3L4NNgzofPKVvyh/liGhvcU54qF3:U8B4Ed4K5VVZKlDC_LIjyPpO:fzwhCY:a7plQHp3G8rwqZlVWll8gZt1BRTJS3nvxCLNe2IPjq5aj5QD/P3O4QkLPDW/bdXTFxfILaQ3xrBO:kqPMyqowiLcm6gAwY5fdkPgvv6xh7yC5t8d98io1UxEOoVdWdGUXgztIKs2gxjzVUqj6qsbLaIg2ft/j4txWJMQ3z3anRSmCZ1Th6V7LFEKaX2PBlK//eC0Sm1f45XPf5MWpHuRMxBzxFqY60IXZVJ1tx/Daa86UC5A0jyyJyPxmdUlt1FTqDqZ_TQK4qTWGrz
[...]
diff --git
a/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt
b/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt
new file mode 100644
index 000000000000..ee2a4f5fbea0
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt
@@ -0,0 +1,10 @@
+AF26454CFF3D72BBDB7A6CD6BAF23770
+080D42690A45825782F4CD4DB31AA449
+E14D364EF059C966A7F50708E9CF2E7B
+68F4BCB6F281DD101D5B7375652E574E
+1F2A77EB57C4091E9F89887DA88169B2
+1A57459BE2F7ECD655C090BADAA3ED1A
+E79D2F90D440296F94419C7093518DE4
+86C4CFD114B59C0CB29AF95425767B63
+EE70F0B8607E339D92E6823DA2538113
+0E6F75EC00B024201B91FBCEBDB9F971
\ No newline at end of file
diff --git
a/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt
b/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt
new file mode 100644
index 000000000000..4cba48b9f4e8
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt
@@ -0,0 +1,10 @@
+0304CA31
+71EC1ABE
+4F003C35
+79A69D14
+62CAF229
+0136BCE2
+1D68C842
+0B416E0E
+CF7373EB
+F5933F12
\ No newline at end of file
diff --git
a/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt
b/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt
new file mode 100644
index 000000000000..bc06cc5614e4
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt
@@ -0,0 +1,10 @@
+F0E17DF624BF7B6A
+07EC6A7186973385
+24A8A6F1FEA11BA2
+BFE00C8A367BAF03
+C323770B790400C8
+9F96033C96E80D39
+5B99015437BBEA94
+C6C54FCA5141979D
+3580238C2761459E
+D9DBB1D9BF7174BC
\ No newline at end of file
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
index eb8f19987484..6cb3544db491 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -302,4 +302,83 @@ public class StringUtils {
buf.append(text.substring(start));
return buf.toString();
}
+
+ /**
+ * Strips any of the given characters from the end of the string and returns the result.
+ * A {@code null} or empty {@code str} is returned unchanged.
+ *
+ * @param str - string to be stripped
+ * @param stripChars - characters to strip; {@code null} strips trailing whitespace,
+ * an empty string strips nothing
+ * @return the string with the trailing characters removed
+ */
+ public static String stripEnd(final String str, final String stripChars) {
+ int end;
+ if (str == null || (end = str.length()) == 0) {
+ return str;
+ }
+
+ if (stripChars == null) {
+ while (end != 0 && Character.isWhitespace(str.charAt(end - 1))) {
+ end--;
+ }
+ } else if (stripChars.isEmpty()) {
+ return str;
+ } else {
+ while (end != 0 && stripChars.indexOf(str.charAt(end - 1)) !=
INDEX_NOT_FOUND) {
+ end--;
+ }
+ }
+ return str.substring(0, end);
+ }
+
+ /**
+ * Concatenates two strings such that the total UTF-8 byte length does not exceed the threshold.
+ * If the total byte length exceeds the threshold, the first string is cut down to the longest
+ * prefix that still fits and that prefix is concatenated with the second string, which is
+ * always kept whole.
+ *
+ * @param a The first string; shortened from the end when the result would be too long
+ * @param b The second string; never shortened
+ * @param byteLengthThreshold The maximum byte length
+ * @return {@code a + b} when it fits, otherwise a prefix of {@code a} followed by {@code b}
+ * @throws IllegalArgumentException if the byte length of {@code b} alone exceeds the threshold
+ */
+ public static String concatenateWithThreshold(String a, String b, int
byteLengthThreshold) {
+ // Convert both strings to byte arrays in UTF-8 encoding
+ byte[] bytesA = getUTF8Bytes(a);
+ byte[] bytesB = getUTF8Bytes(b);
+ if (bytesB.length > byteLengthThreshold) {
+ throw new IllegalArgumentException(String.format(
+ "Length of the Second string to concatenate exceeds the threshold
(%d > %d)",
+ bytesB.length, byteLengthThreshold));
+ }
+
+ // Calculate total bytes
+ int totalBytes = bytesA.length + bytesB.length;
+
+ // If total bytes is within the threshold, return concatenated string
+ if (totalBytes <= byteLengthThreshold) {
+ return a + b;
+ }
+
+ // Longest prefix of 'a', counted in chars, that fits in the bytes left over after 'b'
+ int bestLength = getBestLength(a, byteLengthThreshold - bytesB.length);
+
+ // Concatenate the valid substring of 'a' with 'b'
+ return a.substring(0, bestLength) + b;
+ }
+
+ /**
+ * Finds the length, in chars, of the longest prefix of {@code a} whose UTF-8 encoding is at most
+ * {@code threshold} bytes. The returned prefix never ends between the two halves of a surrogate pair.
+ *
+ * @param a string to take the prefix from
+ * @param threshold maximum number of bytes the encoded prefix may occupy
+ * @return prefix length in chars; 0 when nothing fits
+ */
+ private static int getBestLength(String a, int threshold) {
+ // Binary search over prefix lengths. Every char needs at least one byte, so the answer
+ // can never be larger than the threshold itself.
+ int low = 0;
+ int high = Math.min(a.length(), threshold);
+ int bestLength = 0;
+
+ while (low <= high) {
+ // Unsigned shift keeps the midpoint correct even if low + high overflows.
+ int mid = (low + high) >>> 1;
+ byte[] subABytes = getUTF8Bytes(a.substring(0, mid));
+
+ if (subABytes.length <= threshold) {
+ bestLength = mid;
+ low = mid + 1;
+ } else {
+ high = mid - 1;
+ }
+ }
+
+ // A prefix cut right after a high surrogate leaves half a code point behind. Such a prefix can
+ // pass the size check (presumably because the dangling half is encoded as a short replacement;
+ // verify against getUTF8Bytes) even though the whole code point does not fit, so drop it.
+ if (bestLength > 0 && bestLength < a.length()
+ && Character.isHighSurrogate(a.charAt(bestLength - 1))
+ && Character.isLowSurrogate(a.charAt(bestLength))) {
+ bestLength--;
+ }
+ return bestLength;
+ }
}
diff --git
a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index a4bee6bc6be7..bcb0a9f65b97 100644
--- a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -22,14 +22,19 @@ package org.apache.hudi.common.util;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Random;
+import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -39,6 +44,29 @@ public class TestStringUtils {
private static final String[] STRINGS = {"This", "is", "a", "test"};
+ private static final String CHARACTERS_FOR_RANDOM_GEN =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_/:";
+ private static final Random RANDOM = new SecureRandom();
+
+ private static String toHexString(byte[] bytes) {
+ StringBuilder sb = new StringBuilder(bytes.length * 2);
+ for (byte b : bytes) {
+ sb.append(String.format("%02x", b));
+ }
+ return sb.toString();
+ }
+
+ private static String generateRandomString(int length) {
+ if (length < 1) {
+ throw new IllegalArgumentException("Length must be greater than 0");
+ }
+ StringBuilder builder = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ int randomIndex = RANDOM.nextInt(CHARACTERS_FOR_RANDOM_GEN.length());
+ builder.append(CHARACTERS_FOR_RANDOM_GEN.charAt(randomIndex));
+ }
+ return new String(getUTF8Bytes(builder.toString()),
StandardCharsets.UTF_8);
+ }
+
@Test
public void testStringJoinWithDelim() {
String joinedString = StringUtils.joinUsingDelim("-", STRINGS);
@@ -108,14 +136,6 @@ public class TestStringUtils {
assertEquals(StringUtils.toHexString(getUTF8Bytes(str)),
toHexString(getUTF8Bytes(str)));
}
- private static String toHexString(byte[] bytes) {
- StringBuilder sb = new StringBuilder(bytes.length * 2);
- for (byte b : bytes) {
- sb.append(String.format("%02x", b));
- }
- return sb.toString();
- }
-
@Test
public void testTruncate() {
assertNull(StringUtils.truncate(null, 10, 10));
@@ -129,6 +149,60 @@ public class TestStringUtils {
assertTrue(StringUtils.compareVersions("1.9", "1.10") < 0);
assertTrue(StringUtils.compareVersions("1.100.1", "1.10") > 0);
assertTrue(StringUtils.compareVersions("1.10.1", "1.10") > 0);
- assertTrue(StringUtils.compareVersions("1.10", "1.10") == 0);
+ assertEquals(0, StringUtils.compareVersions("1.10", "1.10"));
+ }
+
+ @Test
+ void testConcatenateWithinThreshold() {
+ String a = generateRandomString(1000); // 1000 bytes in UTF-8
+ String b = generateRandomString(1048); // 1048 bytes in UTF-8
+ int threshold = 2048;
+
+ // `a` + `b` is exactly `threshold` bytes (1000 + 1048), so nothing may be truncated
+ String result = StringUtils.concatenateWithThreshold(a, b, threshold);
+
+ // The resulting string should be exactly `threshold` bytes long
+ assertEquals(threshold, getUTF8Bytes(result).length);
+ assertEquals(a + b, result);
+
+ // Test case when a + b is strictly below the threshold
+ String a2 = generateRandomString(900);
+ String b2 = generateRandomString(1000);
+ String result2 = concatenateWithThreshold(a2, b2, threshold);
+
+ // The resulting string should be `a2 + b2`
+ assertEquals(a2 + b2, result2);
+ }
+
+ @Test
+ void testConcatenateInvalidInput() {
+ // Test case when b alone exceeds the threshold
+ String a = generateRandomString(900);
+ String b = generateRandomString(3000); // 3000 bytes in UTF-8
+ Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+ concatenateWithThreshold(a, b, 2048);
+ });
+
+ String expectedMessage = "Length of the Second string to concatenate
exceeds the threshold (3000 > 2048)";
+ String actualMessage = exception.getMessage();
+
+ assertTrue(actualMessage.contains(expectedMessage));
+ }
+
+ @Test
+ void testConcatenateTruncateCase() {
+ // 'é' is 2 bytes
+ assertEquals("ad", concatenateWithThreshold("aé", "d", 3));
+ // Chinese chars are 3 bytes
+ assertEquals("世d", concatenateWithThreshold("世界", "d", 4));
+ assertEquals("ad", concatenateWithThreshold("ab", "d", 2));
+ }
+
+ @Test
+ void testGenerateInvalidRandomString() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> generateRandomString(-1)
+ );
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
index 6e484a142164..1d001b331e77 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
@@ -46,6 +46,7 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -77,18 +78,18 @@ import static org.apache.hudi.common.lock.LockState.RELEASING;
* using hive metastore APIs. Users need to have a HiveMetastore & Zookeeper cluster deployed to be able to use this lock.
*
*/
-public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse> {
+public class HiveMetastoreBasedLockProvider implements
LockProvider<LockResponse>, Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(HiveMetastoreBasedLockProvider.class);
private final String databaseName;
private final String tableName;
private final String hiveMetastoreUris;
- private IMetaStoreClient hiveClient;
+ private transient IMetaStoreClient hiveClient;
private volatile LockResponse lock = null;
protected LockConfiguration lockConfiguration;
- private ScheduledFuture<?> future = null;
- private final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(2);
+ private transient ScheduledFuture<?> future = null;
+ private final transient ScheduledExecutorService executor =
Executors.newScheduledThreadPool(2);
public HiveMetastoreBasedLockProvider(final LockConfiguration
lockConfiguration, final StorageConfiguration<?> conf) {
this(lockConfiguration);
diff --git a/scripts/release/validate_source_copyright.sh b/scripts/release/validate_source_copyright.sh
index d44864135be8..52aa15bb3d70 100755
--- a/scripts/release/validate_source_copyright.sh
+++ b/scripts/release/validate_source_copyright.sh
@@ -46,10 +46,10 @@ echo -e "\t\tNotice file exists ? [OK]\n"
### Licensing Check
echo "Performing custom Licensing Check "
-numfilesWithNoLicense=`find . -iname '*' -type f | grep -v NOTICE | grep -v
LICENSE | grep -v '.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data'
| grep -v '.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v KEYS |
grep -v '.mailmap' | grep -v '.sqltemplate' | grep -v 'banner.txt' | grep -v
"fixtures" | xargs grep -L "Licensed to the Apache Software Foundation (ASF)" |
wc -l`
+numfilesWithNoLicense=`find . -iname '*' -type f | grep -v NOTICE | grep -v
LICENSE | grep -v '.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data'
| grep -v '.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v KEYS |
grep -v '.mailmap' | grep -v '.sqltemplate' | grep -v 'banner.txt' | grep -v
'.txt' | grep -v "fixtures" | xargs grep -L "Licensed to the Apache Software
Foundation (ASF)" | wc -l`
if [ "$numfilesWithNoLicense" -gt "0" ]; then
echo "There were some source files that did not have Apache License [ERROR]"
- find . -iname '*' -type f | grep -v NOTICE | grep -v LICENSE | grep -v
'.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data' | grep -v
'.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v '.sqltemplate' |
grep -v KEYS | grep -v '.mailmap' | grep -v 'banner.txt' | grep -v "fixtures" |
xargs grep -L "Licensed to the Apache Software Foundation (ASF)"
+ find . -iname '*' -type f | grep -v NOTICE | grep -v LICENSE | grep -v
'.jpg' | grep -v '.json' | grep -v '.zip' | grep -v '.hfile' | grep -v '.data'
| grep -v '.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v
'.sqltemplate' | grep -v KEYS | grep -v '.mailmap' | grep -v 'banner.txt' |
grep -v '.txt' | grep -v "fixtures" | xargs grep -L "Licensed to the Apache
Software Foundation (ASF)"
exit 1
fi
echo -e "\t\tLicensing Check Passed [OK]\n"