http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 3fbdcb0..f846689 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -79,6 +79,9 @@ class S3ABlockOutputStream extends OutputStream { /** Size of all blocks. */ private final int blockSize; + /** Total bytes for uploads submitted so far. */ + private long bytesSubmitted; + /** Callback for progress. */ private final ProgressListener progressListener; private final ListeningExecutorService executorService; @@ -302,6 +305,7 @@ class S3ABlockOutputStream extends OutputStream { } try { multiPartUpload.uploadBlockAsync(getActiveBlock()); + bytesSubmitted += getActiveBlock().dataSize(); } finally { // set the block to null, so the next write will create a new block. clearActiveBlock(); @@ -330,13 +334,14 @@ class S3ABlockOutputStream extends OutputStream { this, blockCount, hasBlock ? block : "(none)"); + long bytes = 0; try { if (multiPartUpload == null) { if (hasBlock) { // no uploads of data have taken place, put the single block up. // This must happen even if there is no data, so that 0 byte files // are created. - putObject(); + bytes = putObject(); } } else { // there has already been at least one block scheduled for upload; @@ -350,6 +355,7 @@ class S3ABlockOutputStream extends OutputStream { multiPartUpload.waitForAllPartUploads(); // then complete the operation multiPartUpload.complete(partETags); + bytes = bytesSubmitted; } LOG.debug("Upload complete for {}", writeOperationHelper); } catch (IOException ioe) { @@ -362,7 +368,7 @@ class S3ABlockOutputStream extends OutputStream { clearActiveBlock(); } // All end of write operations, including deleting fake parent directories - writeOperationHelper.writeSuccessful(); + writeOperationHelper.writeSuccessful(bytes); } /** @@ -370,8 +376,11 @@ class S3ABlockOutputStream extends OutputStream { * is empty a 0-byte PUT will be invoked, as it is needed to create an * entry at the far end. * @throws IOException any problem. + * @return number of bytes uploaded. If thread was interrupted while + * waiting for upload to complete, returns zero with interrupted flag set + * on this thread. */ - private void putObject() throws IOException { + private int putObject() throws IOException { LOG.debug("Executing regular upload for {}", writeOperationHelper); final S3ADataBlocks.DataBlock block = getActiveBlock(); @@ -405,9 +414,11 @@ class S3ABlockOutputStream extends OutputStream { //wait for completion try { putObjectResult.get(); + return size; } catch (InterruptedException ie) { LOG.warn("Interrupted object upload", ie); Thread.currentThread().interrupt(); + return 0; } catch (ExecutionException ee) { throw extractException("regular upload", key, ee); }
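Note on the S3ABlockOutputStream change above: putObject() now reports the size of a single-block PUT (zero if interrupted), while multipart uploads accumulate bytesSubmitted as each block is queued, and close() forwards whichever total applies to writeSuccessful(long). A minimal sketch of that accounting pattern, with invented names (BlockSink, WriteListener) rather than the actual S3ABlockOutputStream types:

import java.util.concurrent.atomic.AtomicLong;

class BlockSink {
  /** Stand-in for WriteOperationHelper.writeSuccessful(long) in the patch. */
  interface WriteListener {
    void writeSuccessful(long bytes);
  }

  private final AtomicLong bytesSubmitted = new AtomicLong();
  private final WriteListener listener;
  private boolean multipart;

  BlockSink(WriteListener listener) {
    this.listener = listener;
  }

  /** A filled block was handed to the asynchronous multipart uploader. */
  void blockSubmitted(long blockSize) {
    multipart = true;
    bytesSubmitted.addAndGet(blockSize);
  }

  /** Close: single PUT of the active block, or completion of all parts. */
  void close(long activeBlockSize) {
    // single-block streams report the PUT size (may be 0 for empty files);
    // multipart streams report the running total from blockSubmitted()
    long bytes = multipart ? bytesSubmitted.get() : activeBlockSize;
    listener.writeSuccessful(bytes);
  }
}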
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java index b0f08e3..be08afe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path; @InterfaceAudience.Private @InterfaceStability.Evolving public class S3AFileStatus extends FileStatus { - private boolean isEmptyDirectory; + private Tristate isEmptyDirectory; /** * Create a directory status. @@ -42,6 +42,18 @@ public class S3AFileStatus extends FileStatus { public S3AFileStatus(boolean isemptydir, Path path, String owner) { + this(Tristate.fromBool(isemptydir), path, owner); + } + + /** + * Create a directory status. + * @param isemptydir is this an empty directory? + * @param path the path + * @param owner the owner + */ + public S3AFileStatus(Tristate isemptydir, + Path path, + String owner) { super(0, true, 1, 0, 0, path); isEmptyDirectory = isemptydir; setOwner(owner); @@ -59,12 +71,37 @@ public class S3AFileStatus extends FileStatus { public S3AFileStatus(long length, long modification_time, Path path, long blockSize, String owner) { super(length, false, 1, blockSize, modification_time, path); - isEmptyDirectory = false; + isEmptyDirectory = Tristate.FALSE; setOwner(owner); setGroup(owner); } - public boolean isEmptyDirectory() { + /** + * Convenience factory method for creating an S3AFileStatus from a vanilla + * FileStatus plus an isEmptyDirectory flag. + * @param source FileStatus to convert to S3AFileStatus + * @param isEmptyDirectory TRUE/FALSE if known to be / not be an empty + * directory, UNKNOWN if that information was not computed. + * @return a new S3AFileStatus + */ + public static S3AFileStatus fromFileStatus(FileStatus source, + Tristate isEmptyDirectory) { + if (source.isDirectory()) { + return new S3AFileStatus(isEmptyDirectory, source.getPath(), + source.getOwner()); + } else { + return new S3AFileStatus(source.getLen(), source.getModificationTime(), + source.getPath(), source.getBlockSize(), source.getOwner()); + } + } + + + /** + * @return FALSE if status is not a directory, or it's a dir but known + * not to be empty. TRUE if it is an empty directory. UNKNOWN if it is a + * directory, but we have not computed whether or not it is empty.
+ */ + public Tristate isEmptyDirectory() { return isEmptyDirectory; } @@ -110,7 +147,7 @@ public class S3AFileStatus extends FileStatus { @Override public String toString() { return super.toString() + - String.format(" isEmptyDirectory=%s", isEmptyDirectory()); + String.format(" isEmptyDirectory=%s", isEmptyDirectory().name()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 872dd5f..c22383a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -25,12 +25,16 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.net.URI; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; +import java.util.Set; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -92,6 +96,11 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; +import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator; +import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; +import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; +import org.apache.hadoop.fs.s3a.s3guard.S3Guard; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; @@ -149,6 +158,8 @@ public class S3AFileSystem extends FileSystem { private long readAhead; private S3AInputPolicy inputPolicy; private final AtomicBoolean closed = new AtomicBoolean(false); + private MetadataStore metadataStore; + private boolean allowAuthoritative; // The maximum number of entries that can be deleted in any call to s3 private static final int MAX_ENTRIES_TO_DELETE = 1000; @@ -277,6 +288,10 @@ public class S3AFileSystem extends FileSystem { } else { LOG.debug("Using S3AOutputStream"); } + + metadataStore = S3Guard.getMetadataStore(this); + allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE, + DEFAULT_METADATASTORE_AUTHORITATIVE); } catch (AmazonClientException e) { throw translateException("initializing ", new Path(name), e); } @@ -388,12 +403,35 @@ public class S3AFileSystem extends FileSystem { * Returns the S3 client used by this filesystem. * @return AmazonS3Client */ - @VisibleForTesting AmazonS3 getAmazonS3Client() { return s3; } /** + * Get the region of a bucket. + * @return the region in which a bucket is located + * @throws IOException on any failure. + */ + public String getBucketLocation() throws IOException { + return getBucketLocation(bucket); + } + + /** + * Get the region of a bucket. 
+ * @param bucketName the name of the bucket + * @return the region in which a bucket is located + * @throws IOException on any failure. + */ + public String getBucketLocation(String bucketName) throws IOException { + try { + return s3.getBucketLocation(bucketName); + } catch (AmazonClientException e) { + throw translateException("getBucketLocation()", + bucketName, e); + } + } + + /** * Returns the read ahead range value used by this filesystem * @return */ @@ -457,7 +495,7 @@ public class S3AFileSystem extends FileSystem { * @return a key excluding the leading "/", or, if it is the root path, "" */ @VisibleForTesting - String pathToKey(Path path) { + public String pathToKey(Path path) { if (!path.isAbsolute()) { path = new Path(workingDir, path); } @@ -508,7 +546,7 @@ public class S3AFileSystem extends FileSystem { * @param path path to qualify * @return a qualified path. */ - Path qualify(Path path) { + public Path qualify(Path path) { return path.makeQualified(uri, workingDir); } @@ -578,7 +616,7 @@ public class S3AFileSystem extends FileSystem { boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { String key = pathToKey(f); - S3AFileStatus status = null; + FileStatus status = null; try { // get the status or throw an FNFE status = getFileStatus(f); @@ -706,8 +744,8 @@ public class S3AFileSystem extends FileSystem { * the description of the operation. * This operation throws an exception on any failure which needs to be * reported and downgraded to a failure. That is: if a rename - * @param src path to be renamed - * @param dst new path after rename + * @param source path to be renamed + * @param dest new path after rename * @throws RenameFailedException if some criteria for a state changing * rename was not met. This means work didn't happen; it's not something * which is reported upstream to the FileSystem APIs, for which the semantics @@ -716,9 +754,12 @@ public class S3AFileSystem extends FileSystem { * @throws IOException on IO failure. * @throws AmazonClientException on failures inside the AWS SDK */ - private boolean innerRename(Path src, Path dst) + private boolean innerRename(Path source, Path dest) throws RenameFailedException, FileNotFoundException, IOException, AmazonClientException { + Path src = qualify(source); + Path dst = qualify(dest); + LOG.debug("Rename path {} to {}", src, dst); incrementStatistic(INVOCATION_RENAME); @@ -734,7 +775,7 @@ public class S3AFileSystem extends FileSystem { // get the source file status; this raises a FNFE if there is no source // file. - S3AFileStatus srcStatus = getFileStatus(src); + S3AFileStatus srcStatus = innerGetFileStatus(src, true); if (srcKey.equals(dstKey)) { LOG.debug("rename: src and dest refer to the same file or directory: {}", @@ -746,7 +787,7 @@ public class S3AFileSystem extends FileSystem { S3AFileStatus dstStatus = null; try { - dstStatus = getFileStatus(dst); + dstStatus = innerGetFileStatus(dst, true); // if there is no destination entry, an exception is raised. 
// hence this code sequence can assume that there is something // at the end of the path; the only detail being what it is and @@ -756,7 +797,7 @@ public class S3AFileSystem extends FileSystem { throw new RenameFailedException(src, dst, "source is a directory and dest is a file") .withExitCode(srcStatus.isFile()); - } else if (!dstStatus.isEmptyDirectory()) { + } else if (dstStatus.isEmptyDirectory() != Tristate.TRUE) { throw new RenameFailedException(src, dst, "Destination is a non-empty directory") .withExitCode(false); @@ -778,7 +819,8 @@ public class S3AFileSystem extends FileSystem { Path parent = dst.getParent(); if (!pathToKey(parent).isEmpty()) { try { - S3AFileStatus dstParentStatus = getFileStatus(dst.getParent()); + S3AFileStatus dstParentStatus = innerGetFileStatus(dst.getParent(), + false); if (!dstParentStatus.isDirectory()) { throw new RenameFailedException(src, dst, "destination parent is not a directory"); @@ -790,9 +832,20 @@ public class S3AFileSystem extends FileSystem { } } + // If we have a MetadataStore, track deletions/creations. + Collection<Path> srcPaths = null; + List<PathMetadata> dstMetas = null; + if (hasMetadataStore()) { + srcPaths = new HashSet<>(); // srcPaths need fast look up before put + dstMetas = new ArrayList<>(); + } + // TODO S3Guard HADOOP-13761: retries when source paths are not visible yet + // TODO S3Guard: performance: mark destination dirs as authoritative + // Ok! Time to start if (srcStatus.isFile()) { LOG.debug("rename: renaming file {} to {}", src, dst); + long length = srcStatus.getLen(); if (dstStatus != null && dstStatus.isDirectory()) { String newDstKey = dstKey; if (!newDstKey.endsWith("/")) { @@ -801,9 +854,14 @@ public class S3AFileSystem extends FileSystem { String filename = srcKey.substring(pathToKey(src.getParent()).length()+1); newDstKey = newDstKey + filename; - copyFile(srcKey, newDstKey, srcStatus.getLen()); + copyFile(srcKey, newDstKey, length); + S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src, + keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst), + username); } else { copyFile(srcKey, dstKey, srcStatus.getLen()); + S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src, dst, + length, getDefaultBlockSize(dst), username); } innerDelete(srcStatus, false); } else { @@ -825,42 +883,66 @@ public class S3AFileSystem extends FileSystem { } List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(); - if (dstStatus != null && dstStatus.isEmptyDirectory()) { + if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) { // delete unnecessary fake directory. 
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey)); } - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(srcKey); - request.setMaxKeys(maxKeys); - - ObjectListing objects = listObjects(request); - - while (true) { - for (S3ObjectSummary summary : objects.getObjectSummaries()) { - keysToDelete.add( - new DeleteObjectsRequest.KeyVersion(summary.getKey())); - String newDstKey = - dstKey + summary.getKey().substring(srcKey.length()); - copyFile(summary.getKey(), newDstKey, summary.getSize()); - - if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { - removeKeys(keysToDelete, true, false); + Path parentPath = keyToPath(srcKey); + RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories( + parentPath, true); + while (iterator.hasNext()) { + LocatedFileStatus status = iterator.next(); + long length = status.getLen(); + String key = pathToKey(status.getPath()); + if (status.isDirectory() && !key.endsWith("/")) { + key += "/"; + } + keysToDelete + .add(new DeleteObjectsRequest.KeyVersion(key)); + String newDstKey = + dstKey + key.substring(srcKey.length()); + copyFile(key, newDstKey, length); + + if (hasMetadataStore()) { + // with a metadata store, the object entries need to be updated, + // including, potentially, the ancestors + Path childSrc = keyToQualifiedPath(key); + Path childDst = keyToQualifiedPath(newDstKey); + if (objectRepresentsDirectory(key, length)) { + S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc, + childDst, username); + } else { + S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc, + childDst, length, getDefaultBlockSize(childDst), username); } + // Ancestor directories may not be listed, so we explicitly add them + S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas, + keyToQualifiedPath(srcKey), childSrc, childDst, username); } - if (objects.isTruncated()) { - objects = continueListObjects(objects); - } else { - if (!keysToDelete.isEmpty()) { - removeKeys(keysToDelete, false, false); - } - break; + if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { + removeKeys(keysToDelete, true, false); } } + if (!keysToDelete.isEmpty()) { + removeKeys(keysToDelete, false, false); + } + + // We moved all the children, now move the top-level dir + // Empty directory should have been added as the object summary + if (hasMetadataStore() + && srcPaths != null + && !srcPaths.contains(src)) { + LOG.debug("To move the non-empty top-level dir src={} and dst={}", + src, dst); + S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, src, dst, + username); + } } + metadataStore.move(srcPaths, dstMetas); + if (src.getParent() != dst.getParent()) { deleteUnnecessaryFakeDirectories(dst.getParent()); createFakeDirectoryIfNecessary(src.getParent()); @@ -880,6 +962,31 @@ public class S3AFileSystem extends FileSystem { } /** + * Does this Filesystem have a metadata store? + * @return true iff the FS has been instantiated with a metadata store + */ + public boolean hasMetadataStore() { + return !S3Guard.isNullMetadataStore(metadataStore); + } + + /** + * Get the metadata store. + * This will always be non-null, but may be bound to the + * {@code NullMetadataStore}. + * @return the metadata store of this FS instance + */ + @VisibleForTesting + MetadataStore getMetadataStore() { + return metadataStore; + } + + /** For testing only. See ITestS3GuardEmptyDirs. */ + @VisibleForTesting + void setMetadataStore(MetadataStore ms) { + metadataStore = ms; + } + + /** * Increment a statistic by 1. 
* @param statistic The operation to increment */ @@ -1063,8 +1170,9 @@ public class S3AFileSystem extends FileSystem { * @param inputStream source data. * @return the request */ - private PutObjectRequest newPutObjectRequest(String key, - ObjectMetadata metadata, InputStream inputStream) { + PutObjectRequest newPutObjectRequest(String key, + ObjectMetadata metadata, + InputStream inputStream) { Preconditions.checkNotNull(inputStream); PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, inputStream, metadata); @@ -1115,7 +1223,7 @@ public class S3AFileSystem extends FileSystem { * @param putObjectRequest the request * @return the upload initiated */ - public Upload putObject(PutObjectRequest putObjectRequest) { + public UploadInfo putObject(PutObjectRequest putObjectRequest) { long len; if (putObjectRequest.getFile() != null) { len = putObjectRequest.getFile().length(); @@ -1126,7 +1234,7 @@ public class S3AFileSystem extends FileSystem { try { Upload upload = transfers.upload(putObjectRequest); incrementPutCompletedStatistics(true, len); - return upload; + return new UploadInfo(upload, len); } catch (AmazonClientException e) { incrementPutCompletedStatistics(false, len); throw e; @@ -1142,14 +1250,10 @@ public class S3AFileSystem extends FileSystem { * @return the upload initiated * @throws AmazonClientException on problems */ - public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest) + PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest) throws AmazonClientException { - long len; - if (putObjectRequest.getFile() != null) { - len = putObjectRequest.getFile().length(); - } else { - len = putObjectRequest.getMetadata().getContentLength(); - } + long len = getPutRequestLength(putObjectRequest); + LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey()); incrementPutStartStatistics(len); try { PutObjectResult result = s3.putObject(putObjectRequest); @@ -1162,6 +1266,23 @@ public class S3AFileSystem extends FileSystem { } /** + * Get the length of the PUT, verifying that the length is known. + * @param putObjectRequest a request bound to a file or a stream. + * @return the request length + * @throws IllegalArgumentException if the length is negative + */ + private long getPutRequestLength(PutObjectRequest putObjectRequest) { + long len; + if (putObjectRequest.getFile() != null) { + len = putObjectRequest.getFile().length(); + } else { + len = putObjectRequest.getMetadata().getContentLength(); + } + Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length"); + return len; + } + + /** * Upload part of a multi-partition file. * Increments the write and put counters. 
* <i>Important: this call does not close any input stream in the request.</i> @@ -1288,7 +1409,7 @@ public class S3AFileSystem extends FileSystem { */ public boolean delete(Path f, boolean recursive) throws IOException { try { - return innerDelete(getFileStatus(f), recursive); + return innerDelete(innerGetFileStatus(f, true), recursive); } catch (FileNotFoundException e) { LOG.debug("Couldn't delete {} - does not exist", f); instrumentation.errorIgnored(); @@ -1318,6 +1439,9 @@ public class S3AFileSystem extends FileSystem { if (status.isDirectory()) { LOG.debug("delete: Path is a directory: {}", f); + Preconditions.checkArgument( + status.isEmptyDirectory() != Tristate.UNKNOWN, + "File status must have directory emptiness computed"); if (!key.endsWith("/")) { key = key + "/"; @@ -1327,13 +1451,15 @@ public class S3AFileSystem extends FileSystem { return rejectRootDirectoryDelete(status, recursive); } - if (!recursive && !status.isEmptyDirectory()) { + if (!recursive && status.isEmptyDirectory() == Tristate.FALSE) { throw new PathIsNotEmptyDirectoryException(f.toString()); } - if (status.isEmptyDirectory()) { + if (status.isEmptyDirectory() == Tristate.TRUE) { LOG.debug("Deleting fake empty directory {}", key); + // HADOOP-13761 S3Guard: retries here deleteObject(key); + metadataStore.delete(f); instrumentation.directoryDeleted(); } else { LOG.debug("Getting objects for directory prefix {} to delete", key); @@ -1349,6 +1475,7 @@ public class S3AFileSystem extends FileSystem { LOG.debug("Got object to delete {}", summary.getKey()); if (keys.size() == MAX_ENTRIES_TO_DELETE) { + // TODO: HADOOP-13761 S3Guard: retries removeKeys(keys, true, false); } } @@ -1357,16 +1484,19 @@ public class S3AFileSystem extends FileSystem { objects = continueListObjects(objects); } else { if (!keys.isEmpty()) { + // TODO: HADOOP-13761 S3Guard: retries removeKeys(keys, false, false); } break; } } } + metadataStore.deleteSubtree(f); } else { LOG.debug("delete: Path is a file"); instrumentation.fileDeleted(1); deleteObject(key); + metadataStore.delete(f); } Path parent = f.getParent(); @@ -1390,7 +1520,7 @@ public class S3AFileSystem extends FileSystem { private boolean rejectRootDirectoryDelete(S3AFileStatus status, boolean recursive) throws IOException { LOG.info("s3a delete the {} root directory of {}", bucket, recursive); - boolean emptyRoot = status.isEmptyDirectory(); + boolean emptyRoot = status.isEmptyDirectory() == Tristate.TRUE; if (emptyRoot) { return true; } @@ -1405,7 +1535,7 @@ public class S3AFileSystem extends FileSystem { private void createFakeDirectoryIfNecessary(Path f) throws IOException, AmazonClientException { String key = pathToKey(f); - if (!key.isEmpty() && !exists(f)) { + if (!key.isEmpty() && !s3Exists(f)) { LOG.debug("Creating new fake directory at {}", f); createFakeDirectory(key); } @@ -1454,6 +1584,11 @@ public class S3AFileSystem extends FileSystem { key = key + '/'; } + DirListingMetadata dirMeta = metadataStore.listChildren(path); + if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) { + return S3Guard.dirMetaToStatuses(dirMeta); + } + ListObjectsRequest request = createListObjectsRequest(key, "/"); LOG.debug("listStatus: doing listObjects for directory {}", key); @@ -1466,7 +1601,8 @@ public class S3AFileSystem extends FileSystem { while (files.hasNext()) { result.add(files.next()); } - return result.toArray(new FileStatus[result.size()]); + return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta, + allowAuthoritative); } else { LOG.debug("Adding: 
rd (not a dir): {}", path); FileStatus[] stats = new FileStatus[1]; @@ -1482,7 +1618,8 @@ public class S3AFileSystem extends FileSystem { * @param delimiter any delimiter * @return the request */ - private ListObjectsRequest createListObjectsRequest(String key, + @VisibleForTesting + ListObjectsRequest createListObjectsRequest(String key, String delimiter) { ListObjectsRequest request = new ListObjectsRequest(); request.setBucketName(bucket); @@ -1541,23 +1678,30 @@ public class S3AFileSystem extends FileSystem { throw translateException("innerMkdirs", path, e); } } + /** * * Make the given path and all non-existent parents into * directories. * See {@link #mkdirs(Path, FsPermission)} - * @param f path to create + * @param p path to create * @param permission to apply to f - * @return true if a directory was created + * @return true if a directory was created or already existed * @throws FileAlreadyExistsException there is a file at the path specified * @throws IOException other IO problems * @throws AmazonClientException on failures inside the AWS SDK */ - private boolean innerMkdirs(Path f, FsPermission permission) + private boolean innerMkdirs(Path p, FsPermission permission) throws IOException, FileAlreadyExistsException, AmazonClientException { + Path f = qualify(p); LOG.debug("Making directory: {}", f); incrementStatistic(INVOCATION_MKDIRS); FileStatus fileStatus; + List<Path> metadataStoreDirs = null; + if (hasMetadataStore()) { + metadataStoreDirs = new ArrayList<>(); + } + try { fileStatus = getFileStatus(f); @@ -1567,8 +1711,12 @@ public class S3AFileSystem extends FileSystem { throw new FileAlreadyExistsException("Path is a file: " + f); } } catch (FileNotFoundException e) { + // Walk path to root, ensuring closest ancestor is a directory, not file Path fPart = f.getParent(); - do { + if (metadataStoreDirs != null) { + metadataStoreDirs.add(f); + } + while (fPart != null) { try { fileStatus = getFileStatus(fPart); if (fileStatus.isDirectory()) { @@ -1581,12 +1729,17 @@ public class S3AFileSystem extends FileSystem { } } catch (FileNotFoundException fnfe) { instrumentation.errorIgnored(); + // We create all missing directories in MetadataStore; it does not + // infer directories exist by prefix like S3. + if (metadataStoreDirs != null) { + metadataStoreDirs.add(fPart); + } } fPart = fPart.getParent(); - } while (fPart != null); - + } String key = pathToKey(f); createFakeDirectory(key); + S3Guard.makeDirsOrdered(metadataStore, metadataStoreDirs, username, true); // this is complicated because getParent(a/b/c/) returns a/b/c, but // we want a/b. See HADOOP-14428 for more details. deleteUnnecessaryFakeDirectories(new Path(f.toString()).getParent()); @@ -1598,21 +1751,93 @@ public class S3AFileSystem extends FileSystem { * Return a file status object that represents the path. * @param f The path we want information from * @return a FileStatus object - * @throws java.io.FileNotFoundException when the path does not exist; + * @throws FileNotFoundException when the path does not exist * @throws IOException on other problems. */ - public S3AFileStatus getFileStatus(final Path f) throws IOException { + public FileStatus getFileStatus(final Path f) throws IOException { + return innerGetFileStatus(f, false); + } + + /** + * Internal version of {@link #getFileStatus(Path)}. 
+ * @param f The path we want information from + * @param needEmptyDirectoryFlag if true, implementation will calculate + * a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()} + * @return an S3AFileStatus object + * @throws FileNotFoundException when the path does not exist + * @throws IOException on other problems. + */ + @VisibleForTesting + S3AFileStatus innerGetFileStatus(final Path f, + boolean needEmptyDirectoryFlag) throws IOException { incrementStatistic(INVOCATION_GET_FILE_STATUS); final Path path = qualify(f); String key = pathToKey(path); - LOG.debug("Getting path status for {} ({})", path , key); + LOG.debug("Getting path status for {} ({})", path, key); + + // Check MetadataStore, if any. + PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag); + Set<Path> tombstones = Collections.EMPTY_SET; + if (pm != null) { + if (pm.isDeleted()) { + throw new FileNotFoundException("Path " + f + " is recorded as " + + "deleted by S3Guard"); + } + + FileStatus msStatus = pm.getFileStatus(); + if (needEmptyDirectoryFlag && msStatus.isDirectory()) { + if (pm.isEmptyDirectory() != Tristate.UNKNOWN) { + // We have a definitive true / false from MetadataStore, we are done. + return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory()); + } else { + DirListingMetadata children = metadataStore.listChildren(path); + if (children != null) { + tombstones = children.listTombstones(); + } + LOG.debug("MetadataStore doesn't know if dir is empty, using S3."); + } + } else { + // Either this is not a directory, or we don't care if it is empty + return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory()); + } + + // If the metadata store has no children for it and it's not listed in + // S3 yet, we assume it is an empty directory. + S3AFileStatus s3FileStatus; + try { + s3FileStatus = s3GetFileStatus(path, key, tombstones); + } catch (FileNotFoundException e) { + return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE); + } + // entry was found, save in S3Guard + return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation); + } else { + // there was no entry in S3Guard + // retrieve the data and update the metadata store in the process. + return S3Guard.putAndReturn(metadataStore, + s3GetFileStatus(path, key, tombstones), instrumentation); + } + } + + /** + * Raw {@code getFileStatus} that talks directly to S3. + * Used to implement {@link #innerGetFileStatus(Path, boolean)}, + * and for direct management of empty directory blobs. + * @param path Qualified path + * @param key Key string for the path + * @param tombstones tombstone markers to exclude, or null if not applicable + * @return Status + * @throws FileNotFoundException when the path does not exist + * @throws IOException on other problems.
+ */ + private S3AFileStatus s3GetFileStatus(final Path path, String key, + Set<Path> tombstones) throws IOException { if (!key.isEmpty()) { try { ObjectMetadata meta = getObjectMetadata(key); if (objectRepresentsDirectory(key, meta.getContentLength())) { LOG.debug("Found exact file: fake directory"); - return new S3AFileStatus(true, path, username); + return new S3AFileStatus(Tristate.TRUE, path, username); } else { LOG.debug("Found exact file: normal file"); return new S3AFileStatus(meta.getContentLength(), @@ -1637,16 +1862,16 @@ public class S3AFileSystem extends FileSystem { if (objectRepresentsDirectory(newKey, meta.getContentLength())) { LOG.debug("Found file (with /): fake directory"); - return new S3AFileStatus(true, path, username); + return new S3AFileStatus(Tristate.TRUE, path, username); } else { LOG.warn("Found file (with /): real file? should not happen: {}", key); return new S3AFileStatus(meta.getContentLength(), - dateToLong(meta.getLastModified()), - path, - getDefaultBlockSize(path), - username); + dateToLong(meta.getLastModified()), + path, + getDefaultBlockSize(path), + username); } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { @@ -1668,25 +1893,26 @@ public class S3AFileSystem extends FileSystem { ObjectListing objects = listObjects(request); - if (!objects.getCommonPrefixes().isEmpty() - || !objects.getObjectSummaries().isEmpty()) { + Collection<String> prefixes = objects.getCommonPrefixes(); + Collection<S3ObjectSummary> summaries = objects.getObjectSummaries(); + if (!isEmptyOfKeys(prefixes, tombstones) || + !isEmptyOfObjects(summaries, tombstones)) { if (LOG.isDebugEnabled()) { LOG.debug("Found path as directory (with /): {}/{}", - objects.getCommonPrefixes().size() , - objects.getObjectSummaries().size()); + prefixes.size(), summaries.size()); - for (S3ObjectSummary summary : objects.getObjectSummaries()) { + for (S3ObjectSummary summary : summaries) { LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize()); } - for (String prefix : objects.getCommonPrefixes()) { + for (String prefix : prefixes) { LOG.debug("Prefix: {}", prefix); } } - return new S3AFileStatus(false, path, username); + return new S3AFileStatus(Tristate.FALSE, path, username); } else if (key.isEmpty()) { LOG.debug("Found root directory"); - return new S3AFileStatus(true, path, username); + return new S3AFileStatus(Tristate.TRUE, path, username); } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { @@ -1701,6 +1927,64 @@ public class S3AFileSystem extends FileSystem { } /** + * Helper function to determine if a collection of paths is empty + * after accounting for tombstone markers (if provided). + * @param keys Collection of path (prefixes / directories or keys). + * @param tombstones Set of tombstone markers, or null if not applicable. + * @return false if summaries contains objects not accounted for by + * tombstones. + */ + private boolean isEmptyOfKeys(Collection<String> keys, Set<Path> + tombstones) { + if (tombstones == null) { + return keys.isEmpty(); + } + for (String key : keys) { + Path qualified = keyToQualifiedPath(key); + if (!tombstones.contains(qualified)) { + return false; + } + } + return true; + } + + /** + * Helper function to determine if a collection of object summaries is empty + * after accounting for tombstone markers (if provided). + * @param summaries Collection of objects as returned by listObjects. + * @param tombstones Set of tombstone markers, or null if not applicable. 
+ * @return false if summaries contains objects not accounted for by + * tombstones. + */ + private boolean isEmptyOfObjects(Collection<S3ObjectSummary> summaries, + Set<Path> tombstones) { + if (tombstones == null) { + return summaries.isEmpty(); + } + Collection<String> stringCollection = new ArrayList<>(summaries.size()); + for (S3ObjectSummary summary : summaries) { + stringCollection.add(summary.getKey()); + } + return isEmptyOfKeys(stringCollection, tombstones); + } + + /** + * Raw version of {@link FileSystem#exists(Path)} which uses S3 only: + * S3Guard MetadataStore, if any, will be skipped. + * @return true if path exists in S3 + */ + private boolean s3Exists(final Path f) throws IOException { + Path path = qualify(f); + String key = pathToKey(path); + try { + s3GetFileStatus(path, key, null); + return true; + } catch (FileNotFoundException e) { + return false; + } + } + + /** * The src file is on the local disk. Add it to FS at * the given dst name. * @@ -1777,12 +2061,13 @@ public class S3AFileSystem extends FileSystem { final String key = pathToKey(dst); final ObjectMetadata om = newObjectMetadata(srcfile.length()); PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile); - Upload up = putObject(putObjectRequest); + UploadInfo info = putObject(putObjectRequest); + Upload upload = info.getUpload(); ProgressableProgressListener listener = new ProgressableProgressListener( - this, key, up, null); - up.addProgressListener(listener); + this, key, upload, null); + upload.addProgressListener(listener); try { - up.waitForUploadResult(); + upload.waitForUploadResult(); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted copying " + src + " to " + dst + ", cancelling"); @@ -1790,7 +2075,7 @@ public class S3AFileSystem extends FileSystem { listener.uploadCompleted(); // This will delete unnecessary fake parent directories - finishedWrite(key); + finishedWrite(key, info.getLength()); if (delSrc) { local.delete(src, false); @@ -1814,6 +2099,10 @@ public class S3AFileSystem extends FileSystem { transfers.shutdownNow(true); transfers = null; } + if (metadataStore != null) { + metadataStore.close(); + metadataStore = null; + } } } @@ -1956,11 +2245,38 @@ public class S3AFileSystem extends FileSystem { /** * Perform post-write actions. + * This operation MUST be called after any PUT/multipart PUT completes + * successfully. 
+ * This includes + * <ol> + * <li>Calling {@link #deleteUnnecessaryFakeDirectories(Path)}</li> + * <li>Updating any metadata store with details on the newly created + * object.</li> + * </ol> * @param key key written to + * @param length total length of file written */ - public void finishedWrite(String key) { - LOG.debug("Finished write to {}", key); - deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); + @InterfaceAudience.Private + void finishedWrite(String key, long length) { + LOG.debug("Finished write to {}, len {}", key, length); + Path p = keyToQualifiedPath(key); + deleteUnnecessaryFakeDirectories(p.getParent()); + Preconditions.checkArgument(length >= 0, "content length is negative"); + + // See note about failure semantics in S3Guard documentation + try { + if (hasMetadataStore()) { + S3Guard.addAncestors(metadataStore, p, username); + S3AFileStatus status = createUploadFileStatus(p, + S3AUtils.objectRepresentsDirectory(key, length), length, + getDefaultBlockSize(p), username); + S3Guard.putAndReturn(metadataStore, status, instrumentation); + } + } catch (IOException e) { + LOG.error("S3Guard: Error updating MetadataStore for write to {}:", + key, e); + instrumentation.errorIgnored(); + } } /** @@ -2015,9 +2331,9 @@ public class S3AFileSystem extends FileSystem { PutObjectRequest putObjectRequest = newPutObjectRequest(objectName, newObjectMetadata(0L), im); - Upload upload = putObject(putObjectRequest); + UploadInfo info = putObject(putObjectRequest); try { - upload.waitForUploadResult(); + info.getUpload().waitForUploadResult(); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted creating " + objectName); } @@ -2123,6 +2439,8 @@ public class S3AFileSystem extends FileSystem { if (blockFactory != null) { sb.append(", blockFactory=").append(blockFactory); } + sb.append(", metastore=").append(metadataStore); + sb.append(", authoritative=").append(allowAuthoritative); sb.append(", boundedExecutor=").append(boundedThreadPool); sb.append(", unboundedExecutor=").append(unboundedThreadPool); sb.append(", statistics {") @@ -2241,6 +2559,18 @@ public class S3AFileSystem extends FileSystem { @Override public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws FileNotFoundException, IOException { + return innerListFiles(f, recursive, + new Listing.AcceptFilesOnly(qualify(f))); + } + + public RemoteIterator<LocatedFileStatus> listFilesAndEmptyDirectories(Path f, + boolean recursive) throws IOException { + return innerListFiles(f, recursive, + new Listing.AcceptAllButS3nDirs()); + } + + private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean + recursive, Listing.FileStatusAcceptor acceptor) throws IOException { incrementStatistic(INVOCATION_LIST_FILES); Path path = qualify(f); LOG.debug("listFiles({}, {})", path, recursive); @@ -2258,13 +2588,42 @@ public class S3AFileSystem extends FileSystem { String delimiter = recursive ? null : "/"; LOG.debug("Requesting all entries under {} with delimiter '{}'", key, delimiter); - return listing.createLocatedFileStatusIterator( - listing.createFileStatusListingIterator(path, - createListObjectsRequest(key, delimiter), - ACCEPT_ALL, - new Listing.AcceptFilesOnly(path))); + final RemoteIterator<FileStatus> cachedFilesIterator; + final Set<Path> tombstones; + if (recursive) { + final PathMetadata pm = metadataStore.get(path, true); + // shouldn't need to check pm.isDeleted() because that will have + // been caught by getFileStatus above. 
+ MetadataStoreListFilesIterator metadataStoreListFilesIterator = + new MetadataStoreListFilesIterator(metadataStore, pm, + allowAuthoritative); + tombstones = metadataStoreListFilesIterator.listTombstones(); + cachedFilesIterator = metadataStoreListFilesIterator; + } else { + DirListingMetadata meta = metadataStore.listChildren(path); + if (meta != null) { + tombstones = meta.listTombstones(); + } else { + tombstones = null; + } + cachedFilesIterator = listing.createProvidedFileStatusIterator( + S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor); + if (allowAuthoritative && meta != null && meta.isAuthoritative()) { + // metadata listing is authoritative, so return it directly + return listing.createLocatedFileStatusIterator(cachedFilesIterator); + } + } + return listing.createTombstoneReconcilingIterator( + listing.createLocatedFileStatusIterator( + listing.createFileStatusListingIterator(path, + createListObjectsRequest(key, delimiter), + ACCEPT_ALL, + acceptor, + cachedFilesIterator)), + tombstones); } } catch (AmazonClientException e) { + // TODO S3Guard: retry on file not found exception throw translateException("listFiles", path, e); } } @@ -2309,12 +2668,21 @@ public class S3AFileSystem extends FileSystem { filter.accept(path) ? toLocatedFileStatus(fileStatus) : null); } else { // directory: trigger a lookup - String key = maybeAddTrailingSlash(pathToKey(path)); - return listing.createLocatedFileStatusIterator( - listing.createFileStatusListingIterator(path, - createListObjectsRequest(key, "/"), - filter, - new Listing.AcceptAllButSelfAndS3nDirs(path))); + final String key = maybeAddTrailingSlash(pathToKey(path)); + final Listing.FileStatusAcceptor acceptor = + new Listing.AcceptAllButSelfAndS3nDirs(path); + DirListingMetadata meta = metadataStore.listChildren(path); + final RemoteIterator<FileStatus> cachedFileStatusIterator = + listing.createProvidedFileStatusIterator( + S3Guard.dirMetaToStatuses(meta), filter, acceptor); + return (allowAuthoritative && meta != null && meta.isAuthoritative()) + ? listing.createLocatedFileStatusIterator(cachedFileStatusIterator) + : listing.createLocatedFileStatusIterator( + listing.createFileStatusListingIterator(path, + createListObjectsRequest(key, "/"), + filter, + acceptor, + cachedFileStatusIterator)); } } catch (AmazonClientException e) { throw translateException("listLocatedStatus", path, e); @@ -2389,8 +2757,8 @@ public class S3AFileSystem extends FileSystem { /** * Callback on a successful write. 
*/ - void writeSuccessful() { - finishedWrite(key); + void writeSuccessful(long length) { + finishedWrite(key, length); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index d2e7a88..da1fc5a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.metrics2.MetricStringBuilder; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.Interns; @@ -30,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableMetric; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import java.io.Closeable; import java.net.URI; @@ -38,7 +40,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.fs.FileSystem.Statistics; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -90,6 +91,10 @@ public class S3AInstrumentation { private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>(30); + /** Instantiate this without caring whether or not S3Guard is enabled. */ + private final S3GuardInstrumentation s3GuardInstrumentation + = new S3GuardInstrumentation(); + private static final Statistic[] COUNTERS_TO_CREATE = { INVOCATION_COPY_FROM_LOCAL_FILE, INVOCATION_EXISTS, @@ -117,6 +122,8 @@ STREAM_WRITE_BLOCK_UPLOADS_ABORTED, STREAM_WRITE_TOTAL_TIME, STREAM_WRITE_TOTAL_DATA, + S3GUARD_METADATASTORE_PUT_PATH_REQUEST, + S3GUARD_METADATASTORE_INITIALIZATION }; @@ -171,6 +178,9 @@ for (Statistic statistic : GAUGES_TO_CREATE) { gauge(statistic.getSymbol(), statistic.getDescription()); } + // TODO: need a config for the quantiles interval? + quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY, + "ops", "latency", 1); } /** @@ -227,6 +237,22 @@ } /** + * Create a quantiles metric in the registry. + * @param op statistic to collect + * @param sampleName sample name of the quantiles + * @param valueName value name of the quantiles + * @param interval interval of the quantiles in seconds + * @return the created quantiles metric + */ + protected final MutableQuantiles quantiles(Statistic op, + String sampleName, + String valueName, + int interval) { + return registry.newQuantiles(op.getSymbol(), op.getDescription(), + sampleName, valueName, interval); + } + + /** * Get the metrics registry. * @return the registry */ @@ -311,6 +337,20 @@ } /** + * Look up a quantiles metric. + * @param name quantiles name + * @return the quantiles or null + * @throws ClassCastException if the metric is not a Quantiles.
+ */ + public MutableQuantiles lookupQuantiles(String name) { + MutableMetric metric = lookupMetric(name); + if (metric == null) { + LOG.debug("No quantiles {}", name); + } + return (MutableQuantiles) metric; + } + + /** * Look up a metric from both the registered set and the lighter weight * stream entries. * @param name metric name @@ -391,6 +431,21 @@ public class S3AInstrumentation { counter.incr(count); } } + + /** + * Add a value to a quantiles statistic. No-op if the quantile + * isn't found. + * @param op operation to look up. + * @param value value to add. + * @throws ClassCastException if the metric is not a Quantiles. + */ + public void addValueToQuantiles(Statistic op, long value) { + MutableQuantiles quantiles = lookupQuantiles(op.getSymbol()); + if (quantiles != null) { + quantiles.add(value); + } + } + /** * Increment a specific counter. * No-op if not defined. @@ -442,6 +497,15 @@ public class S3AInstrumentation { } /** + * Create a S3Guard instrumentation instance. + * There's likely to be at most one instance of this per FS instance. + * @return the S3Guard instrumentation point. + */ + public S3GuardInstrumentation getS3GuardInstrumentation() { + return s3GuardInstrumentation; + } + + /** * Merge in the statistics of a single input stream into * the filesystem-wide statistics. * @param statistics stream statistics @@ -840,4 +904,19 @@ public class S3AInstrumentation { return sb.toString(); } } + + /** + * Instrumentation exported to S3Guard. + */ + public final class S3GuardInstrumentation { + + /** Initialized event. */ + public void initialized() { + incrementCounter(S3GUARD_METADATASTORE_INITIALIZATION, 1); + } + + public void storeClosed() { + + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index 6ebc9e4..e723b75 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.transfer.Upload; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -101,19 +100,20 @@ public class S3AOutputStream extends OutputStream { try { final ObjectMetadata om = fs.newObjectMetadata(backupFile.length()); - Upload upload = fs.putObject( + UploadInfo info = fs.putObject( fs.newPutObjectRequest( key, om, backupFile)); ProgressableProgressListener listener = - new ProgressableProgressListener(fs, key, upload, progress); - upload.addProgressListener(listener); + new ProgressableProgressListener(fs, key, info.getUpload(), progress); + info.getUpload().addProgressListener(listener); - upload.waitForUploadResult(); + info.getUpload().waitForUploadResult(); listener.uploadCompleted(); - // This will delete unnecessary fake parent directories - fs.finishedWrite(key); + // This will delete unnecessary fake parent directories, update any + // MetadataStore + fs.finishedWrite(key, info.getLength()); } catch (InterruptedException 
e) { throw (InterruptedIOException) new InterruptedIOException(e.toString()) .initCause(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 27406b6..9dd5def 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -294,12 +294,38 @@ public final class S3AUtils { S3ObjectSummary summary, long blockSize, String owner) { - if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { - return new S3AFileStatus(true, keyPath, owner); + long size = summary.getSize(); + return createFileStatus(keyPath, + objectRepresentsDirectory(summary.getKey(), size), + size, summary.getLastModified(), blockSize, owner); + } + + /** + * Create a file status for object we just uploaded. For files, we use + * current time as modification time, since s3a uses S3's service-based + * modification time, which will not be available until we do a + * getFileStatus() later on. + * @param keyPath path for created object + * @param isDir true iff directory + * @param size file length + * @param blockSize block size for file status + * @param owner Hadoop username + * @return a status entry + */ + public static S3AFileStatus createUploadFileStatus(Path keyPath, + boolean isDir, long size, long blockSize, String owner) { + Date date = isDir ? null : new Date(); + return createFileStatus(keyPath, isDir, size, date, blockSize, owner); + } + + /* Date 'modified' is ignored when isDir is true. 
*/ + private static S3AFileStatus createFileStatus(Path keyPath, boolean isDir, + long size, Date modified, long blockSize, String owner) { + if (isDir) { + return new S3AFileStatus(Tristate.UNKNOWN, keyPath, owner); } else { - return new S3AFileStatus(summary.getSize(), - dateToLong(summary.getLastModified()), keyPath, - blockSize, owner); + return new S3AFileStatus(size, dateToLong(modified), keyPath, blockSize, + owner); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index d4e09e3..e7603d9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -18,33 +18,20 @@ package org.apache.hadoop.fs.s3a; -import static org.apache.hadoop.fs.s3a.Constants.*; -import static org.apache.hadoop.fs.s3a.S3AUtils.*; - import java.io.IOException; import java.net.URI; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; -import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.S3ClientOptions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.VersionInfo; - -import org.slf4j.Logger; /** - * Factory for creation of S3 client instances to be used by {@link S3Store}. + * Factory for creation of {@link AmazonS3} client instances. */ @InterfaceAudience.Private @InterfaceStability.Unstable -interface S3ClientFactory { +public interface S3ClientFactory { /** * Creates a new {@link AmazonS3} client. This method accepts the S3A file @@ -57,177 +44,4 @@ interface S3ClientFactory { */ AmazonS3 createS3Client(URI name) throws IOException; - /** - * The default factory implementation, which calls the AWS SDK to configure - * and create an {@link AmazonS3Client} that communicates with the S3 service. - */ - static class DefaultS3ClientFactory extends Configured - implements S3ClientFactory { - - private static final Logger LOG = S3AFileSystem.LOG; - - @Override - public AmazonS3 createS3Client(URI name) throws IOException { - Configuration conf = getConf(); - AWSCredentialsProvider credentials = - createAWSCredentialProviderSet(name, conf); - ClientConfiguration awsConf = new ClientConfiguration(); - initConnectionSettings(conf, awsConf); - initProxySupport(conf, awsConf); - initUserAgent(conf, awsConf); - return createAmazonS3Client(conf, credentials, awsConf); - } - - /** - * Initializes all AWS SDK settings related to connection management. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private static void initConnectionSettings(Configuration conf, - ClientConfiguration awsConf) { - awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS, 1)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, - DEFAULT_SECURE_CONNECTIONS); - awsConf.setProtocol(secureConnections ? 
Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES, 0)); - awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT, 0)); - awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT, 0)); - int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, - DEFAULT_SOCKET_SEND_BUFFER, 2048); - int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, - DEFAULT_SOCKET_RECV_BUFFER, 2048); - awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); - String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); - if (!signerOverride.isEmpty()) { - LOG.debug("Signer override = {}", signerOverride); - awsConf.setSignerOverride(signerOverride); - } - } - - /** - * Initializes AWS SDK proxy support if configured. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - * @throws IllegalArgumentException if misconfigured - */ - private static void initProxySupport(Configuration conf, - ClientConfiguration awsConf) throws IllegalArgumentException { - String proxyHost = conf.getTrimmed(PROXY_HOST, ""); - int proxyPort = conf.getInt(PROXY_PORT, -1); - if (!proxyHost.isEmpty()) { - awsConf.setProxyHost(proxyHost); - if (proxyPort >= 0) { - awsConf.setProxyPort(proxyPort); - } else { - if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { - LOG.warn("Proxy host set without port. Using HTTPS default 443"); - awsConf.setProxyPort(443); - } else { - LOG.warn("Proxy host set without port. Using HTTP default 80"); - awsConf.setProxyPort(80); - } - } - String proxyUsername = conf.getTrimmed(PROXY_USERNAME); - String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); - if ((proxyUsername == null) != (proxyPassword == null)) { - String msg = "Proxy error: " + PROXY_USERNAME + " or " + - PROXY_PASSWORD + " set without the other."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - awsConf.setProxyUsername(proxyUsername); - awsConf.setProxyPassword(proxyPassword); - awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); - awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); - if (LOG.isDebugEnabled()) { - LOG.debug("Using proxy server {}:{} as user {} with password {} on " + - "domain {} as workstation {}", awsConf.getProxyHost(), - awsConf.getProxyPort(), - String.valueOf(awsConf.getProxyUsername()), - awsConf.getProxyPassword(), awsConf.getProxyDomain(), - awsConf.getProxyWorkstation()); - } - } else if (proxyPort >= 0) { - String msg = - "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - } - - /** - * Initializes the User-Agent header to send in HTTP requests to the S3 - * back-end. We always include the Hadoop version number. The user also - * may set an optional custom prefix to put in front of the Hadoop version - * number. The AWS SDK interally appends its own information, which seems - * to include the AWS SDK version, OS and JVM version. 
- * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private static void initUserAgent(Configuration conf, - ClientConfiguration awsConf) { - String userAgent = "Hadoop " + VersionInfo.getVersion(); - String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); - if (!userAgentPrefix.isEmpty()) { - userAgent = userAgentPrefix + ", " + userAgent; - } - LOG.debug("Using User-Agent: {}", userAgent); - awsConf.setUserAgentPrefix(userAgent); - } - - /** - * Creates an {@link AmazonS3Client} from the established configuration. - * - * @param conf Hadoop configuration - * @param credentials AWS credentials - * @param awsConf AWS SDK configuration - * @return S3 client - * @throws IllegalArgumentException if misconfigured - */ - private static AmazonS3 createAmazonS3Client(Configuration conf, - AWSCredentialsProvider credentials, ClientConfiguration awsConf) - throws IllegalArgumentException { - AmazonS3 s3 = new AmazonS3Client(credentials, awsConf); - String endPoint = conf.getTrimmed(ENDPOINT, ""); - if (!endPoint.isEmpty()) { - try { - s3.setEndpoint(endPoint); - } catch (IllegalArgumentException e) { - String msg = "Incorrect endpoint: " + e.getMessage(); - LOG.error(msg); - throw new IllegalArgumentException(msg, e); - } - } - enablePathStyleAccessIfRequired(s3, conf); - return s3; - } - - /** - * Enables path-style access to S3 buckets if configured. By default, the - * behavior is to use virtual hosted-style access with URIs of the form - * http://bucketname.s3.amazonaws.com. Enabling path-style access and a - * region-specific endpoint switches the behavior to use URIs of the form - * http://s3-eu-west-1.amazonaws.com/bucketname. - * - * @param s3 S3 client - * @param conf Hadoop configuration - */ - private static void enablePathStyleAccessIfRequired(AmazonS3 s3, - Configuration conf) { - final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false); - if (pathStyleAccess) { - LOG.debug("Enabling path style access!"); - s3.setS3ClientOptions(S3ClientOptions.builder() - .setPathStyleAccess(true) - .build()); - } - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 789c6d7..777c161 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -140,7 +140,18 @@ public enum Statistic { STREAM_WRITE_TOTAL_DATA("stream_write_total_data", "Count of total data uploaded in block output"), STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration", - "Total queue duration of all block uploads"); + "Total queue duration of all block uploads"), + + // S3Guard stats + S3GUARD_METADATASTORE_PUT_PATH_REQUEST( + "s3guard_metadatastore_put_path_request", + "s3guard metadata store put one metadata path request"), + S3GUARD_METADATASTORE_PUT_PATH_LATENCY( + "s3guard_metadatastore_put_path_latency", + "s3guard metadata store put one metadata path latency"), + S3GUARD_METADATASTORE_INITIALIZATION("s3guard_metadatastore_initialization", + "s3guard metadata store initialization times"); + + private static final Map<String, Statistic> SYMBOL_MAP = new HashMap<>(Statistic.values().length);
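The three S3Guard entries added above follow the existing Statistic convention of a symbol string plus a human-readable description, and the SYMBOL_MAP shown in the trailing context backs a by-symbol lookup. Below is a minimal sketch of how a monitoring tool might resolve one of the new statistics, assuming the pre-existing Statistic.fromSymbol(), getSymbol() and getDescription() members; the S3GuardStatisticLookup class name is illustrative, not part of the patch.

    import org.apache.hadoop.fs.s3a.Statistic;

    public class S3GuardStatisticLookup {
      public static void main(String[] args) {
        // Resolve one of the new S3Guard statistics by its symbol string.
        Statistic stat =
            Statistic.fromSymbol("s3guard_metadatastore_put_path_latency");
        if (stat != null) {
          // Prints the symbol together with its registered description.
          System.out.println(stat.getSymbol() + ": " + stat.getDescription());
        }
      }
    }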
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java new file mode 100644 index 0000000..0462ccf --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +/** + * Simple enum to express {true, false, don't know}. + */ +public enum Tristate { + // Do not add additional values here. Logic will assume there are exactly + // three possibilities. + TRUE, FALSE, UNKNOWN; + + public static Tristate fromBool(boolean v) { + return v ? TRUE : FALSE; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java new file mode 100644 index 0000000..238cd97 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.transfer.Upload; + +/** + * Simple struct that contains information about an S3 upload. 
+ */ +public class UploadInfo { + private final Upload upload; + private final long length; + + public UploadInfo(Upload upload, long length) { + this.upload = upload; + this.length = length; + } + + public Upload getUpload() { + return upload; + } + + public long getLength() { + return length; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java new file mode 100644 index 0000000..dcee358 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; +import java.util.NoSuchElementException; +import java.util.Queue; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +/** + * {@code DescendantsIterator} is a {@link RemoteIterator} that implements + * pre-ordering breadth-first traversal (BFS) of a path and all of its + * descendants recursively. After visiting each path, that path's direct + * children are discovered by calling {@link MetadataStore#listChildren(Path)}. + * Each iteration returns the next direct child, and if that child is a + * directory, also pushes it onto a queue to discover its children later. + * + * For example, assume the consistent store contains metadata representing this + * file system structure: + * + * <pre> + * /dir1 + * |-- dir2 + * | |-- file1 + * | `-- file2 + * `-- dir3 + * |-- dir4 + * | `-- file3 + * |-- dir5 + * | `-- file4 + * `-- dir6 + * </pre> + * + * Consider this code sample: + * <pre> + * final PathMetadata dir1 = get(new Path("/dir1")); + * for (DescendantsIterator descendants = new DescendantsIterator(dir1); + * descendants.hasNext(); ) { + * final FileStatus status = descendants.next().getFileStatus(); + * System.out.printf("%s %s%n", status.isDirectory() ? 
'D' : 'F', + * status.getPath()); + * } + * </pre> + * + * The output is: + * <pre> + * D /dir1 + * D /dir1/dir2 + * D /dir1/dir3 + * F /dir1/dir2/file1 + * F /dir1/dir2/file2 + * D /dir1/dir3/dir4 + * D /dir1/dir3/dir5 + * F /dir1/dir3/dir4/file3 + * F /dir1/dir3/dir5/file4 + * D /dir1/dir3/dir6 + * </pre> + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DescendantsIterator implements RemoteIterator<FileStatus> { + + private final MetadataStore metadataStore; + private final Queue<PathMetadata> queue = new LinkedList<>(); + + /** + * Creates a new {@code DescendantsIterator}. + * + * @param ms the associated {@link MetadataStore} + * @param meta base path for descendants iteration, which will be the first + * returned during iteration (except root). Null makes empty iterator. + * @throws IOException if errors happen during metadata store listing + */ + public DescendantsIterator(MetadataStore ms, PathMetadata meta) + throws IOException { + Preconditions.checkNotNull(ms); + this.metadataStore = ms; + + if (meta != null) { + final Path path = meta.getFileStatus().getPath(); + if (path.isRoot()) { + DirListingMetadata rootListing = ms.listChildren(path); + if (rootListing != null) { + rootListing = rootListing.withoutTombstones(); + queue.addAll(rootListing.getListing()); + } + } else { + queue.add(meta); + } + } + } + + @Override + public boolean hasNext() throws IOException { + return !queue.isEmpty(); + } + + @Override + public FileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more descendants."); + } + PathMetadata next; + next = queue.poll(); + if (next.getFileStatus().isDirectory()) { + final Path path = next.getFileStatus().getPath(); + DirListingMetadata meta = metadataStore.listChildren(path); + if (meta != null) { + Collection<PathMetadata> more = meta.withoutTombstones().getListing(); + if (!more.isEmpty()) { + queue.addAll(more); + } + } + } + return next.getFileStatus(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java new file mode 100644 index 0000000..e5b4fb5 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Tristate; + +/** + * {@code DirListingMetadata} models a directory listing stored in a + * {@link MetadataStore}. Instances of this class are mutable and thread-safe. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DirListingMetadata { + + /** + * Convenience parameter for passing into constructor. + */ + public static final Collection<PathMetadata> EMPTY_DIR = + Collections.emptyList(); + + private final Path path; + + /** Using a map for fast find / remove with large directories. */ + private Map<Path, PathMetadata> listMap = new ConcurrentHashMap<>(); + + private boolean isAuthoritative; + + /** + * Create a directory listing metadata container. + * + * @param path Path of the directory. If this path has a host component, then + * all paths added later via {@link #put(FileStatus)} must also have + * the same host. + * @param listing Entries in the directory. + * @param isAuthoritative true iff listing is the full contents of the + * directory, and the calling client reports that this may be cached as + * the full and authoritative listing of all files in the directory. + */ + public DirListingMetadata(Path path, Collection<PathMetadata> listing, + boolean isAuthoritative) { + + checkPathAbsolute(path); + this.path = path; + + if (listing != null) { + for (PathMetadata entry : listing) { + Path childPath = entry.getFileStatus().getPath(); + checkChildPath(childPath); + listMap.put(childPath, entry); + } + } + this.isAuthoritative = isAuthoritative; + } + + /** + * Copy constructor. + * @param d the existing {@link DirListingMetadata} object. + */ + public DirListingMetadata(DirListingMetadata d) { + path = d.path; + isAuthoritative = d.isAuthoritative; + listMap = new ConcurrentHashMap<>(d.listMap); + } + + /** + * @return {@code Path} of the directory that contains this listing. + */ + public Path getPath() { + return path; + } + + /** + * @return entries in the directory + */ + public Collection<PathMetadata> getListing() { + return Collections.unmodifiableCollection(listMap.values()); + } + + public Set<Path> listTombstones() { + Set<Path> tombstones = new HashSet<>(); + for (PathMetadata meta : listMap.values()) { + if (meta.isDeleted()) { + tombstones.add(meta.getFileStatus().getPath()); + } + } + return tombstones; + } + + public DirListingMetadata withoutTombstones() { + Collection<PathMetadata> filteredList = new ArrayList<>(); + for (PathMetadata meta : listMap.values()) { + if (!meta.isDeleted()) { + filteredList.add(meta); + } + } + return new DirListingMetadata(path, filteredList, isAuthoritative); + } + + /** + * @return number of entries tracked. This is not the same as the number + * of entries in the actual directory unless {@link #isAuthoritative()} is + * true. + */ + public int numEntries() { + return listMap.size(); + } + + /** + * @return true iff this directory listing is full and authoritative within + * the scope of the {@code MetadataStore} that returned it. 
+ */ + public boolean isAuthoritative() { + return isAuthoritative; + } + + + /** + * Is the underlying directory known to be empty? + * @return FALSE if directory is known to have a child entry, TRUE if + * directory is known to be empty, UNKNOWN otherwise. + */ + public Tristate isEmpty() { + if (getListing().isEmpty()) { + if (isAuthoritative()) { + return Tristate.TRUE; + } else { + // This listing is empty, but may not be full list of underlying dir. + return Tristate.UNKNOWN; + } + } else { // not empty listing + // There exists at least one child, dir not empty. + return Tristate.FALSE; + } + } + + /** + * Marks this directory listing as full and authoritative. + * @param authoritative see {@link #isAuthoritative()}. + */ + public void setAuthoritative(boolean authoritative) { + this.isAuthoritative = authoritative; + } + + /** + * Lookup entry within this directory listing. This may return null if the + * {@code MetadataStore} only tracks a partial set of the directory entries. + * In the case where {@link #isAuthoritative()} is true, however, this + * function returns null iff the directory is known not to contain the listing + * at given path (within the scope of the {@code MetadataStore} that returned + * it). + * + * @param childPath path of entry to look for. + * @return entry, or null if it is not present or not being tracked. + */ + public PathMetadata get(Path childPath) { + checkChildPath(childPath); + return listMap.get(childPath); + } + + /** + * Replace an entry with a tombstone. + * @param childPath path of entry to replace. + */ + public void markDeleted(Path childPath) { + checkChildPath(childPath); + listMap.put(childPath, PathMetadata.tombstone(childPath)); + } + + /** + * Remove entry from this directory. + * + * @param childPath path of entry to remove. + */ + public void remove(Path childPath) { + checkChildPath(childPath); + listMap.remove(childPath); + } + + /** + * Add an entry to the directory listing. If this listing already contains a + * {@code FileStatus} with the same path, it will be replaced. + * + * @param childFileStatus entry to add to this directory listing. + * @return true if the status was added or replaced with a new value. False + * if the same FileStatus value was already present. + */ + public boolean put(FileStatus childFileStatus) { + Preconditions.checkNotNull(childFileStatus, + "childFileStatus must be non-null"); + Path childPath = childStatusToPathKey(childFileStatus); + PathMetadata newValue = new PathMetadata(childFileStatus); + PathMetadata oldValue = listMap.put(childPath, newValue); + return oldValue == null || !oldValue.equals(newValue); + } + + @Override + public String toString() { + return "DirListingMetadata{" + + "path=" + path + + ", listMap=" + listMap + + ", isAuthoritative=" + isAuthoritative + + '}'; + } + + /** + * Log contents to supplied StringBuilder in a pretty fashion. + * @param sb target StringBuilder + */ + public void prettyPrint(StringBuilder sb) { + sb.append(String.format("DirMeta %-20s %-18s", + path.toString(), + isAuthoritative ? "Authoritative" : "Not Authoritative")); + for (Map.Entry<Path, PathMetadata> entry : listMap.entrySet()) { + sb.append("\n key: ").append(entry.getKey()).append(": "); + entry.getValue().prettyPrint(sb); + } + sb.append("\n"); + } + + public String prettyPrint() { + StringBuilder sb = new StringBuilder(); + prettyPrint(sb); + return sb.toString(); + } + + /** + * Checks that child path is valid. + * @param childPath path to check. 
+ */ + private void checkChildPath(Path childPath) { + checkPathAbsolute(childPath); + + // If this dir's path has host (and thus scheme), so must its children + URI parentUri = path.toUri(); + if (parentUri.getHost() != null) { + URI childUri = childPath.toUri(); + Preconditions.checkNotNull(childUri.getHost(), "Expected non-null URI " + + "host: %s", childUri); + Preconditions.checkArgument( + childUri.getHost().equals(parentUri.getHost()), + "childUri %s and parentUri %s must have the same host", + childUri, parentUri); + Preconditions.checkNotNull(childUri.getScheme(), "No scheme in path %s", + childUri); + } + Preconditions.checkArgument(!childPath.isRoot(), + "childPath cannot be the root path: %s", childPath); + Preconditions.checkArgument(childPath.getParent().equals(path), + "childPath %s must be a child of %s", childPath, path); + } + + /** + * For Paths that are handed in directly, we assert they are in consistent + * format with checkPath(). For paths that are supplied embedded in + * FileStatus, we attempt to fill in missing scheme and host, when this + * DirListingMetadata is associated with one. + * + * @return Path suitable for consistent hashtable lookups + * @throws NullPointerException null status argument + * @throws IllegalArgumentException bad status values or failure to + * create a URI. + */ + private Path childStatusToPathKey(FileStatus status) { + Path p = status.getPath(); + Preconditions.checkNotNull(p, "Child status' path cannot be null"); + Preconditions.checkArgument(!p.isRoot(), + "childPath cannot be the root path: %s", p); + Preconditions.checkArgument(p.getParent().equals(path), + "childPath %s must be a child of %s", p, path); + URI uri = p.toUri(); + URI parentUri = path.toUri(); + // If FileStatus' path is missing host, but should have one, add it. + if (uri.getHost() == null && parentUri.getHost() != null) { + try { + return new Path(new URI(parentUri.getScheme(), parentUri.getHost(), + uri.getPath(), uri.getFragment())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("FileStatus path invalid with" + + " added " + parentUri.getScheme() + "://" + parentUri.getHost(), + e); + } + } + return p; + } + + private void checkPathAbsolute(Path p) { + Preconditions.checkNotNull(p, "path must be non-null"); + Preconditions.checkArgument(p.isAbsolute(), "path must be absolute: %s", p); + } +}
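Taken together, Tristate and DirListingMetadata let callers distinguish a directory that is known to be empty from one whose listing may simply be incomplete. Below is a minimal sketch of that semantics using only the constructors and methods shown above; the demo class and the s3a://bucket/dir1 path are illustrative, not part of the patch.

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.Tristate;
    import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;

    public class DirListingEmptinessDemo {
      public static void main(String[] args) {
        Path dir = new Path("s3a://bucket/dir1");   // hypothetical directory

        // An empty listing that is not authoritative cannot rule out
        // unlisted children, so emptiness is UNKNOWN.
        DirListingMetadata partial =
            new DirListingMetadata(dir, DirListingMetadata.EMPTY_DIR, false);
        Tristate state = partial.isEmpty();
        System.out.println(state);            // UNKNOWN

        // An empty listing that is authoritative is known to be empty.
        DirListingMetadata full =
            new DirListingMetadata(dir, DirListingMetadata.EMPTY_DIR, true);
        System.out.println(full.isEmpty());   // TRUE

        // Any tracked child makes emptiness definitively FALSE.
        full.put(new FileStatus(0, false, 1, 0, 0, new Path(dir, "file1")));
        System.out.println(full.isEmpty());   // FALSE
      }
    }

Keeping the don't-know state explicit avoids encoding it as a boolean default, which is exactly why Tristate's comment forbids adding further values.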
