Repository: hadoop Updated Branches: refs/heads/HADOOP-13345 be03eb6a4 -> 872e10610
HADOOP-14215. DynamoDB client should waitForActive on existing tables. Contributed by Mingliang Liu and Sean Mackrory Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/872e1061 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/872e1061 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/872e1061 Branch: refs/heads/HADOOP-13345 Commit: 872e10610cd53265e5b44d32aab4d257f40935f5 Parents: be03eb6 Author: Mingliang Liu <lium...@apache.org> Authored: Thu Apr 6 17:42:49 2017 -0700 Committer: Mingliang Liu <lium...@apache.org> Committed: Thu Apr 6 18:40:19 2017 -0700 ---------------------------------------------------------------------- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 83 +++++++++-- .../s3a/s3guard/ITestS3GuardConcurrentOps.java | 139 +++++++++++++++++++ 2 files changed, 207 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/872e1061/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index b2f011c..e0a171d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -698,7 +698,7 @@ public class DynamoDBMetadataStore implements MetadataStore { * after this method returns successfully. * * @throws IOException if table does not exist and auto-creation is disabled; - * or any other I/O exception occurred. + * or table is being deleted, or any other I/O exception occurred. */ @VisibleForTesting void initTable() throws IOException { @@ -706,15 +706,31 @@ public class DynamoDBMetadataStore implements MetadataStore { try { try { LOG.debug("Binding to table {}", tableName); - table.describe(); - final Item versionMarker = table.getItem( - createVersionMarkerPrimaryKey(VERSION_MARKER)); + final String status = table.describe().getTableStatus(); + switch (status) { + case "CREATING": + case "UPDATING": + LOG.debug("Table {} in region {} is being created/updated. This may " + + "indicate that the table is being operated by another " + + "concurrent thread or process. Waiting for active...", + tableName, region); + waitForTableActive(table); + break; + case "DELETING": + throw new IOException("DynamoDB table '" + tableName + "' is being " + + "deleted in region " + region); + case "ACTIVE": + break; + default: + throw new IOException("Unknown DynamoDB table status " + status + + ": tableName='" + tableName + "', region=" + region); + } + + final Item versionMarker = getVersionMarkerItem(); verifyVersionCompatibility(tableName, versionMarker); Long created = extractCreationTimeFromMarker(versionMarker); LOG.debug("Using existing DynamoDB table {} in region {} created {}", - tableName, region, - created != null ? new Date(created) : null); - + tableName, region, (created != null) ? new Date(created) : null); } catch (ResourceNotFoundException rnfe) { if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) { final ProvisionedThroughput capacity = new ProvisionedThroughput( @@ -736,6 +752,35 @@ public class DynamoDBMetadataStore implements MetadataStore { } /** + * Get the version mark item in the existing DynamoDB table. + * + * As the version marker item may be created by another concurrent thread or + * process, we retry a limited times before we fail to get it. + */ + private Item getVersionMarkerItem() throws IOException { + final PrimaryKey versionMarkerKey = + createVersionMarkerPrimaryKey(VERSION_MARKER); + int retryCount = 0; + Item versionMarker = table.getItem(versionMarkerKey); + while (versionMarker == null) { + try { + RetryPolicy.RetryAction action = batchRetryPolicy.shouldRetry(null, + retryCount, 0, true); + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + break; + } else { + LOG.debug("Sleeping {} ms before next retry", action.delayMillis); + Thread.sleep(action.delayMillis); + } + } catch (Exception e) { + throw new IOException("initTable: Unexpected exception", e); + } + versionMarker = table.getItem(versionMarkerKey); + } + return versionMarker; + } + + /** * Verify that a table version is compatible with this S3Guard client. * @param tableName name of the table (for error messages) * @param versionMarker the version marker retrieved from the table @@ -762,6 +807,21 @@ public class DynamoDBMetadataStore implements MetadataStore { } /** + * Wait for table being active. + */ + private void waitForTableActive(Table table) throws IOException { + try { + table.waitForActive(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for table {} in region {} active", + tableName, region, e); + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException("DynamoDB table '" + + tableName + "' is not active yet in region " + region).initCause(e); + } + } + + /** * Create a table, wait for it to become active, then add the version * marker. * @param capacity capacity to provision @@ -777,20 +837,13 @@ public class DynamoDBMetadataStore implements MetadataStore { .withAttributeDefinitions(attributeDefinitions()) .withProvisionedThroughput(capacity)); LOG.debug("Awaiting table becoming active"); - table.waitForActive(); } catch (ResourceInUseException e) { LOG.warn("ResourceInUseException while creating DynamoDB table {} " + "in region {}. This may indicate that the table was " + "created by another concurrent thread or process.", tableName, region); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for DynamoDB table {} active", - tableName, e); - Thread.currentThread().interrupt(); - throw (IOException) new InterruptedIOException( - "DynamoDB table '" + tableName + "' " - + "is not active yet in region " + region).initCause(e); } + waitForTableActive(table); final Item marker = createVersionMarker(VERSION_MARKER, VERSION, System.currentTimeMillis()); putItem(marker); http://git-wip-us.apache.org/repos/asf/hadoop/blob/872e1061/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java new file mode 100644 index 0000000..bfad328 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; + +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.Constants; + +import org.junit.Assume; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +/** + * Tests concurrent operations on S3Guard. + */ +public class ITestS3GuardConcurrentOps extends AbstractS3ATestBase { + + @Rule + public final Timeout timeout = new Timeout(5 * 60 * 1000); + + private void failIfTableExists(DynamoDB db, String tableName) { + boolean tableExists = true; + try { + Table table = db.getTable(tableName); + table.describe(); + } catch (ResourceNotFoundException e) { + tableExists = false; + } + if (tableExists) { + fail("Table already exists: " + tableName); + } + } + + private void deleteTable(DynamoDB db, String tableName) throws + InterruptedException { + Table table = db.getTable(tableName); + table.waitForActive(); + table.delete(); + table.waitForDelete(); + } + + @Test + public void testConcurrentTableCreations() throws Exception { + Configuration conf = getConfiguration(); + Assume.assumeTrue("Test only applies when DynamoDB is used for S3Guard", + conf.get(Constants.S3_METADATA_STORE_IMPL).equals( + Constants.S3GUARD_METASTORE_DYNAMO)); + + DynamoDBMetadataStore ms = new DynamoDBMetadataStore(); + ms.initialize(conf); + DynamoDB db = ms.getDynamoDB(); + + String tableName = "testConcurrentTableCreations" + new Random().nextInt(); + conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true); + conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName); + int concurrentOps = 16; + int iterations = 4; + + failIfTableExists(db, tableName); + + for (int i = 0; i < iterations; i++) { + ExecutorService executor = Executors.newFixedThreadPool( + concurrentOps, new ThreadFactory() { + private AtomicInteger count = new AtomicInteger(0); + + public Thread newThread(Runnable r) { + return new Thread(r, + "testConcurrentTableCreations" + count.getAndIncrement()); + } + }); + ((ThreadPoolExecutor) executor).prestartAllCoreThreads(); + Future<Boolean>[] futures = new Future[concurrentOps]; + int exceptionsThrown = 0; + for (int f = 0; f < concurrentOps; f++) { + final int index = f; + futures[f] = executor.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + + boolean result = false; + try { + new DynamoDBMetadataStore().initialize(conf); + } catch (Exception e) { + LOG.error(e.getClass() + ": " + e.getMessage()); + result = true; + } + + timer.end("parallel DynamoDB client creation %d", index); + LOG.info("Parallel DynamoDB client creation {} ran from {} to {}", + index, timer.getStartTime(), timer.getEndTime()); + return result; + } + }); + } + for (int f = 0; f < concurrentOps; f++) { + if (futures[f].get()) { + exceptionsThrown++; + } + } + deleteTable(db, tableName); + if (exceptionsThrown > 0) { + fail(exceptionsThrown + "/" + concurrentOps + + " threads threw exceptions while initializing on iteration " + i); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org