http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java new file mode 100644 index 0000000..66ada49 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.IOException; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory; + +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY; +import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet; + +/** + * Interface to create a DynamoDB client. + * + * Implementations must support setting and getting a Hadoop configuration + * (they extend {@link Configurable}). + */ +@InterfaceAudience.Private +public interface DynamoDBClientFactory extends Configurable { + Logger LOG = LoggerFactory.getLogger(DynamoDBClientFactory.class); + + /** + * Create a DynamoDB client object from configuration. + * + * The DynamoDB client to create does not have to relate to any S3 buckets. + * All information needed to create a DynamoDB client is taken from the + * Hadoop configuration. Specifically, if the region is not configured, it + * will use the provided region parameter. If the region is neither + * configured nor provided, an error is raised. + * + * @param defaultRegion the default region of the AmazonDynamoDB client + * @return a new DynamoDB client + * @throws IOException if any IO error happens + */ + AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException; + + /** + * The default implementation for creating an AmazonDynamoDB.
+ */ + class DefaultDynamoDBClientFactory extends Configured + implements DynamoDBClientFactory { + @Override + public AmazonDynamoDB createDynamoDBClient(String defaultRegion) + throws IOException { + Preconditions.checkNotNull(getConf(), + "Should have been configured before usage"); + + final Configuration conf = getConf(); + final AWSCredentialsProvider credentials = + createAWSCredentialProviderSet(null, conf); + final ClientConfiguration awsConf = + DefaultS3ClientFactory.createAwsConf(conf); + + final String region = getRegion(conf, defaultRegion); + LOG.debug("Creating DynamoDB client in region {}", region); + + return AmazonDynamoDBClientBuilder.standard() + .withCredentials(credentials) + .withClientConfiguration(awsConf) + .withRegion(region) + .build(); + } + + /** + * Helper method to get and validate the AWS region for DynamoDBClient. + * + * @param conf configuration + * @param defaultRegion the default region + * @return configured region or else the provided default region + * @throws IOException if the region is not valid + */ + static String getRegion(Configuration conf, String defaultRegion) + throws IOException { + String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); + if (StringUtils.isEmpty(region)) { + region = defaultRegion; + } + try { + Regions.fromName(region); + } catch (IllegalArgumentException | NullPointerException e) { + throw new IOException("Invalid region specified: " + region + "; " + + "Region can be configured with " + S3GUARD_DDB_REGION_KEY + ": " + + validRegionsString()); + } + return region; + } + + private static String validRegionsString() { + final String delimiter = ", "; + Regions[] regions = Regions.values(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < regions.length; i++) { + if (i > 0) { + sb.append(delimiter); + } + sb.append(regions[i].getName()); + } + return sb.toString(); + + } + } + +}
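A hedged illustration of the region-selection behaviour described above: the sketch below is not part of the patch, the class name and region values are hypothetical, and it assumes it lives in the same package as the factory (getRegion() is package-private). It shows the configured fs.s3a.s3guard.ddb.region taking precedence over the caller-supplied default, with the default used only as a fallback.

package org.apache.hadoop.fs.s3a.s3guard;

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;

// Hypothetical example class, not part of the patch.
public class RegionResolutionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // An explicitly configured region wins over the caller-supplied default.
    conf.set(S3GUARD_DDB_REGION_KEY, "eu-west-1");
    String region = DynamoDBClientFactory.DefaultDynamoDBClientFactory
        .getRegion(conf, "us-east-1");
    System.out.println(region);   // eu-west-1

    // With no configured region, the provided default is used instead.
    conf.unset(S3GUARD_DDB_REGION_KEY);
    region = DynamoDBClientFactory.DefaultDynamoDBClientFactory
        .getRegion(conf, "us-east-1");
    System.out.println(region);   // us-east-1

    // A name that Regions.fromName() does not recognize raises an
    // IOException naming S3GUARD_DDB_REGION_KEY and listing valid regions.
  }
}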
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java new file mode 100644 index 0000000..1bed03d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -0,0 +1,1010 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.ItemCollection; +import com.amazonaws.services.dynamodbv2.document.PrimaryKey; +import com.amazonaws.services.dynamodbv2.document.PutItemOutcome; +import com.amazonaws.services.dynamodbv2.document.QueryOutcome; +import com.amazonaws.services.dynamodbv2.document.ScanOutcome; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.document.TableWriteItems; +import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec; +import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec; +import com.amazonaws.services.dynamodbv2.document.utils.ValueMap; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; 
+import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.Tristate; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*; +import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*; + +/** + * DynamoDBMetadataStore is a {@link MetadataStore} that persists + * file system metadata to DynamoDB. + * + * The current implementation uses a schema consisting of a single table. The + * name of the table can be configured by config key + * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_NAME_KEY}. + * By default, it matches the name of the S3 bucket. Each item in the table + * represents a single directory or file. Its path is split into separate table + * attributes: + * <ul> + * <li> parent (absolute path of the parent, with bucket name inserted as + * first path component). </li> + * <li> child (path of that specific child, relative to parent). </li> + * <li> optional boolean attribute tracking whether the path is a directory. + * Absence or a false value indicates the path is a file. </li> + * <li> optional long attribute revealing modification time of file. + * This attribute is meaningful only to file items.</li> + * <li> optional long attribute revealing file length. + * This attribute is meaningful only to file items.</li> + * <li> optional long attribute revealing block size of the file. + * This attribute is meaningful only to file items.</li> + * </ul> + * + * The DynamoDB partition key is the parent, and the range key is the child. + * + * To allow multiple buckets to share the same DynamoDB table, the bucket + * name is treated as the root directory. + * + * For example, assume the consistent store contains metadata representing this + * file system structure: + * + * <pre> + * s3a://bucket/dir1 + * |-- dir2 + * | |-- file1 + * | `-- file2 + * `-- dir3 + * |-- dir4 + * | `-- file3 + * |-- dir5 + * | `-- file4 + * `-- dir6 + * </pre> + * + * This is persisted to a single DynamoDB table as: + * + * <pre> + * ========================================================================= + * | parent | child | is_dir | mod_time | len | ... | + * ========================================================================= + * | /bucket | dir1 | true | | | | + * | /bucket/dir1 | dir2 | true | | | | + * | /bucket/dir1 | dir3 | true | | | | + * | /bucket/dir1/dir2 | file1 | | 100 | 111 | | + * | /bucket/dir1/dir2 | file2 | | 200 | 222 | | + * | /bucket/dir1/dir3 | dir4 | true | | | | + * | /bucket/dir1/dir3 | dir5 | true | | | | + * | /bucket/dir1/dir3/dir4 | file3 | | 300 | 333 | | + * | /bucket/dir1/dir3/dir5 | file4 | | 400 | 444 | | + * | /bucket/dir1/dir3 | dir6 | true | | | | + * ========================================================================= + * </pre> + * + * This choice of schema is efficient for read access patterns. + * {@link #get(Path)} can be served from a single item lookup. 
+ * {@link #listChildren(Path)} can be served from a query against all rows + * matching the parent (the partition key) and the returned list is guaranteed + * to be sorted by child (the range key). Tracking whether or not a path is a + * directory helps prevent unnecessary queries during traversal of an entire + * sub-tree. + * + * Some mutating operations, notably {@link #deleteSubtree(Path)} and + * {@link #move(Collection, Collection)}, are less efficient with this schema. + * They require mutating multiple items in the DynamoDB table. + * + * By default, DynamoDB access is performed within the same AWS region as + * the S3 bucket that hosts the S3A instance. During initialization, the store + * checks the location of the S3 bucket and creates a DynamoDB client connected + * to the same region. The region may also be set explicitly by setting the + * config parameter {@code fs.s3a.s3guard.ddb.region} to the corresponding + * region. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DynamoDBMetadataStore implements MetadataStore { + public static final Logger LOG = LoggerFactory.getLogger( + DynamoDBMetadataStore.class); + + /** parent/child name to use in the version marker. */ + public static final String VERSION_MARKER = "../VERSION"; + + /** Current version number. */ + public static final int VERSION = 100; + + /** Error: version marker not found in table. */ + public static final String E_NO_VERSION_MARKER + = "S3Guard table lacks version marker."; + + /** Error: version mismatch. */ + public static final String E_INCOMPATIBLE_VERSION + = "Database table is from an incompatible S3Guard version."; + + /** Initial delay for retries when batched operations get throttled by + * DynamoDB. Value is {@value} msec. */ + public static final long MIN_RETRY_SLEEP_MSEC = 100; + + private static ValueMap deleteTrackingValueMap = + new ValueMap().withBoolean(":false", false); + + private DynamoDB dynamoDB; + private String region; + private Table table; + private String tableName; + private Configuration conf; + private String username; + + private RetryPolicy dataAccessRetryPolicy; + private S3AInstrumentation.S3GuardInstrumentation instrumentation; + + /** + * A utility function to create a DynamoDB instance. + * @param conf the file system configuration + * @param s3Region region of the associated S3 bucket (if any). + * @return a DynamoDB instance. + * @throws IOException I/O error. + */ + private static DynamoDB createDynamoDB(Configuration conf, String s3Region) + throws IOException { + Preconditions.checkNotNull(conf); + final Class<?
extends DynamoDBClientFactory> cls = conf.getClass( + S3GUARD_DDB_CLIENT_FACTORY_IMPL, + S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT, + DynamoDBClientFactory.class); + LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region); + final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf) + .createDynamoDBClient(s3Region); + return new DynamoDB(dynamoDBClient); + } + + @Override + public void initialize(FileSystem fs) throws IOException { + Preconditions.checkArgument(fs instanceof S3AFileSystem, + "DynamoDBMetadataStore only supports S3A filesystem."); + final S3AFileSystem s3afs = (S3AFileSystem) fs; + instrumentation = s3afs.getInstrumentation().getS3GuardInstrumentation(); + final String bucket = s3afs.getBucket(); + String confRegion = s3afs.getConf().getTrimmed(S3GUARD_DDB_REGION_KEY); + if (!StringUtils.isEmpty(confRegion)) { + region = confRegion; + LOG.debug("Overriding S3 region with configured DynamoDB region: {}", + region); + } else { + region = s3afs.getBucketLocation(); + LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region); + } + username = s3afs.getUsername(); + conf = s3afs.getConf(); + dynamoDB = createDynamoDB(conf, region); + + // use the bucket as the DynamoDB table name if not specified in config + tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket); + setMaxRetries(conf); + + initTable(); + + instrumentation.initialized(); + } + + /** + * Performs one-time initialization of the metadata store via configuration. + * + * This initialization depends on the configuration object to get AWS + * credentials, DynamoDBFactory implementation class, DynamoDB endpoints, + * DynamoDB table names etc. After initialization, this metadata store does + * not explicitly relate to any S3 bucket, which be nonexistent. + * + * This is used to operate the metadata store directly beyond the scope of the + * S3AFileSystem integration, e.g. command line tools. + * Generally, callers should use {@link #initialize(FileSystem)} + * with an initialized {@code S3AFileSystem} instance. + * + * Without a filesystem to act as a reference point, the configuration itself + * must declare the table name and region in the + * {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and + * {@link Constants#S3GUARD_DDB_REGION_KEY} respectively. + * + * @see #initialize(FileSystem) + * @throws IOException if there is an error + * @throws IllegalArgumentException if the configuration is incomplete + */ + @Override + public void initialize(Configuration config) throws IOException { + conf = config; + // use the bucket as the DynamoDB table name if not specified in config + tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY); + Preconditions.checkArgument(!StringUtils.isEmpty(tableName), + "No DynamoDB table name configured"); + region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); + Preconditions.checkArgument(!StringUtils.isEmpty(region), + "No DynamoDB region configured"); + dynamoDB = createDynamoDB(conf, region); + + username = UserGroupInformation.getCurrentUser().getShortUserName(); + setMaxRetries(conf); + + initTable(); + } + + /** + * Set retry policy. This is driven by the value of + * {@link Constants#S3GUARD_DDB_MAX_RETRIES} with an exponential backoff + * between each attempt of {@link #MIN_RETRY_SLEEP_MSEC} milliseconds. 
+ * @param config + */ + private void setMaxRetries(Configuration config) { + int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES, + S3GUARD_DDB_MAX_RETRIES_DEFAULT); + dataAccessRetryPolicy = RetryPolicies + .exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC, + TimeUnit.MILLISECONDS); + } + + @Override + public void delete(Path path) throws IOException { + innerDelete(path, true); + } + + @Override + public void forgetMetadata(Path path) throws IOException { + innerDelete(path, false); + } + + /** + * Inner delete option, action based on the {@code tombstone} flag. + * No tombstone: delete the entry. Tombstone: create a tombstone entry. + * There is no check as to whether the entry exists in the table first. + * @param path path to delete + * @param tombstone flag to create a tombstone marker + * @throws IOException I/O error. + */ + private void innerDelete(Path path, boolean tombstone) + throws IOException { + path = checkPath(path); + LOG.debug("Deleting from table {} in region {}: {}", + tableName, region, path); + + // deleting nonexistent item consumes 1 write capacity; skip it + if (path.isRoot()) { + LOG.debug("Skip deleting root directory as it does not exist in table"); + return; + } + + try { + if (tombstone) { + Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem( + PathMetadata.tombstone(path)); + table.putItem(item); + } else { + table.deleteItem(pathToKey(path)); + } + } catch (AmazonClientException e) { + throw translateException("delete", path, e); + } + } + + @Override + public void deleteSubtree(Path path) throws IOException { + path = checkPath(path); + LOG.debug("Deleting subtree from table {} in region {}: {}", + tableName, region, path); + + final PathMetadata meta = get(path); + if (meta == null || meta.isDeleted()) { + LOG.debug("Subtree path {} does not exist; this will be a no-op", path); + return; + } + + for (DescendantsIterator desc = new DescendantsIterator(this, meta); + desc.hasNext();) { + innerDelete(desc.next().getPath(), true); + } + } + + private Item getConsistentItem(PrimaryKey key) { + final GetItemSpec spec = new GetItemSpec() + .withPrimaryKey(key) + .withConsistentRead(true); // strictly consistent read + return table.getItem(spec); + } + + @Override + public PathMetadata get(Path path) throws IOException { + return get(path, false); + } + + @Override + public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag) + throws IOException { + path = checkPath(path); + LOG.debug("Get from table {} in region {}: {}", tableName, region, path); + + try { + final PathMetadata meta; + if (path.isRoot()) { + // Root does not persist in the table + meta = new PathMetadata(makeDirStatus(username, path)); + } else { + final Item item = getConsistentItem(pathToKey(path)); + meta = itemToPathMetadata(item, username); + LOG.debug("Get from table {} in region {} returning for {}: {}", + tableName, region, path, meta); + } + + if (wantEmptyDirectoryFlag && meta != null) { + final FileStatus status = meta.getFileStatus(); + // for directory, we query its direct children to determine isEmpty bit + if (status.isDirectory()) { + final QuerySpec spec = new QuerySpec() + .withHashKey(pathToParentKeyAttribute(path)) + .withConsistentRead(true) + .withFilterExpression(IS_DELETED + " = :false") + .withValueMap(deleteTrackingValueMap); + final ItemCollection<QueryOutcome> items = table.query(spec); + boolean hasChildren = items.iterator().hasNext(); + // When this class has support for authoritative + // (fully-cached) directory listings, we may 
also be able to answer + // TRUE here. Until then, we don't know if we have full listing or + // not, thus the UNKNOWN here: + meta.setIsEmptyDirectory( + hasChildren ? Tristate.FALSE : Tristate.UNKNOWN); + } + } + + return meta; + } catch (AmazonClientException e) { + throw translateException("get", path, e); + } + } + + /** + * Make a FileStatus object for a directory at given path. The FileStatus + * only contains what S3A needs, and omits mod time since S3A uses its own + * implementation which returns current system time. + * @param owner username of owner + * @param path path to dir + * @return new FileStatus + */ + private FileStatus makeDirStatus(String owner, Path path) { + return new FileStatus(0, true, 1, 0, 0, 0, null, + owner, null, path); + } + + @Override + public DirListingMetadata listChildren(Path path) throws IOException { + path = checkPath(path); + LOG.debug("Listing table {} in region {}: {}", tableName, region, path); + + // find the children in the table + try { + final QuerySpec spec = new QuerySpec() + .withHashKey(pathToParentKeyAttribute(path)) + .withConsistentRead(true); // strictly consistent read + final ItemCollection<QueryOutcome> items = table.query(spec); + + final List<PathMetadata> metas = new ArrayList<>(); + for (Item item : items) { + PathMetadata meta = itemToPathMetadata(item, username); + metas.add(meta); + } + LOG.trace("Listing table {} in region {} for {} returning {}", + tableName, region, path, metas); + + return (metas.isEmpty() && get(path) == null) + ? null + : new DirListingMetadata(path, metas, false); + } catch (AmazonClientException e) { + // failure, including the path not being present + throw translateException("listChildren", path, e); + } + } + + // build the list of all parent entries. + Collection<PathMetadata> completeAncestry( + Collection<PathMetadata> pathsToCreate) { + // Key on path to allow fast lookup + Map<Path, PathMetadata> ancestry = new HashMap<>(); + + for (PathMetadata meta : pathsToCreate) { + Preconditions.checkArgument(meta != null); + Path path = meta.getFileStatus().getPath(); + if (path.isRoot()) { + break; + } + ancestry.put(path, meta); + Path parent = path.getParent(); + while (!parent.isRoot() && !ancestry.containsKey(parent)) { + LOG.debug("auto-create ancestor path {} for child path {}", + parent, path); + final FileStatus status = makeDirStatus(parent, username); + ancestry.put(parent, new PathMetadata(status, Tristate.FALSE, false)); + parent = parent.getParent(); + } + } + return ancestry.values(); + } + + @Override + public void move(Collection<Path> pathsToDelete, + Collection<PathMetadata> pathsToCreate) throws IOException { + if (pathsToDelete == null && pathsToCreate == null) { + return; + } + + LOG.debug("Moving paths of table {} in region {}: {} paths to delete and {}" + + " paths to create", tableName, region, + pathsToDelete == null ? 0 : pathsToDelete.size(), + pathsToCreate == null ? 0 : pathsToCreate.size()); + LOG.trace("move: pathsToDelete = {}, pathsToCreate = {}", pathsToDelete, + pathsToCreate); + + // In DynamoDBMetadataStore implementation, we assume that if a path + // exists, all its ancestors will also exist in the table. + // Following code is to maintain this invariant by putting all ancestor + // directories of the paths to create. 
+ // ancestor paths that are not explicitly added to paths to create + Collection<PathMetadata> newItems = new ArrayList<>(); + if (pathsToCreate != null) { + newItems.addAll(completeAncestry(pathsToCreate)); + } + if (pathsToDelete != null) { + for (Path meta : pathsToDelete) { + newItems.add(PathMetadata.tombstone(meta)); + } + } + + try { + processBatchWriteRequest(null, pathMetadataToItem(newItems)); + } catch (AmazonClientException e) { + throw translateException("move", (String) null, e); + } + } + + /** + * Helper method to issue a batch write request to DynamoDB. + * + * Callers of this method should catch the {@link AmazonClientException} and + * translate it for better error report and easier debugging. + * @param keysToDelete primary keys to be deleted; can be null + * @param itemsToPut new items to be put; can be null + */ + private void processBatchWriteRequest(PrimaryKey[] keysToDelete, + Item[] itemsToPut) throws IOException { + final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length); + final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length); + int count = 0; + while (count < totalToDelete + totalToPut) { + final TableWriteItems writeItems = new TableWriteItems(tableName); + int numToDelete = 0; + if (keysToDelete != null + && count < totalToDelete) { + numToDelete = Math.min(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT, + totalToDelete - count); + writeItems.withPrimaryKeysToDelete( + Arrays.copyOfRange(keysToDelete, count, count + numToDelete)); + count += numToDelete; + } + + if (numToDelete < S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT + && itemsToPut != null + && count < totalToDelete + totalToPut) { + final int numToPut = Math.min( + S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT - numToDelete, + totalToDelete + totalToPut - count); + final int index = count - totalToDelete; + writeItems.withItemsToPut( + Arrays.copyOfRange(itemsToPut, index, index + numToPut)); + count += numToPut; + } + + BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems); + // Check for unprocessed keys in case of exceeding provisioned throughput + Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems(); + int retryCount = 0; + while (unprocessed.size() > 0) { + retryBackoff(retryCount++); + res = dynamoDB.batchWriteItemUnprocessed(unprocessed); + unprocessed = res.getUnprocessedItems(); + } + } + } + + /** + * Put the current thread to sleep to implement exponential backoff + * depending on retryCount. If max retries are exceeded, throws an + * exception instead. + * @param retryCount number of retries so far + * @throws IOException when max retryCount is exceeded. + */ + private void retryBackoff(int retryCount) throws IOException { + try { + // Our RetryPolicy ignores everything but retryCount here. + RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null, + retryCount, 0, true); + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + throw new IOException( + String.format("Max retries exceeded (%d) for DynamoDB", + retryCount)); + } else { + LOG.debug("Sleeping {} msec before next retry", action.delayMillis); + Thread.sleep(action.delayMillis); + } + } catch (Exception e) { + throw new IOException("Unexpected exception", e); + } + } + + @Override + public void put(PathMetadata meta) throws IOException { + // For a deeply nested path, this method will automatically create the full + // ancestry and save respective item in DynamoDB table. 
+ // So after put operation, we maintain the invariant that if a path exists, + // all its ancestors will also exist in the table. + // For performance purpose, we generate the full paths to put and use batch + // write item request to save the items. + LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta); + + Collection<PathMetadata> wrapper = new ArrayList<>(1); + wrapper.add(meta); + put(wrapper); + } + + @Override + public void put(Collection<PathMetadata> metas) throws IOException { + LOG.debug("Saving batch to table {} in region {}", tableName, region); + + processBatchWriteRequest(null, pathMetadataToItem(completeAncestry(metas))); + } + + /** + * Helper method to get full path of ancestors that are nonexistent in table. + */ + private Collection<PathMetadata> fullPathsToPut(PathMetadata meta) + throws IOException { + checkPathMetadata(meta); + final Collection<PathMetadata> metasToPut = new ArrayList<>(); + // root path is not persisted + if (!meta.getFileStatus().getPath().isRoot()) { + metasToPut.add(meta); + } + + // put all its ancestors if not present; as an optimization we return at its + // first existent ancestor + Path path = meta.getFileStatus().getPath().getParent(); + while (path != null && !path.isRoot()) { + final Item item = getConsistentItem(pathToKey(path)); + if (!itemExists(item)) { + final FileStatus status = makeDirStatus(path, username); + metasToPut.add(new PathMetadata(status, Tristate.FALSE, false)); + path = path.getParent(); + } else { + break; + } + } + return metasToPut; + } + + private boolean itemExists(Item item) { + if (item == null) { + return false; + } + if (item.hasAttribute(IS_DELETED) && + item.getBoolean(IS_DELETED)) { + return false; + } + return true; + } + + /** Create a directory FileStatus using current system time as mod time. */ + static FileStatus makeDirStatus(Path f, String owner) { + return new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + null, owner, owner, f); + } + + @Override + public void put(DirListingMetadata meta) throws IOException { + LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta); + + // directory path + PathMetadata p = new PathMetadata(makeDirStatus(meta.getPath(), username), + meta.isEmpty(), false); + + // First add any missing ancestors... + final Collection<PathMetadata> metasToPut = fullPathsToPut(p); + + // next add all children of the directory + metasToPut.addAll(meta.getListing()); + + try { + processBatchWriteRequest(null, pathMetadataToItem(metasToPut)); + } catch (AmazonClientException e) { + throw translateException("put", (String) null, e); + } + } + + @Override + public synchronized void close() { + if (instrumentation != null) { + instrumentation.storeClosed(); + } + if (dynamoDB != null) { + LOG.debug("Shutting down {}", this); + dynamoDB.shutdown(); + dynamoDB = null; + } + } + + @Override + public void destroy() throws IOException { + if (table == null) { + LOG.info("In destroy(): no table to delete"); + return; + } + LOG.info("Deleting DynamoDB table {} in region {}", tableName, region); + Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB"); + try { + table.delete(); + table.waitForDelete(); + } catch (ResourceNotFoundException rnfe) { + LOG.info("ResourceNotFoundException while deleting DynamoDB table {} in " + + "region {}. 
This may indicate that the table does not exist, " + + "or has been deleted by another concurrent thread or process.", + tableName, region); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted", + tableName, ie); + throw new InterruptedIOException("Table " + tableName + + " in region " + region + " has not been deleted"); + } catch (AmazonClientException e) { + throw translateException("destroy", (String) null, e); + } + } + + private ItemCollection<ScanOutcome> expiredFiles(long modTime) { + String filterExpression = "mod_time < :mod_time"; + String projectionExpression = "parent,child"; + ValueMap map = new ValueMap().withLong(":mod_time", modTime); + return table.scan(filterExpression, projectionExpression, null, map); + } + + @Override + public void prune(long modTime) throws IOException { + int itemCount = 0; + try { + Collection<Path> deletionBatch = + new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT); + int delay = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, + S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT); + for (Item item : expiredFiles(modTime)) { + PathMetadata md = PathMetadataDynamoDBTranslation + .itemToPathMetadata(item, username); + Path path = md.getFileStatus().getPath(); + deletionBatch.add(path); + itemCount++; + if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) { + Thread.sleep(delay); + processBatchWriteRequest(pathToKey(deletionBatch), null); + deletionBatch.clear(); + } + } + if (deletionBatch.size() > 0) { + Thread.sleep(delay); + processBatchWriteRequest(pathToKey(deletionBatch), null); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException("Pruning was interrupted"); + } + LOG.info("Finished pruning {} items in batches of {}", itemCount, + S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT); + } + + @Override + public String toString() { + return getClass().getSimpleName() + '{' + + "region=" + region + + ", tableName=" + tableName + + '}'; + } + + /** + * Create a table if it does not exist and wait for it to become active. + * + * If a table with the intended name already exists, then it uses that table. + * Otherwise, it will automatically create the table if the config + * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is + * enabled. The DynamoDB table creation API is asynchronous. This method wait + * for the table to become active after sending the creation request, so + * overall, this method is synchronous, and the table is guaranteed to exist + * after this method returns successfully. + * + * @throws IOException if table does not exist and auto-creation is disabled; + * or table is being deleted, or any other I/O exception occurred. + */ + @VisibleForTesting + void initTable() throws IOException { + table = dynamoDB.getTable(tableName); + try { + try { + LOG.debug("Binding to table {}", tableName); + final String status = table.describe().getTableStatus(); + switch (status) { + case "CREATING": + case "UPDATING": + LOG.debug("Table {} in region {} is being created/updated. This may" + + " indicate that the table is being operated by another " + + "concurrent thread or process. 
Waiting for active...", + tableName, region); + waitForTableActive(table); + break; + case "DELETING": + throw new FileNotFoundException("DynamoDB table " + + "'" + tableName + "' is being " + + "deleted in region " + region); + case "ACTIVE": + break; + default: + throw new IOException("Unknown DynamoDB table status " + status + + ": tableName='" + tableName + "', region=" + region); + } + + final Item versionMarker = getVersionMarkerItem(); + verifyVersionCompatibility(tableName, versionMarker); + Long created = extractCreationTimeFromMarker(versionMarker); + LOG.debug("Using existing DynamoDB table {} in region {} created {}", + tableName, region, (created != null) ? new Date(created) : null); + } catch (ResourceNotFoundException rnfe) { + if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) { + final ProvisionedThroughput capacity = new ProvisionedThroughput( + conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY, + S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT), + conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, + S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT)); + + createTable(capacity); + } else { + throw new FileNotFoundException("DynamoDB table " + + "'" + tableName + "' does not " + + "exist in region " + region + "; auto-creation is turned off"); + } + } + + } catch (AmazonClientException e) { + throw translateException("initTable", (String) null, e); + } + } + + /** + * Get the version mark item in the existing DynamoDB table. + * + * As the version marker item may be created by another concurrent thread or + * process, we retry a limited times before we fail to get it. + */ + private Item getVersionMarkerItem() throws IOException { + final PrimaryKey versionMarkerKey = + createVersionMarkerPrimaryKey(VERSION_MARKER); + int retryCount = 0; + Item versionMarker = table.getItem(versionMarkerKey); + while (versionMarker == null) { + try { + RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null, + retryCount, 0, true); + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + break; + } else { + LOG.debug("Sleeping {} ms before next retry", action.delayMillis); + Thread.sleep(action.delayMillis); + } + } catch (Exception e) { + throw new IOException("initTable: Unexpected exception", e); + } + retryCount++; + versionMarker = table.getItem(versionMarkerKey); + } + return versionMarker; + } + + /** + * Verify that a table version is compatible with this S3Guard client. + * @param tableName name of the table (for error messages) + * @param versionMarker the version marker retrieved from the table + * @throws IOException on any incompatibility + */ + @VisibleForTesting + static void verifyVersionCompatibility(String tableName, + Item versionMarker) throws IOException { + if (versionMarker == null) { + LOG.warn("Table {} contains no version marker", tableName); + throw new IOException(E_NO_VERSION_MARKER + + " Table: " + tableName); + } else { + final int version = extractVersionFromMarker(versionMarker); + if (VERSION != version) { + // version mismatch. Unless/until there is support for + // upgrading versions, treat this as an incompatible change + // and fail. + throw new IOException(E_INCOMPATIBLE_VERSION + + " Table "+ tableName + + " Expected version " + VERSION + " actual " + version); + } + } + } + + /** + * Wait for table being active. + * @param t table to block on. 
+ * @throws IOException IO problems + * @throws InterruptedIOException if the wait was interrupted + */ + private void waitForTableActive(Table t) throws IOException { + try { + t.waitForActive(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for table {} in region {} active", + tableName, region, e); + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException("DynamoDB table '" + + tableName + "' is not active yet in region " + region).initCause(e); + } + } + + /** + * Create a table, wait for it to become active, then add the version + * marker. + * @param capacity capacity to provision + * @throws IOException on any failure. + * @throws InterruptedIOException if the wait was interrupted + */ + private void createTable(ProvisionedThroughput capacity) throws IOException { + try { + LOG.info("Creating non-existent DynamoDB table {} in region {}", + tableName, region); + table = dynamoDB.createTable(new CreateTableRequest() + .withTableName(tableName) + .withKeySchema(keySchema()) + .withAttributeDefinitions(attributeDefinitions()) + .withProvisionedThroughput(capacity)); + LOG.debug("Awaiting table becoming active"); + } catch (ResourceInUseException e) { + LOG.warn("ResourceInUseException while creating DynamoDB table {} " + + "in region {}. This may indicate that the table was " + + "created by another concurrent thread or process.", + tableName, region); + } + waitForTableActive(table); + final Item marker = createVersionMarker(VERSION_MARKER, VERSION, + System.currentTimeMillis()); + putItem(marker); + } + + /** + * PUT a single item to the table. + * @param item item to put + * @return the outcome. + */ + PutItemOutcome putItem(Item item) { + LOG.debug("Putting item {}", item); + return table.putItem(item); + } + + /** + * Provision the table with given read and write capacity units. + */ + void provisionTable(Long readCapacity, Long writeCapacity) + throws IOException { + final ProvisionedThroughput toProvision = new ProvisionedThroughput() + .withReadCapacityUnits(readCapacity) + .withWriteCapacityUnits(writeCapacity); + try { + final ProvisionedThroughputDescription p = + table.updateTable(toProvision).getProvisionedThroughput(); + LOG.info("Provision table {} in region {}: readCapacityUnits={}, " + + "writeCapacityUnits={}", + tableName, region, p.getReadCapacityUnits(), + p.getWriteCapacityUnits()); + } catch (AmazonClientException e) { + throw translateException("provisionTable", (String) null, e); + } + } + + Table getTable() { + return table; + } + + String getRegion() { + return region; + } + + @VisibleForTesting + DynamoDB getDynamoDB() { + return dynamoDB; + } + + /** + * Validates a path object; it must be absolute, and contain a host + * (bucket) component. + */ + private Path checkPath(Path path) { + Preconditions.checkNotNull(path); + Preconditions.checkArgument(path.isAbsolute(), "Path %s is not absolute", + path); + URI uri = path.toUri(); + Preconditions.checkNotNull(uri.getScheme(), "Path %s missing scheme", path); + Preconditions.checkArgument(uri.getScheme().equals(Constants.FS_S3A), + "Path %s scheme must be %s", path, Constants.FS_S3A); + Preconditions.checkArgument(!StringUtils.isEmpty(uri.getHost()), "Path %s" + + " is missing bucket.", path); + return path; + } + + /** + * Validates a path meta-data object. 
+ */ + private static void checkPathMetadata(PathMetadata meta) { + Preconditions.checkNotNull(meta); + Preconditions.checkNotNull(meta.getFileStatus()); + Preconditions.checkNotNull(meta.getFileStatus().getPath()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java new file mode 100644 index 0000000..1ef8b0d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Tristate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; + +/** + * This is a local, in-memory, implementation of MetadataStore. + * This is <i>not</i> a coherent cache across processes. It is only + * locally-coherent. + * + * The purpose of this is for unit and integration testing. + * It could also be used to accelerate local-only operations where only one + * process is operating on a given object store, or multiple processes are + * accessing a read-only storage bucket. + * + * This MetadataStore does not enforce filesystem rules such as disallowing + * non-recursive removal of non-empty directories. It is assumed the caller + * already has to perform these sorts of checks. + */ +public class LocalMetadataStore implements MetadataStore { + + public static final Logger LOG = LoggerFactory.getLogger(MetadataStore.class); + // TODO HADOOP-13649: use time instead of capacity for eviction. + public static final int DEFAULT_MAX_RECORDS = 128; + + /** + * Maximum number of records. + */ + public static final String CONF_MAX_RECORDS = + "fs.metadatastore.local.max_records"; + + /** Contains directories and files. */ + private LruHashMap<Path, PathMetadata> fileHash; + + /** Contains directory listings. */ + private LruHashMap<Path, DirListingMetadata> dirHash; + + private FileSystem fs; + /* Null iff this FS does not have an associated URI host. 
*/ + private String uriHost; + + @Override + public void initialize(FileSystem fileSystem) throws IOException { + Preconditions.checkNotNull(fileSystem); + fs = fileSystem; + URI fsURI = fs.getUri(); + uriHost = fsURI.getHost(); + if (uriHost != null && uriHost.equals("")) { + uriHost = null; + } + + initialize(fs.getConf()); + } + + @Override + public void initialize(Configuration conf) throws IOException { + Preconditions.checkNotNull(conf); + int maxRecords = conf.getInt(CONF_MAX_RECORDS, DEFAULT_MAX_RECORDS); + if (maxRecords < 4) { + maxRecords = 4; + } + // Start w/ less than max capacity. Space / time trade off. + fileHash = new LruHashMap<>(maxRecords/2, maxRecords); + dirHash = new LruHashMap<>(maxRecords/4, maxRecords); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "LocalMetadataStore{"); + sb.append(", uriHost='").append(uriHost).append('\''); + sb.append('}'); + return sb.toString(); + } + + @Override + public void delete(Path p) throws IOException { + doDelete(p, false, true); + } + + @Override + public void forgetMetadata(Path p) throws IOException { + doDelete(p, false, false); + } + + @Override + public void deleteSubtree(Path path) throws IOException { + doDelete(path, true, true); + } + + private synchronized void doDelete(Path p, boolean recursive, boolean + tombstone) { + + Path path = standardize(p); + + // Delete entry from file cache, then from cached parent directory, if any + + deleteHashEntries(path, tombstone); + + if (recursive) { + // Remove all entries that have this dir as path prefix. + deleteHashByAncestor(path, dirHash, tombstone); + deleteHashByAncestor(path, fileHash, tombstone); + } + } + + @Override + public synchronized PathMetadata get(Path p) throws IOException { + return get(p, false); + } + + @Override + public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag) + throws IOException { + Path path = standardize(p); + synchronized (this) { + PathMetadata m = fileHash.mruGet(path); + + if (wantEmptyDirectoryFlag && m != null && + m.getFileStatus().isDirectory()) { + m.setIsEmptyDirectory(isEmptyDirectory(p)); + } + + LOG.debug("get({}) -> {}", path, m == null ? "null" : m.prettyPrint()); + return m; + } + } + + /** + * Determine if directory is empty. + * Call with lock held. + * @param p a Path, already filtered through standardize() + * @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise. + */ + private Tristate isEmptyDirectory(Path p) { + DirListingMetadata dirMeta = dirHash.get(p); + return dirMeta.withoutTombstones().isEmpty(); + } + + @Override + public synchronized DirListingMetadata listChildren(Path p) throws + IOException { + Path path = standardize(p); + DirListingMetadata listing = dirHash.mruGet(path); + if (LOG.isDebugEnabled()) { + LOG.debug("listChildren({}) -> {}", path, + listing == null ? "null" : listing.prettyPrint()); + } + // Make a copy so callers can mutate without affecting our state + return listing == null ? null : new DirListingMetadata(listing); + } + + @Override + public void move(Collection<Path> pathsToDelete, + Collection<PathMetadata> pathsToCreate) throws IOException { + + Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null"); + Preconditions.checkNotNull(pathsToCreate, "pathsToCreate is null"); + Preconditions.checkArgument(pathsToDelete.size() == pathsToCreate.size(), + "Must supply same number of paths to delete/create."); + + // I feel dirty for using reentrant lock. :-| + synchronized (this) { + + // 1. 
Delete pathsToDelete + for (Path meta : pathsToDelete) { + LOG.debug("move: deleting metadata {}", meta); + delete(meta); + } + + // 2. Create new destination path metadata + for (PathMetadata meta : pathsToCreate) { + LOG.debug("move: adding metadata {}", meta); + put(meta); + } + + // 3. We now know full contents of all dirs in destination subtree + for (PathMetadata meta : pathsToCreate) { + FileStatus status = meta.getFileStatus(); + if (status == null || status.isDirectory()) { + continue; + } + DirListingMetadata dir = listChildren(status.getPath()); + if (dir != null) { // could be evicted already + dir.setAuthoritative(true); + } + } + } + } + + @Override + public void put(PathMetadata meta) throws IOException { + + Preconditions.checkNotNull(meta); + FileStatus status = meta.getFileStatus(); + Path path = standardize(status.getPath()); + synchronized (this) { + + /* Add entry for this file. */ + if (LOG.isDebugEnabled()) { + LOG.debug("put {} -> {}", path, meta.prettyPrint()); + } + fileHash.put(path, meta); + + /* Directory case: + * We also make sure we have an entry in the dirHash, so subsequent + * listStatus(path) at least see the directory. + * + * If we had a boolean flag argument "isNew", we would know whether this + * is an existing directory the client discovered via getFileStatus(), + * or if it is a newly-created directory. In the latter case, we would + * be able to mark the directory as authoritative (fully-cached), + * saving round trips to underlying store for subsequent listStatus() + */ + + if (status.isDirectory()) { + DirListingMetadata dir = dirHash.mruGet(path); + if (dir == null) { + dirHash.put(path, new DirListingMetadata(path, DirListingMetadata + .EMPTY_DIR, false)); + } + } + + /* Update cached parent dir. */ + Path parentPath = path.getParent(); + if (parentPath != null) { + DirListingMetadata parent = dirHash.mruGet(parentPath); + if (parent == null) { + /* Track this new file's listing in parent. Parent is not + * authoritative, since there may be other items in it we don't know + * about. 
*/ + parent = new DirListingMetadata(parentPath, + DirListingMetadata.EMPTY_DIR, false); + dirHash.put(parentPath, parent); + } + parent.put(status); + } + } + } + + @Override + public synchronized void put(DirListingMetadata meta) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("put dirMeta {}", meta.prettyPrint()); + } + dirHash.put(standardize(meta.getPath()), meta); + } + + public synchronized void put(Collection<PathMetadata> metas) throws + IOException { + for (PathMetadata meta : metas) { + put(meta); + } + } + + @Override + public void close() throws IOException { + + } + + @Override + public void destroy() throws IOException { + if (dirHash != null) { + dirHash.clear(); + } + } + + @Override + public synchronized void prune(long modTime) throws IOException { + Iterator<Map.Entry<Path, PathMetadata>> files = + fileHash.entrySet().iterator(); + while (files.hasNext()) { + Map.Entry<Path, PathMetadata> entry = files.next(); + if (expired(entry.getValue().getFileStatus(), modTime)) { + files.remove(); + } + } + Iterator<Map.Entry<Path, DirListingMetadata>> dirs = + dirHash.entrySet().iterator(); + while (dirs.hasNext()) { + Map.Entry<Path, DirListingMetadata> entry = dirs.next(); + Path path = entry.getKey(); + DirListingMetadata metadata = entry.getValue(); + Collection<PathMetadata> oldChildren = metadata.getListing(); + Collection<PathMetadata> newChildren = new LinkedList<>(); + + for (PathMetadata child : oldChildren) { + FileStatus status = child.getFileStatus(); + if (!expired(status, modTime)) { + newChildren.add(child); + } + } + if (newChildren.size() != oldChildren.size()) { + dirHash.put(path, new DirListingMetadata(path, newChildren, false)); + if (!path.isRoot()) { + DirListingMetadata parent = dirHash.get(path.getParent()); + if (parent != null) { + parent.setAuthoritative(false); + } + } + } + } + } + + private boolean expired(FileStatus status, long expiry) { + // Note: S3 doesn't track modification time on directories, so for + // consistency with the DynamoDB implementation we ignore that here + return status.getModificationTime() < expiry && !status.isDirectory(); + } + + @VisibleForTesting + static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash, + boolean tombstone) { + for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator(); + it.hasNext();) { + Map.Entry<Path, T> entry = it.next(); + Path f = entry.getKey(); + T meta = entry.getValue(); + if (isAncestorOf(ancestor, f)) { + if (tombstone) { + if (meta instanceof PathMetadata) { + entry.setValue((T) PathMetadata.tombstone(f)); + } else if (meta instanceof DirListingMetadata) { + it.remove(); + } else { + throw new IllegalStateException("Unknown type in hash"); + } + } else { + it.remove(); + } + } + } + } + + /** + * @return true iff 'ancestor' is ancestor dir in path 'f'. + * All paths here are absolute. Dir does not count as its own ancestor. + */ + private static boolean isAncestorOf(Path ancestor, Path f) { + String aStr = ancestor.toString(); + if (!ancestor.isRoot()) { + aStr += "/"; + } + String fStr = f.toString(); + return (fStr.startsWith(aStr)); + } + + /** + * Update fileHash and dirHash to reflect deletion of file 'f'. Call with + * lock held. 
+ */ + private void deleteHashEntries(Path path, boolean tombstone) { + + // Remove target file/dir + LOG.debug("delete file entry for {}", path); + if (tombstone) { + fileHash.put(path, PathMetadata.tombstone(path)); + } else { + fileHash.remove(path); + } + + // Update this and parent dir listing, if any + + /* If this path is a dir, remove its listing */ + LOG.debug("removing listing of {}", path); + + dirHash.remove(path); + + /* Remove this path from parent's dir listing */ + Path parent = path.getParent(); + if (parent != null) { + DirListingMetadata dir = dirHash.get(parent); + if (dir != null) { + LOG.debug("removing parent's entry for {} ", path); + if (tombstone) { + dir.markDeleted(path); + } else { + dir.remove(path); + } + } + } + } + + /** + * Return a "standardized" version of a path so we always have a consistent + * hash value. Also asserts the path is absolute, and contains host + * component. + * @param p input Path + * @return standardized version of Path, suitable for hash key + */ + private Path standardize(Path p) { + Preconditions.checkArgument(p.isAbsolute(), "Path must be absolute"); + URI uri = p.toUri(); + if (uriHost != null) { + Preconditions.checkArgument(!isEmpty(uri.getHost())); + } + return p; + } + + private static boolean isEmpty(String s) { + return (s == null || s.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java new file mode 100644 index 0000000..e355095 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.s3a.s3guard; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * LinkedHashMap that implements a maximum size and LRU eviction policy. + */ +public class LruHashMap<K, V> extends LinkedHashMap<K, V> { + private final int maxSize; + public LruHashMap(int initialCapacity, int maxSize) { + super(initialCapacity); + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { + return size() > maxSize; + } + + /** + * get() plus side-effect of making the element Most Recently Used. 
+ * @param key lookup key + * @return value + */ + + public V mruGet(K key) { + V val = remove(key); + if (val != null) { + put(key, val); + } + return val; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java new file mode 100644 index 0000000..dd8077b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * {@code MetadataStore} defines the set of operations that any metadata store + * implementation must provide. Note that all {@link Path} objects provided + * to methods must be absolute, not relative paths. + */ [email protected] [email protected] +public interface MetadataStore extends Closeable { + + /** + * Performs one-time initialization of the metadata store. + * + * @param fs {@code FileSystem} associated with the MetadataStore + * @throws IOException if there is an error + */ + void initialize(FileSystem fs) throws IOException; + + /** + * Performs one-time initialization of the metadata store via configuration. + * @see #initialize(FileSystem) + * @param conf Configuration. + * @throws IOException if there is an error + */ + void initialize(Configuration conf) throws IOException; + + /** + * Deletes exactly one path, leaving a tombstone to prevent lingering, + * inconsistent copies of it from being listed. + * + * @param path the path to delete + * @throws IOException if there is an error + */ + void delete(Path path) throws IOException; + + /** + * Removes the record of exactly one path. Does not leave a tombstone (see + * {@link MetadataStore#delete(Path)}. It is currently intended for testing + * only, and a need to use it as part of normal FileSystem usage is not + * anticipated. 
+ * + * @param path the path to delete + * @throws IOException if there is an error + */ + @VisibleForTesting + void forgetMetadata(Path path) throws IOException; + + /** + * Deletes the entire sub-tree rooted at the given path, leaving tombstones + * to prevent lingering, inconsistent copies of it from being listed. + * + * In addition to affecting future calls to {@link #get(Path)}, + * implementations must also update any stored {@code DirListingMetadata} + * objects which track the parent of this file. + * + * @param path the root of the sub-tree to delete + * @throws IOException if there is an error + */ + void deleteSubtree(Path path) throws IOException; + + /** + * Gets metadata for a path. + * + * @param path the path to get + * @return metadata for {@code path}, {@code null} if not found + * @throws IOException if there is an error + */ + PathMetadata get(Path path) throws IOException; + + /** + * Gets metadata for a path. Alternate method that includes a hint + * whether or not the MetadataStore should do work to compute the value for + * {@link PathMetadata#isEmptyDirectory()}. Since determining emptiness + * may be an expensive operation, this can save wasted work. + * + * @param path the path to get + * @param wantEmptyDirectoryFlag Set to true to give a hint to the + * MetadataStore that it should try to compute the empty directory flag. + * @return metadata for {@code path}, {@code null} if not found + * @throws IOException if there is an error + */ + PathMetadata get(Path path, boolean wantEmptyDirectoryFlag) + throws IOException; + + /** + * Lists metadata for all direct children of a path. + * + * @param path the path to list + * @return metadata for all direct children of {@code path} which are being + * tracked by the MetadataStore, or {@code null} if the path was not found + * in the MetadataStore. + * @throws IOException if there is an error + */ + DirListingMetadata listChildren(Path path) throws IOException; + + /** + * Record the effects of a {@link FileSystem#rename(Path, Path)} in the + * MetadataStore. Clients provide explicit enumeration of the affected + * paths (recursively), before and after the rename. + * + * This operation is not atomic, unless specific implementations claim + * otherwise. + * + * On the need to provide an enumeration of directory trees instead of just + * source and destination paths: + * Since a MetadataStore does not have to track all metadata for the + * underlying storage system, and a new MetadataStore may be created on an + * existing underlying filesystem, this move() may be the first time the + * MetadataStore sees the affected paths. Therefore, simply providing src + * and destination paths may not be enough to record the deletions (under + * src path) and creations (at destination) that are happening during the + * rename(). + * + * @param pathsToDelete Collection of all paths that were removed from the + * source directory tree of the move. + * @param pathsToCreate Collection of all PathMetadata for the new paths + * that were created at the destination of the rename + * (). + * @throws IOException if there is an error + */ + void move(Collection<Path> pathsToDelete, + Collection<PathMetadata> pathsToCreate) throws IOException; + + /** + * Saves metadata for exactly one path. + * + * Implementations may pre-create all the path's ancestors automatically. + * Implementations must update any {@code DirListingMetadata} objects which + * track the immediate parent of this file. 
+ * + * @param meta the metadata to save + * @throws IOException if there is an error + */ + void put(PathMetadata meta) throws IOException; + + /** + * Saves metadata for any number of paths. + * + * Semantics are otherwise the same as single-path puts. + * + * @param metas the metadata to save + * @throws IOException if there is an error + */ + void put(Collection<PathMetadata> metas) throws IOException; + + /** + * Save directory listing metadata. Callers may save a partial directory + * listing for a given path, or may store a complete and authoritative copy + * of the directory listing. {@code MetadataStore} implementations may + * subsequently keep track of all modifications to the directory contents at + * this path, and return authoritative results from subsequent calls to + * {@link #listChildren(Path)}. See {@link DirListingMetadata}. + * + * Any authoritative results returned are only authoritative for the scope + * of the {@code MetadataStore}: A per-process {@code MetadataStore}, for + * example, would only show results visible to that process, potentially + * missing metadata updates (create, delete) made to the same path by + * another process. + * + * @param meta Directory listing metadata. + * @throws IOException if there is an error + */ + void put(DirListingMetadata meta) throws IOException; + + /** + * Destroy all resources associated with the metadata store. + * + * The destroyed resources can be DynamoDB tables, MySQL databases/tables, or + * HDFS directories. Any operations after calling this method may possibly + * fail. + * + * This operation is idempotent. + * + * @throws IOException if there is an error + */ + void destroy() throws IOException; + + /** + * Clear any metadata older than a specified time from the repository. + * Implementations MUST clear file metadata, and MAY clear directory metadata + * (s3a itself does not track modification time for directories). + * Implementations may also choose to throw UnsupportedOperationException + * instead. Note that modification times should be in UTC, as returned by + * System.currentTimeMillis at the time of modification. + * + * @param modTime Oldest modification time to allow + * @throws IOException if there is an error + * @throws UnsupportedOperationException if not implemented + */ + void prune(long modTime) throws IOException, UnsupportedOperationException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java new file mode 100644 index 0000000..378d109 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +/** + * {@code MetadataStoreListFilesIterator} is a {@link RemoteIterator} that + * is similar to {@code DescendantsIterator} but does not return directories + * that have (or may have) children, and will also provide access to the set of + * tombstones to allow recently deleted S3 objects to be filtered out from a + * corresponding request. In other words, it returns tombstones and the same + * set of objects that should exist in S3: empty directories, and files, and not + * other directories whose existence is inferred therefrom. + * + * For example, assume the consistent store contains metadata representing this + * file system structure: + * + * <pre> + * /dir1 + * |-- dir2 + * | |-- file1 + * | `-- file2 + * `-- dir3 + * |-- dir4 + * | `-- file3 + * |-- dir5 + * | `-- file4 + * `-- dir6 + * </pre> + * + * Consider this code sample: + * <pre> + * final PathMetadata dir1 = get(new Path("/dir1")); + * for (MetadataStoreListFilesIterator files = + * new MetadataStoreListFilesIterator(dir1); files.hasNext(); ) { + * final FileStatus status = files.next().getFileStatus(); + * System.out.printf("%s %s%n", status.isDirectory() ? 
'D' : 'F', + * status.getPath()); + * } + * </pre> + * + * The output is: + * <pre> + * F /dir1/dir2/file1 + * F /dir1/dir2/file2 + * F /dir1/dir3/dir4/file3 + * F /dir1/dir3/dir5/file4 + * D /dir1/dir3/dir6 + * </pre> + */ [email protected] [email protected] +public class MetadataStoreListFilesIterator implements + RemoteIterator<FileStatus> { + public static final Logger LOG = LoggerFactory.getLogger( + MetadataStoreListFilesIterator.class); + + private final boolean allowAuthoritative; + private final MetadataStore metadataStore; + private final Set<Path> tombstones = new HashSet<>(); + private Iterator<FileStatus> leafNodesIterator = null; + + public MetadataStoreListFilesIterator(MetadataStore ms, PathMetadata meta, + boolean allowAuthoritative) throws IOException { + Preconditions.checkNotNull(ms); + this.metadataStore = ms; + this.allowAuthoritative = allowAuthoritative; + prefetch(meta); + } + + private void prefetch(PathMetadata meta) throws IOException { + final Queue<PathMetadata> queue = new LinkedList<>(); + final Collection<FileStatus> leafNodes = new ArrayList<>(); + + if (meta != null) { + final Path path = meta.getFileStatus().getPath(); + if (path.isRoot()) { + DirListingMetadata rootListing = metadataStore.listChildren(path); + if (rootListing != null) { + tombstones.addAll(rootListing.listTombstones()); + queue.addAll(rootListing.withoutTombstones().getListing()); + } + } else { + queue.add(meta); + } + } + + while(!queue.isEmpty()) { + PathMetadata nextMetadata = queue.poll(); + FileStatus nextStatus = nextMetadata.getFileStatus(); + if (nextStatus.isFile()) { + // All files are leaf nodes by definition + leafNodes.add(nextStatus); + continue; + } + if (nextStatus.isDirectory()) { + final Path path = nextStatus.getPath(); + DirListingMetadata children = metadataStore.listChildren(path); + if (children != null) { + tombstones.addAll(children.listTombstones()); + Collection<PathMetadata> liveChildren = + children.withoutTombstones().getListing(); + if (!liveChildren.isEmpty()) { + // If it's a directory, has children, not all deleted, then we + // add the children to the queue and move on to the next node + queue.addAll(liveChildren); + continue; + } else if (allowAuthoritative && children.isAuthoritative()) { + leafNodes.add(nextStatus); + } + } + } + // Directories that *might* be empty are ignored for now, since we + // cannot confirm that they are empty without incurring other costs. + // Users of this class can still discover empty directories via S3's + // fake directories, subject to the same consistency semantics as before. + // The only other possibility is a symlink, which is unsupported on S3A. 
+ } + leafNodesIterator = leafNodes.iterator(); + } + + @Override + public boolean hasNext() { + return leafNodesIterator.hasNext(); + } + + @Override + public FileStatus next() { + return leafNodesIterator.next(); + } + + public Set<Path> listTombstones() { + return tombstones; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java new file mode 100644 index 0000000..08ae89e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collection; + +/** + * A no-op implementation of MetadataStore. Clients that use this + * implementation should behave the same as they would without any + * MetadataStore. 
+ */ +public class NullMetadataStore implements MetadataStore { + + @Override + public void initialize(FileSystem fs) throws IOException { + } + + @Override + public void initialize(Configuration conf) throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public void delete(Path path) throws IOException { + } + + @Override + public void forgetMetadata(Path path) throws IOException { + } + + @Override + public void deleteSubtree(Path path) throws IOException { + } + + @Override + public PathMetadata get(Path path) throws IOException { + return null; + } + + @Override + public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag) + throws IOException { + return null; + } + + @Override + public DirListingMetadata listChildren(Path path) throws IOException { + return null; + } + + @Override + public void move(Collection<Path> pathsToDelete, + Collection<PathMetadata> pathsToCreate) throws IOException { + } + + @Override + public void put(PathMetadata meta) throws IOException { + } + + @Override + public void put(Collection<PathMetadata> meta) throws IOException { + } + + @Override + public void put(DirListingMetadata meta) throws IOException { + } + + @Override + public void destroy() throws IOException { + } + + @Override + public void prune(long modTime) { + } + + @Override + public String toString() { + return "NullMetadataStore"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java new file mode 100644 index 0000000..2a0219e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Tristate; + +/** + * {@code PathMetadata} models path metadata stored in the + * {@link MetadataStore}. + */ [email protected] [email protected] +public class PathMetadata { + + private final FileStatus fileStatus; + private Tristate isEmptyDirectory; + private boolean isDeleted; + + /** + * Create a tombstone from the current time. + * @param path path to tombstone + * @return the entry. 
*/ + public static PathMetadata tombstone(Path path) { + long now = System.currentTimeMillis(); + FileStatus status = new FileStatus(0, false, 0, 0, now, path); + return new PathMetadata(status, Tristate.UNKNOWN, true); + } + + /** + * Creates a new {@code PathMetadata} containing given {@code FileStatus}. + * @param fileStatus file status containing an absolute path. + */ + public PathMetadata(FileStatus fileStatus) { + this(fileStatus, Tristate.UNKNOWN); + } + + public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir) { + this(fileStatus, isEmptyDir, false); + } + + public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir, boolean + isDeleted) { + Preconditions.checkNotNull(fileStatus, "fileStatus must be non-null"); + Preconditions.checkNotNull(fileStatus.getPath(), "fileStatus path must be" + + " non-null"); + Preconditions.checkArgument(fileStatus.getPath().isAbsolute(), "path must" + + " be absolute"); + this.fileStatus = fileStatus; + this.isEmptyDirectory = isEmptyDir; + this.isDeleted = isDeleted; + } + + /** + * @return {@code FileStatus} contained in this {@code PathMetadata}. + */ + public final FileStatus getFileStatus() { + return fileStatus; + } + + /** + * Query if a directory is empty. + * @return Tristate.TRUE if this is known to be an empty directory, + * Tristate.FALSE if known to not be empty, and Tristate.UNKNOWN if the + * MetadataStore does not have enough information to determine either way. + */ + public Tristate isEmptyDirectory() { + return isEmptyDirectory; + } + + void setIsEmptyDirectory(Tristate isEmptyDirectory) { + this.isEmptyDirectory = isEmptyDirectory; + } + + public boolean isDeleted() { + return isDeleted; + } + + void setIsDeleted(boolean isDeleted) { + this.isDeleted = isDeleted; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof PathMetadata)) { + return false; + } + return this.fileStatus.equals(((PathMetadata)o).fileStatus); + } + + @Override + public int hashCode() { + return fileStatus.hashCode(); + } + + @Override + public String toString() { + return "PathMetadata{" + + "fileStatus=" + fileStatus + + "; isEmptyDirectory=" + isEmptyDirectory + + "; isDeleted=" + isDeleted + + '}'; + } + + /** + * Log contents to supplied StringBuilder in a pretty fashion. + * @param sb target StringBuilder + */ + public void prettyPrint(StringBuilder sb) { + sb.append(String.format("%-5s %-20s %-7d %-8s %-6s", + fileStatus.isDirectory() ? "dir" : "file", + fileStatus.getPath().toString(), fileStatus.getLen(), + isEmptyDirectory.name(), isDeleted)); + sb.append(fileStatus); + } + + public String prettyPrint() { + StringBuilder sb = new StringBuilder(); + prettyPrint(sb); + return sb.toString(); + } +}
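To make the new classes easier to follow, a few illustrative sketches follow; none of them are part of the patch, and all class names, bucket names and numeric values in them are invented for illustration.

First, the deleteHashByAncestor() helper in LocalMetadataStore keys off a simple string-prefix test (a directory does not count as its own ancestor). A tiny standalone sketch of that rule:

  import org.apache.hadoop.fs.Path;

  public class AncestorSketch {
    // Mirrors the private isAncestorOf() check: append "/" to the ancestor
    // (unless it is the root) and test for a string prefix match.
    static boolean isAncestorOf(Path ancestor, Path f) {
      String aStr = ancestor.toString();
      if (!ancestor.isRoot()) {
        aStr += "/";
      }
      return f.toString().startsWith(aStr);
    }

    public static void main(String[] args) {
      Path dir = new Path("s3a://example-bucket/dir1");
      System.out.println(isAncestorOf(dir, new Path("s3a://example-bucket/dir1/file1")));  // true
      System.out.println(isAncestorOf(dir, new Path("s3a://example-bucket/dir10/file1"))); // false
      System.out.println(isAncestorOf(dir, dir));                                          // false
    }
  }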
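A sketch of the LruHashMap eviction behaviour (capacity values are made up): removeEldestEntry() is consulted after every insertion, and mruGet() refreshes recency by removing and re-inserting the entry, so a plain get() does not protect an entry from eviction.

  import org.apache.hadoop.fs.s3a.s3guard.LruHashMap;

  public class LruHashMapSketch {
    public static void main(String[] args) {
      // initial capacity hint 4, maximum size 2 (illustrative values)
      LruHashMap<String, String> cache = new LruHashMap<>(4, 2);
      cache.put("a", "1");
      cache.put("b", "2");

      cache.mruGet("a");    // re-inserts "a", leaving "b" as the eldest entry
      cache.put("c", "3");  // size exceeds maxSize, so the eldest entry ("b") is evicted

      System.out.println(cache.containsKey("a")); // true
      System.out.println(cache.containsKey("b")); // false
      System.out.println(cache.containsKey("c")); // true
    }
  }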
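A minimal sketch of the MetadataStore contract as a caller sees it. It uses NullMetadataStore, so every call is a no-op and get() returns null; a real store would persist and return the entries. The bucket name, file length, block size and modification time are made-up values.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
  import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
  import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;

  public class MetadataStoreSketch {
    public static void main(String[] args) throws Exception {
      MetadataStore ms = new NullMetadataStore();
      ms.initialize(new Configuration());   // one-time setup from configuration

      // Paths must be absolute; length, block size and mtime are example values.
      Path file = new Path("s3a://example-bucket/dir1/file1");
      FileStatus status =
          new FileStatus(1024, false, 1, 512, System.currentTimeMillis(), file);
      ms.put(new PathMetadata(status));

      // With a real store this returns the entry put above; here it is null.
      PathMetadata meta = ms.get(file);
      System.out.println(meta);

      // delete() leaves a tombstone so stale S3 listings can be masked;
      // forgetMetadata() would remove the record without a tombstone.
      ms.delete(file);

      // Drop file entries last modified more than one day ago.
      ms.prune(System.currentTimeMillis() - 24L * 60 * 60 * 1000);

      ms.close();
    }
  }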
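A sketch of driving MetadataStoreListFilesIterator, assuming ms is an initialized MetadataStore that already holds entries under /dir1; the helper class and bucket name are invented for illustration.

  import java.io.IOException;
  import java.util.Set;

  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
  import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
  import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;

  public class ListFilesSketch {
    static Set<Path> printLeaves(MetadataStore ms) throws IOException {
      PathMetadata dir1 = ms.get(new Path("s3a://example-bucket/dir1"));
      MetadataStoreListFilesIterator files =
          new MetadataStoreListFilesIterator(ms, dir1, false /* allowAuthoritative */);
      while (files.hasNext()) {
        FileStatus status = files.next();
        System.out.printf("%s %s%n", status.isDirectory() ? "D" : "F",
            status.getPath());
      }
      // Paths deleted in the store but possibly still visible in raw S3 listings.
      return files.listTombstones();
    }
  }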
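Finally, a short sketch of the PathMetadata model, contrasting a live entry with a tombstone (values are illustrative only):

  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;

  public class PathMetadataSketch {
    public static void main(String[] args) {
      Path file = new Path("s3a://example-bucket/dir1/file1");

      // Live entry: emptiness stays UNKNOWN unless the store computes it.
      PathMetadata live = new PathMetadata(
          new FileStatus(1024, false, 1, 512, System.currentTimeMillis(), file));
      System.out.println(live.isDeleted());        // false
      System.out.println(live.isEmptyDirectory()); // UNKNOWN

      // Tombstone: marked deleted so listings can hide the stale S3 object.
      PathMetadata gone = PathMetadata.tombstone(file);
      System.out.println(gone.isDeleted());        // true
    }
  }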
