nsivabalan commented on a change in pull request #3486:
URL: https://github.com/apache/hudi/pull/3486#discussion_r725711054
##########
File path: hudi-client/hudi-client-common/pom.xml
##########
@@ -275,6 +300,45 @@
<skip>false</skip>
</configuration>
</plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-it-database</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>start</goal>
+ </goals>
+ <configuration>
+ <images>
+ <image>
+ <name>amazon/dynamodb-local:${dynamodb-local.version}</name>
Review comment:
May I know what the purpose of this is? Is it to run the DynamoDB-based tests
as part of the integration tests?
##########
File path: hudi-client/hudi-client-common/pom.xml
##########
@@ -275,6 +300,45 @@
<skip>false</skip>
</configuration>
</plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-it-database</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>start</goal>
+ </goals>
+ <configuration>
+ <images>
+ <image>
+ <name>amazon/dynamodb-local:${dynamodb-local.version}</name>
Review comment:
If my statement above is true, is it not possible to restrict these to just
the integration-test module?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
##########
@@ -164,6 +171,52 @@
.withDocumentation("Key name under base_path at which to create a ZNode
and acquire lock. "
+ "Final path on zk will look like base_path/lock_key. We recommend
setting this to the table name");
+ public static final ConfigProperty<String> DYNAMODB_TABLE_NAME =
ConfigProperty
Review comment:
Let's prefix all config properties with "DYNAMODB_LOCK_*".
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
+ .withAdditionalTimeToWaitForLock(millisTime - leaseDuration > 0
? millisTime - leaseDuration : 0)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (InterruptedException | LockNotGrantedException e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
+ }
+ return lock != null && !lock.isExpired();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
+ if (lock == null) {
+ return;
+ }
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (lock != null) {
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ }
+ this.client.close();
+ } catch (Exception e) {
+ LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
+ }
+ }
+
+ @Override
+ public LockItem getLock() {
+ return lock;
+ }
+
+ private AmazonDynamoDB getDynamoClient() {
+ String region =
this.lockConfiguration.getConfig().getString(DYNAMODB_REGION_PROP_KEY);
+ String endpointURL =
RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
+ AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
+ new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(dynamodbEndpoint)
+
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
+ .build();
+ }
+
+ public static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String
tableName) {
+ createLockTableInDynamoDB(dynamoDB, tableName, "PAY_PER_REQUEST", 0L, 0L);
+ }
+
+ private static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB,
String tableName, String billingMode,
+ Long readCapacityUnits, Long
writeCapacityUnits) {
+ KeySchemaElement partitionKeyElement = new KeySchemaElement();
+ partitionKeyElement.setAttributeName("key");
+ partitionKeyElement.setKeyType(KeyType.HASH);
+
+ List<KeySchemaElement> keySchema = new ArrayList<>();
+ keySchema.add(partitionKeyElement);
+
+ Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
+ attributeDefinitions.add(new
AttributeDefinition().withAttributeName("key").withAttributeType(ScalarAttributeType.S));
+
+ CreateTableRequest createTableRequest = new CreateTableRequest(tableName,
keySchema);
+ createTableRequest.setAttributeDefinitions(attributeDefinitions);
+ createTableRequest.setBillingMode(billingMode);
+ if (billingMode.equals("PROVISIONED")) {
Review comment:
There isn't an enum for billing mode in the AWS SDK, is there?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
+ .withAdditionalTimeToWaitForLock(millisTime - leaseDuration > 0
? millisTime - leaseDuration : 0)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (InterruptedException | LockNotGrantedException e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
+ }
+ return lock != null && !lock.isExpired();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
+ if (lock == null) {
+ return;
+ }
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (lock != null) {
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ }
+ this.client.close();
+ } catch (Exception e) {
+ LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
+ }
+ }
+
+ @Override
+ public LockItem getLock() {
+ return lock;
+ }
+
+ private AmazonDynamoDB getDynamoClient() {
+ String region =
this.lockConfiguration.getConfig().getString(DYNAMODB_REGION_PROP_KEY);
+ String endpointURL =
RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
+ AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
+ new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(dynamodbEndpoint)
+
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
+ .build();
+ }
+
+ public static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String
tableName) {
+ createLockTableInDynamoDB(dynamoDB, tableName, "PAY_PER_REQUEST", 0L, 0L);
+ }
+
+ private static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB,
String tableName, String billingMode,
+ Long readCapacityUnits, Long
writeCapacityUnits) {
+ KeySchemaElement partitionKeyElement = new KeySchemaElement();
+ partitionKeyElement.setAttributeName("key");
+ partitionKeyElement.setKeyType(KeyType.HASH);
+
+ List<KeySchemaElement> keySchema = new ArrayList<>();
+ keySchema.add(partitionKeyElement);
+
+ Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
+ attributeDefinitions.add(new
AttributeDefinition().withAttributeName("key").withAttributeType(ScalarAttributeType.S));
+
+ CreateTableRequest createTableRequest = new CreateTableRequest(tableName,
keySchema);
+ createTableRequest.setAttributeDefinitions(attributeDefinitions);
+ createTableRequest.setBillingMode(billingMode);
+ if (billingMode.equals("PROVISIONED")) {
+ createTableRequest.setProvisionedThroughput(
+ new
ProvisionedThroughput().withReadCapacityUnits(readCapacityUnits).withWriteCapacityUnits(writeCapacityUnits));
+ }
+ dynamoDB.createTable(createTableRequest);
+
+ LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to
be active");
+ try {
+ TableUtils.waitUntilActive(dynamoDB, tableName);
Review comment:
Can't we configure a max wait time here?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
Review comment:
Let's fetch the value of
lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY) once,
store it in a class variable, and reuse it.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
+ .withAdditionalTimeToWaitForLock(millisTime - leaseDuration > 0
? millisTime - leaseDuration : 0)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (InterruptedException | LockNotGrantedException e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
+ }
+ return lock != null && !lock.isExpired();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
+ if (lock == null) {
+ return;
+ }
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (lock != null) {
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ }
+ this.client.close();
+ } catch (Exception e) {
+ LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
+ }
+ }
+
+ @Override
+ public LockItem getLock() {
+ return lock;
+ }
+
+ private AmazonDynamoDB getDynamoClient() {
+ String region =
this.lockConfiguration.getConfig().getString(DYNAMODB_REGION_PROP_KEY);
+ String endpointURL =
RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
+ AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
+ new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(dynamodbEndpoint)
+
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
+ .build();
+ }
+
+ public static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String
tableName) {
+ createLockTableInDynamoDB(dynamoDB, tableName, "PAY_PER_REQUEST", 0L, 0L);
+ }
+
+ private static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB,
String tableName, String billingMode,
+ Long readCapacityUnits, Long
writeCapacityUnits) {
+ KeySchemaElement partitionKeyElement = new KeySchemaElement();
+ partitionKeyElement.setAttributeName("key");
+ partitionKeyElement.setKeyType(KeyType.HASH);
+
+ List<KeySchemaElement> keySchema = new ArrayList<>();
+ keySchema.add(partitionKeyElement);
+
+ Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
+ attributeDefinitions.add(new
AttributeDefinition().withAttributeName("key").withAttributeType(ScalarAttributeType.S));
Review comment:
Can we declare a constant for "key" and reuse it?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/aws/HoodieAWSCredentialsProviderFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class HoodieAWSCredentialsProviderFactory {
Review comment:
Please add Javadoc for this class.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/aws/HoodieConfigAWSCredentialsProvider.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.BasicSessionCredentials;
+import org.apache.hudi.config.HoodieAWSConfig;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+
+public class HoodieConfigAWSCredentialsProvider implements
AWSCredentialsProvider {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieConfigAWSCredentialsProvider.class);
+
+ private AWSCredentials awsCredentials;
+
+ public HoodieConfigAWSCredentialsProvider(Properties props) {
+ String accessKey = props.getProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key(),
null);
Review comment:
You don't need to pass null as the default value explicitly;
props.getProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key()) already returns null
if the key is not found.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
+ .withAdditionalTimeToWaitForLock(millisTime - leaseDuration > 0
? millisTime - leaseDuration : 0)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (InterruptedException | LockNotGrantedException e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
+ }
+ return lock != null && !lock.isExpired();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
+ if (lock == null) {
+ return;
+ }
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (lock != null) {
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ }
+ this.client.close();
+ } catch (Exception e) {
+ LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
+ }
+ }
+
+ @Override
+ public LockItem getLock() {
+ return lock;
+ }
+
+ private AmazonDynamoDB getDynamoClient() {
+ String region =
this.lockConfiguration.getConfig().getString(DYNAMODB_REGION_PROP_KEY);
+ String endpointURL =
RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
+ AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
+ new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(dynamodbEndpoint)
+
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
+ .build();
+ }
+
+ public static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String
tableName) {
+ createLockTableInDynamoDB(dynamoDB, tableName, "PAY_PER_REQUEST", 0L, 0L);
+ }
+
+ private static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB,
String tableName, String billingMode,
+ Long readCapacityUnits, Long
writeCapacityUnits) {
+ KeySchemaElement partitionKeyElement = new KeySchemaElement();
+ partitionKeyElement.setAttributeName("key");
+ partitionKeyElement.setKeyType(KeyType.HASH);
+
+ List<KeySchemaElement> keySchema = new ArrayList<>();
+ keySchema.add(partitionKeyElement);
+
+ Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
+ attributeDefinitions.add(new
AttributeDefinition().withAttributeName("key").withAttributeType(ScalarAttributeType.S));
+
+ CreateTableRequest createTableRequest = new CreateTableRequest(tableName,
keySchema);
+ createTableRequest.setAttributeDefinitions(attributeDefinitions);
+ createTableRequest.setBillingMode(billingMode);
+ if (billingMode.equals("PROVISIONED")) {
+ createTableRequest.setProvisionedThroughput(
+ new
ProvisionedThroughput().withReadCapacityUnits(readCapacityUnits).withWriteCapacityUnits(writeCapacityUnits));
+ }
+ dynamoDB.createTable(createTableRequest);
+
+ LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to
be active");
+ try {
+ TableUtils.waitUntilActive(dynamoDB, tableName);
+ } catch (TableUtils.TableNeverTransitionedToStateException e) {
+ throw new HoodieLockException("Created dynamoDB table never transits to
active", e);
+ } catch (InterruptedException e) {
+ throw new HoodieLockException("Thread interrupted while waiting for
dynamoDB table to turn active", e);
+ }
+ LOG.info("Created dynamoDB table " + tableName);
+ }
+
+ private void checkRequiredProps(final LockConfiguration config) {
+
ValidationUtils.checkArgument(config.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY)
!= null);
+
ValidationUtils.checkArgument(config.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY)
!= null);
+
ValidationUtils.checkArgument(config.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY)
!= null);
+
ValidationUtils.checkArgument(config.getConfig().getString(DYNAMODB_REGION_PROP_KEY)
!= null);
+ }
+
+ private String generateLogSuffixString() {
+ String dynamoDbPartitionKey =
this.lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY);
Review comment:
Can we fetch the partition key in the constructor and store it in an
instance variable, rather than fetching it from the config every time?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
Review comment:
Let's not create a constructor that is used only in tests. Can we do
something like this instead: split the first constructor into two. Once you
construct the AmazonDynamoDBLockClient, you can call into the other
constructor. That way, the production code path also flows through the 2nd
constructor. Do you think this would work?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
Review comment:
Minor: please rename getDynamoClient to getDynamoDbClient.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
+ .withAdditionalTimeToWaitForLock(millisTime - leaseDuration > 0
? millisTime - leaseDuration : 0)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (InterruptedException | LockNotGrantedException e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
+ }
+ return lock != null && !lock.isExpired();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
+ if (lock == null) {
+ return;
+ }
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (lock != null) {
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ }
+ this.client.close();
+ } catch (Exception e) {
+ LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
+ }
+ }
+
+ @Override
+ public LockItem getLock() {
+ return lock;
+ }
+
+ private AmazonDynamoDB getDynamoClient() {
+ String region =
this.lockConfiguration.getConfig().getString(DYNAMODB_REGION_PROP_KEY);
+ String endpointURL =
RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
+ AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
+ new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(dynamodbEndpoint)
+
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
+ .build();
+ }
+
+ public static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String
tableName) {
Review comment:
Can we move the static methods to a util class?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/aws/HoodieAWSCredentialsProviderFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class HoodieAWSCredentialsProviderFactory {
+ public static AWSCredentialsProvider getAwsCredentialsProvider(Properties
props) {
+ return getAwsCredentialsProviderChain(props);
+ }
+
+ private static AWSCredentialsProvider
getAwsCredentialsProviderChain(Properties props) {
+ List<AWSCredentialsProvider> providers = new ArrayList<>();
+ providers.add(new HoodieConfigAWSCredentialsProvider(props));
+ providers.add(new DefaultAWSCredentialsProviderChain());
Review comment:
Can you shed some light here, please?
Does adding two providers mean that we first try
HoodieConfigAWSCredentialsProvider and, if it's not available, fall back
to using DefaultAWSCredentialsProviderChain?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
Review comment:
curious as to why local table creation is not called in this constructor
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
+ .withAdditionalTimeToWaitForLock(millisTime - leaseDuration > 0
? millisTime - leaseDuration : 0)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (InterruptedException | LockNotGrantedException e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
Review comment:
If the lock is not granted, why do we throw an exception here? LockManager has
a retry mechanism through which it will try to acquire the lock a few times with
some delays in between. I don't think we can throw from here if the lock could
not be acquired.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
+ .withAdditionalTimeToWaitForLock(millisTime - leaseDuration > 0
? millisTime - leaseDuration : 0)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (InterruptedException | LockNotGrantedException e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
+ }
+ return lock != null && !lock.isExpired();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
+ if (lock == null) {
+ return;
+ }
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (lock != null) {
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ }
+ this.client.close();
+ } catch (Exception e) {
+ LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
+ }
+ }
+
+ @Override
+ public LockItem getLock() {
+ return lock;
+ }
+
+ private AmazonDynamoDB getDynamoClient() {
+ String region =
this.lockConfiguration.getConfig().getString(DYNAMODB_REGION_PROP_KEY);
+ String endpointURL =
RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
+ AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
+ new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(dynamodbEndpoint)
+
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
+ .build();
+ }
+
+ public static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String
tableName) {
Review comment:
I see this method (L175) is called only in tests. Should we move it to the
test module only?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/DynamoDBBasedLockProvider.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
+import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.amazonaws.services.dynamodbv2.util.TableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.aws.HoodieAWSCredentialsProviderFactory;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_BILLING_MODE_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_PARTITION_KEY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_READ_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_REGION_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_TABLE_NAME_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.DYNAMODB_WRITE_CAPACITY_PROP_KEY;
+import static
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+
+/**
+ * A DynamoDB based lock. This {@link LockProvider} implementation allows to
lock table operations
+ * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use
this lock.
+ */
+@NotThreadSafe
+public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
+
+ private static final Logger LOG =
LogManager.getLogger(DynamoDBBasedLockProvider.class);
+
+ private final AmazonDynamoDBLockClient client;
+ private final long leaseDuration;
+ private final String tableName;
+ protected LockConfiguration lockConfiguration;
+ private volatile LockItem lock;
+
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final Configuration conf) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ AmazonDynamoDB dynamoDB = getDynamoClient();
+ // build the dynamoDb lock client
+ this.client = new AmazonDynamoDBLockClient(
+ AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .withLeaseDuration(leaseDuration)
+ .withHeartbeatPeriod(leaseDuration / 3)
+ .withCreateHeartbeatBackgroundThread(true)
+ .build());
+ if (!client.lockTableExists()) {
+ createLockTableInDynamoDB(
+ dynamoDB,
+ tableName,
+
lockConfiguration.getConfig().getString(DYNAMODB_BILLING_MODE_PROP_KEY),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_READ_CAPACITY_PROP_KEY)),
+
Long.parseLong(lockConfiguration.getConfig().getString(DYNAMODB_WRITE_CAPACITY_PROP_KEY)));
+ }
+ }
+
+ // Only used for testing
+ public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration,
final AmazonDynamoDBLockClient client) {
+ checkRequiredProps(lockConfiguration);
+ this.lockConfiguration = lockConfiguration;
+ this.tableName =
lockConfiguration.getConfig().getString(DYNAMODB_TABLE_NAME_PROP_KEY);
+ this.leaseDuration =
Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
+ this.client = client;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) {
+ LOG.info(generateLogStatement(LockState.ACQUIRING,
generateLogSuffixString()));
+ long millisTime = unit.toMillis(time);
+ try {
+ lock =
client.acquireLock(AcquireLockOptions.builder(lockConfiguration.getConfig().getString(DYNAMODB_PARTITION_KEY_PROP_KEY))
+ .withAdditionalTimeToWaitForLock(millisTime - leaseDuration > 0
? millisTime - leaseDuration : 0)
+ .withTimeUnit(TimeUnit.MILLISECONDS)
+ .build());
+ LOG.info(generateLogStatement(LockState.ACQUIRED,
generateLogSuffixString()));
+ } catch (InterruptedException | LockNotGrantedException e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE,
generateLogSuffixString()), e);
+ }
+ return lock != null && !lock.isExpired();
+ }
+
+ @Override
+ public void unlock() {
+ try {
+ LOG.info(generateLogStatement(LockState.RELEASING,
generateLogSuffixString()));
+ if (lock == null) {
+ return;
+ }
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ LOG.info(generateLogStatement(LockState.RELEASED,
generateLogSuffixString()));
+ } catch (Exception e) {
+ throw new
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (lock != null) {
+ if (!client.releaseLock(lock)) {
+ LOG.warn("The lock has already been stolen");
+ }
+ lock = null;
+ }
+ this.client.close();
+ } catch (Exception e) {
+ LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE,
generateLogSuffixString()));
+ }
+ }
+
+ @Override
+ public LockItem getLock() {
+ return lock;
+ }
+
+ private AmazonDynamoDB getDynamoClient() {
+ String region =
this.lockConfiguration.getConfig().getString(DYNAMODB_REGION_PROP_KEY);
+ String endpointURL =
RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
+ AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
+ new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
+ return AmazonDynamoDBClientBuilder.standard()
+ .withEndpointConfiguration(dynamodbEndpoint)
+
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
+ .build();
+ }
+
+ public static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String
tableName) {
+ createLockTableInDynamoDB(dynamoDB, tableName, "PAY_PER_REQUEST", 0L, 0L);
+ }
+
+ private static void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB,
String tableName, String billingMode,
+ Long readCapacityUnits, Long
writeCapacityUnits) {
+ KeySchemaElement partitionKeyElement = new KeySchemaElement();
+ partitionKeyElement.setAttributeName("key");
+ partitionKeyElement.setKeyType(KeyType.HASH);
+
+ List<KeySchemaElement> keySchema = new ArrayList<>();
+ keySchema.add(partitionKeyElement);
+
+ Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
+ attributeDefinitions.add(new
AttributeDefinition().withAttributeName("key").withAttributeType(ScalarAttributeType.S));
+
+ CreateTableRequest createTableRequest = new CreateTableRequest(tableName,
keySchema);
+ createTableRequest.setAttributeDefinitions(attributeDefinitions);
+ createTableRequest.setBillingMode(billingMode);
+ if (billingMode.equals("PROVISIONED")) {
Review comment:
https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/dynamodbv2/model/BillingMode.html
Can we use the enum rather than hardcoding the billing mode string?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]