http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index f846689..96de8e4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -22,17 +22,16 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; +import java.util.Locale; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.AmazonClientException; import com.amazonaws.event.ProgressEvent; import com.amazonaws.event.ProgressEventType; import com.amazonaws.event.ProgressListener; -import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; @@ -47,8 +46,9 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.PutTracker; import org.apache.hadoop.util.Progressable; import static org.apache.hadoop.fs.s3a.S3AUtils.*; @@ -65,7 +65,8 @@ import static org.apache.hadoop.fs.s3a.Statistic.*; */ @InterfaceAudience.Private @InterfaceStability.Unstable -class S3ABlockOutputStream extends OutputStream { +class S3ABlockOutputStream extends OutputStream implements + StreamCapabilities { private static final Logger LOG = LoggerFactory.getLogger(S3ABlockOutputStream.class); @@ -87,14 +88,6 @@ class S3ABlockOutputStream extends OutputStream { private final ListeningExecutorService executorService; /** - * Retry policy for multipart commits; not all AWS SDK versions retry that. - */ - private final RetryPolicy retryPolicy = - RetryPolicies.retryUpToMaximumCountWithProportionalSleep( - 5, - 2000, - TimeUnit.MILLISECONDS); - /** * Factory for blocks. */ private final S3ADataBlocks.BlockFactory blockFactory; @@ -120,7 +113,12 @@ class S3ABlockOutputStream extends OutputStream { /** * Write operation helper; encapsulation of the filesystem operations. */ - private final S3AFileSystem.WriteOperationHelper writeOperationHelper; + private final WriteOperationHelper writeOperationHelper; + + /** + * Track multipart put operation. + */ + private final PutTracker putTracker; /** * An S3A output stream which uploads partitions in a separate pool of @@ -138,6 +136,7 @@ class S3ABlockOutputStream extends OutputStream { * @param blockFactory factory for creating stream destinations * @param statistics stats for this stream * @param writeOperationHelper state of the write operation. 
+ * @param putTracker put tracking for commit support * @throws IOException on any problem */ S3ABlockOutputStream(S3AFileSystem fs, @@ -147,7 +146,8 @@ class S3ABlockOutputStream extends OutputStream { long blockSize, S3ADataBlocks.BlockFactory blockFactory, S3AInstrumentation.OutputStreamStatistics statistics, - S3AFileSystem.WriteOperationHelper writeOperationHelper) + WriteOperationHelper writeOperationHelper, + PutTracker putTracker) throws IOException { this.fs = fs; this.key = key; @@ -155,6 +155,7 @@ class S3ABlockOutputStream extends OutputStream { this.blockSize = (int) blockSize; this.statistics = statistics; this.writeOperationHelper = writeOperationHelper; + this.putTracker = putTracker; Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE, "Block size is too small: %d", blockSize); this.executorService = MoreExecutors.listeningDecorator(executorService); @@ -166,7 +167,11 @@ class S3ABlockOutputStream extends OutputStream { // writes a 0-byte entry. createBlockIfNeeded(); LOG.debug("Initialized S3ABlockOutputStream for {}" + - " output to {}", writeOperationHelper, activeBlock); + " output to {}", key, activeBlock); + if (putTracker.initialize()) { + LOG.debug("Put tracker requests multipart upload"); + initMultipartUpload(); + } } /** @@ -299,10 +304,7 @@ class S3ABlockOutputStream extends OutputStream { private synchronized void uploadCurrentBlock() throws IOException { Preconditions.checkState(hasActiveBlock(), "No active block"); LOG.debug("Writing block # {}", blockCount); - if (multiPartUpload == null) { - LOG.debug("Initiating Multipart upload"); - multiPartUpload = new MultiPartUpload(); - } + initMultipartUpload(); try { multiPartUpload.uploadBlockAsync(getActiveBlock()); bytesSubmitted += getActiveBlock().dataSize(); @@ -313,6 +315,20 @@ class S3ABlockOutputStream extends OutputStream { } /** + * Init multipart upload. Assumption: this is called from + * a synchronized block. + * Note that this makes a blocking HTTPS request to the far end, so + * can take time and potentially fail. + * @throws IOException failure to initialize the upload + */ + private void initMultipartUpload() throws IOException { + if (multiPartUpload == null) { + LOG.debug("Initiating Multipart upload"); + multiPartUpload = new MultiPartUpload(key); + } + } + + /** * Close the stream. * * This will not return until the upload is complete @@ -342,22 +358,35 @@ class S3ABlockOutputStream extends OutputStream { // This must happen even if there is no data, so that 0 byte files // are created. 
bytes = putObject(); + bytesSubmitted = bytes; } } else { - // there has already been at least one block scheduled for upload; - // put up the current then wait - if (hasBlock && block.hasData()) { + // there's an MPU in progress; + // IF there is more data to upload, or no data has yet been uploaded, + // PUT the final block + if (hasBlock && + (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) { //send last part uploadCurrentBlock(); } // wait for the partial uploads to finish final List<PartETag> partETags = multiPartUpload.waitForAllPartUploads(); - // then complete the operation - multiPartUpload.complete(partETags); bytes = bytesSubmitted; + // then complete the operation + if (putTracker.aboutToComplete(multiPartUpload.getUploadId(), + partETags, + bytes)) { + multiPartUpload.complete(partETags); + } else { + LOG.info("File {} will be visible when the job is committed", key); + } + } + if (!putTracker.outputImmediatelyVisible()) { + // track the number of bytes uploaded as commit operations. + statistics.commitUploaded(bytes); } - LOG.debug("Upload complete for {}", writeOperationHelper); + LOG.debug("Upload complete to {} by {}", key, writeOperationHelper); } catch (IOException ioe) { writeOperationHelper.writeFailed(ioe); throw ioe; @@ -367,7 +396,7 @@ class S3ABlockOutputStream extends OutputStream { closeAll(LOG, statistics); clearActiveBlock(); } - // All end of write operations, including deleting fake parent directories + // Note end of write. This does not change the state of the remote FS. writeOperationHelper.writeSuccessful(bytes); } @@ -387,8 +416,9 @@ class S3ABlockOutputStream extends OutputStream { int size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); final PutObjectRequest putObjectRequest = uploadData.hasFile() ? - writeOperationHelper.newPutRequest(uploadData.getFile()) : - writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size); + writeOperationHelper.createPutObjectRequest(key, uploadData.getFile()) + : writeOperationHelper.createPutObjectRequest(key, + uploadData.getUploadStream(), size); long transferQueueTime = now(); BlockUploadProgress callback = new BlockUploadProgress( @@ -396,18 +426,13 @@ class S3ABlockOutputStream extends OutputStream { putObjectRequest.setGeneralProgressListener(callback); statistics.blockUploadQueued(size); ListenableFuture<PutObjectResult> putObjectResult = - executorService.submit(new Callable<PutObjectResult>() { - @Override - public PutObjectResult call() throws Exception { - PutObjectResult result; - try { - // the putObject call automatically closes the input - // stream afterwards. - result = writeOperationHelper.putObject(putObjectRequest); - } finally { - closeAll(LOG, uploadData, block); - } - return result; + executorService.submit(() -> { + try { + // the putObject call automatically closes the input + // stream afterwards. + return writeOperationHelper.putObject(putObjectRequest); + } finally { + closeAll(LOG, uploadData, block); } }); clearActiveBlock(); @@ -460,20 +485,82 @@ class S3ABlockOutputStream extends OutputStream { } /** + * Return the stream capabilities. + * This stream always returns false when queried about hflush and hsync. + * If asked about {@link CommitConstants#STREAM_CAPABILITY_MAGIC_OUTPUT} + * it will return true iff this is an active "magic" output stream. + * @param capability string to query the stream support for. + * @return true if the capability is supported by this instance.
+ */ + @Override + public boolean hasCapability(String capability) { + switch (capability.toLowerCase(Locale.ENGLISH)) { + + // does the output stream have delayed visibility + case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT: + return !putTracker.outputImmediatelyVisible(); + + // The flush/sync options are absolutely not supported + case "hflush": + case "hsync": + return false; + + default: + return false; + } + } + + /** * Multiple partition upload. */ private class MultiPartUpload { private final String uploadId; private final List<ListenableFuture<PartETag>> partETagsFutures; + private int partsSubmitted; + private int partsUploaded; + private long bytesSubmitted; - MultiPartUpload() throws IOException { - this.uploadId = writeOperationHelper.initiateMultiPartUpload(); + MultiPartUpload(String key) throws IOException { + this.uploadId = writeOperationHelper.initiateMultiPartUpload(key); this.partETagsFutures = new ArrayList<>(2); LOG.debug("Initiated multi-part upload for {} with " + "id '{}'", writeOperationHelper, uploadId); } /** + * Get a count of parts submitted. + * @return the number of parts submitted; will always be >= the + * value of {@link #getPartsUploaded()} + */ + public int getPartsSubmitted() { + return partsSubmitted; + } + + /** + * Count of parts actually uploaded. + * @return the count of successfully completed part uploads. + */ + public int getPartsUploaded() { + return partsUploaded; + } + + /** + * Get the upload ID; will never be null after construction completes. + * @return the upload ID + */ + public String getUploadId() { + return uploadId; + } + + /** + * Get the count of bytes submitted. + * @return the current upload size. + */ + public long getBytesSubmitted() { + return bytesSubmitted; + } + + /** * Upload a block of data.
* This will take the block and queue it for upload. * @param block block to upload @@ -481,17 +568,22 @@ class S3ABlockOutputStream extends OutputStream { */ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block) throws IOException { - LOG.debug("Queueing upload of {}", block); + LOG.debug("Queueing upload of {} for upload {}", block, uploadId); + Preconditions.checkNotNull(uploadId, "Null uploadId"); + partsSubmitted++; final int size = block.dataSize(); + bytesSubmitted += size; final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); final int currentPartNumber = partETagsFutures.size() + 1; final UploadPartRequest request = writeOperationHelper.newUploadPartRequest( + key, uploadId, currentPartNumber, size, uploadData.getUploadStream(), - uploadData.getFile()); + uploadData.getFile(), + 0L); long transferQueueTime = now(); BlockUploadProgress callback = @@ -500,24 +592,22 @@ class S3ABlockOutputStream extends OutputStream { request.setGeneralProgressListener(callback); statistics.blockUploadQueued(block.dataSize()); ListenableFuture<PartETag> partETagFuture = - executorService.submit(new Callable<PartETag>() { - @Override - public PartETag call() throws Exception { - // this is the queued upload operation - LOG.debug("Uploading part {} for id '{}'", currentPartNumber, - uploadId); - // do the upload - PartETag partETag; - try { - partETag = fs.uploadPart(request).getPartETag(); - LOG.debug("Completed upload of {} to part {}", block, - partETag.getETag()); - LOG.debug("Stream statistics of {}", statistics); - } finally { - // close the stream and block - closeAll(LOG, uploadData, block); - } + executorService.submit(() -> { + // this is the queued upload operation + // do the upload + try { + LOG.debug("Uploading part {} for id '{}'", + currentPartNumber, uploadId); + PartETag partETag = writeOperationHelper.uploadPart(request) + .getPartETag(); + LOG.debug("Completed upload of {} to part {}", + block, partETag.getETag()); + LOG.debug("Stream statistics of {}", statistics); + partsUploaded++; return partETag; + } finally { + // close the stream and block + closeAll(LOG, uploadData, block); } }); partETagsFutures.add(partETagFuture); @@ -558,28 +648,18 @@ class S3ABlockOutputStream extends OutputStream { * @param partETags list of partial uploads * @throws IOException on any problem */ - private CompleteMultipartUploadResult complete(List<PartETag> partETags) + private void complete(List<PartETag> partETags) throws IOException { - int retryCount = 0; - AmazonClientException lastException; - String operation = - String.format("Completing multi-part upload for key '%s'," + - " id '%s' with %s partitions ", - key, uploadId, partETags.size()); - do { - try { - LOG.debug(operation); - return writeOperationHelper.completeMultipartUpload( - uploadId, - partETags); - } catch (AmazonClientException e) { - lastException = e; - statistics.exceptionInMultipartComplete(); - } - } while (shouldRetry(operation, lastException, retryCount++)); - // this point is only reached if the operation failed more than - // the allowed retry count - throw translateException(operation, key, lastException); + AtomicInteger errorCount = new AtomicInteger(0); + try { + writeOperationHelper.completeMPUwithRetries(key, + uploadId, + partETags, + bytesSubmitted, + errorCount); + } finally { + statistics.exceptionInMultipartComplete(errorCount.get()); + } } /** @@ -590,57 +670,14 @@ class S3ABlockOutputStream extends OutputStream { int retryCount = 0; AmazonClientException lastException;
fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED); - String operation = - String.format("Aborting multi-part upload for '%s', id '%s", - writeOperationHelper, uploadId); - do { - try { - LOG.debug(operation); - writeOperationHelper.abortMultipartUpload(uploadId); - return; - } catch (AmazonClientException e) { - lastException = e; - statistics.exceptionInMultipartAbort(); - } - } while (shouldRetry(operation, lastException, retryCount++)); - // this point is only reached if the operation failed more than - // the allowed retry count - LOG.warn("Unable to abort multipart upload, you may need to purge " + - "uploaded parts", lastException); - } - - /** - * Predicate to determine whether a failed operation should - * be attempted again. - * If a retry is advised, the exception is automatically logged and - * the filesystem statistic {@link Statistic#IGNORED_ERRORS} incremented. - * The method then sleeps for the sleep time suggested by the sleep policy; - * if the sleep is interrupted then {@code Thread.interrupted()} is set - * to indicate the thread was interrupted; then false is returned. - * - * @param operation operation for log message - * @param e exception raised. - * @param retryCount number of retries already attempted - * @return true if another attempt should be made - */ - private boolean shouldRetry(String operation, - AmazonClientException e, - int retryCount) { try { - RetryPolicy.RetryAction retryAction = - retryPolicy.shouldRetry(e, retryCount, 0, true); - boolean retry = retryAction == RetryPolicy.RetryAction.RETRY; - if (retry) { - fs.incrementStatistic(IGNORED_ERRORS); - LOG.info("Retrying {} after exception ", operation, e); - Thread.sleep(retryAction.delayMillis); - } - return retry; - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return false; - } catch (Exception ignored) { - return false; + writeOperationHelper.abortMultipartUpload(key, uploadId, + (text, e, r, i) -> statistics.exceptionInMultipartAbort()); + } catch (IOException e) { + // this point is only reached if the operation failed more than + // the allowed retry count + LOG.warn("Unable to abort multipart upload," + + " you may need to purge uploaded parts", e); } } @@ -718,7 +755,7 @@ class S3ABlockOutputStream extends OutputStream { private static class ProgressableListener implements ProgressListener { private final Progressable progress; - public ProgressableListener(Progressable progress) { + ProgressableListener(Progressable progress) { this.progress = progress; }
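
The StreamCapabilities change above means a client can now probe an open S3A output stream for delayed-visibility ("magic") output before deciding how to commit work. What follows is a minimal sketch of such a probe, not part of the patch: the bucket and path are placeholders, and it assumes the magic committer has been enabled (the option named by CommitConstants.MAGIC_COMMITTER_ENABLED) so that the stream returned is the S3ABlockOutputStream shown above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;

public class MagicStreamProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // placeholder bucket/path; a path under a "__magic" directory is what
    // the committer integration rewrites into delayed-visibility output
    Path dest = new Path("s3a://example-bucket/__magic/job-0001/part-0000");
    FileSystem fs = dest.getFileSystem(conf);
    try (FSDataOutputStream out = fs.create(dest, true)) {
      // FSDataOutputStream delegates hasCapability() to the wrapped
      // S3ABlockOutputStream; true only for an active magic stream.
      boolean delayed = out.hasCapability(
          CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT);
      System.out.println("delayed visibility: " + delayed);
      // hflush/hsync are never supported by this stream
      System.out.println("hsync: " + out.hasCapability("hsync"));
    }
  }
}

Answering false for "hflush"/"hsync" lets applications which need durability guarantees fail fast, rather than assume sync semantics that S3 cannot provide.
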
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index 9bc8dcd..0e3bca5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -401,7 +401,7 @@ final class S3ADataBlocks { } /** - * InputStream backed by the internal byte array + * InputStream backed by the internal byte array. * * @return */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 2171957..b08a134 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -24,6 +24,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.net.URI; +import java.nio.file.AccessDeniedException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -31,6 +34,7 @@ import java.util.Date; import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.Objects; @@ -44,19 +48,18 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CannedAccessControlList; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.MultiObjectDeleteException; +import com.amazonaws.services.s3.model.MultipartUpload; import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3ObjectSummary; @@ -68,6 +71,7 @@ import com.amazonaws.services.s3.transfer.Copy; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; import 
com.amazonaws.services.s3.transfer.Upload; +import com.amazonaws.services.s3.transfer.model.UploadResult; import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressEvent; import com.google.common.annotations.VisibleForTesting; @@ -94,20 +98,24 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.StorageStatistics; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.PutTracker; +import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration; import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator; import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; import org.apache.hadoop.fs.s3a.s3guard.S3Guard; import org.apache.hadoop.fs.s3native.S3xLoginHelper; +import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import static org.apache.hadoop.fs.s3a.Constants.*; -import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL; +import static org.apache.hadoop.fs.s3a.Invoker.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -129,15 +137,31 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class S3AFileSystem extends FileSystem { +public class S3AFileSystem extends FileSystem implements StreamCapabilities { /** * Default blocksize as used in blocksize and FS status queries. */ public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; + + /** + * This declares delete as idempotent. + * This is an "interesting" topic in past Hadoop FS work. + * Essentially: with a single caller, DELETE is idempotent + * but in a shared filesystem, it is very much not so. + * Here, on the basis that this isn't a filesystem with consistency + * guarantees, treating delete as retryable at worst results in files + * being deleted again. + */ + public static final boolean DELETE_CONSIDERED_IDEMPOTENT = true; private URI uri; private Path workingDir; private String username; private AmazonS3 s3; // initial callback policy is fail-once; it's there just to assist // some mock tests and other codepaths trying to call the low level // APIs on an uninitialized filesystem.
+ private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, + Invoker.LOG_EVENT); + private final Retried onRetry = this::operationRetried; private String bucket; private int maxKeys; private Listing listing; @@ -154,7 +178,8 @@ public class S3AFileSystem extends FileSystem { private CannedAccessControlList cannedACL; private S3AEncryptionMethods serverSideEncryptionAlgorithm; private S3AInstrumentation instrumentation; - private S3AStorageStatistics storageStatistics; + private final S3AStorageStatistics storageStatistics = + createStorageStatistics(); private long readAhead; private S3AInputPolicy inputPolicy; private final AtomicBoolean closed = new AtomicBoolean(false); @@ -167,6 +192,7 @@ public class S3AFileSystem extends FileSystem { private S3ADataBlocks.BlockFactory blockFactory; private int blockOutputActiveBlocks; private boolean useListV1; + private MagicCommitIntegration committerIntegration; /** Add any deprecated keys. */ @SuppressWarnings("deprecation") @@ -194,9 +220,10 @@ public class S3AFileSystem extends FileSystem { */ public void initialize(URI name, Configuration originalConf) throws IOException { - uri = S3xLoginHelper.buildFSURI(name); + setUri(name); // get the host; this is guaranteed to be non-null, non-empty bucket = name.getHost(); + LOG.debug("Initializing S3AFileSystem for {}", bucket); // clone the configuration into one with propagated bucket options Configuration conf = propagateBucketOptions(originalConf, bucket); patchSecurityCredentialProviders(conf); @@ -216,6 +243,7 @@ public class S3AFileSystem extends FileSystem { S3ClientFactory.class); s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) .createS3Client(name); + invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry); maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); listing = new Listing(this); @@ -230,15 +258,6 @@ public class S3AFileSystem extends FileSystem { readAhead = longBytesOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0); - storageStatistics = (S3AStorageStatistics) - GlobalStorageStatistics.INSTANCE - .put(S3AStorageStatistics.NAME, - new GlobalStorageStatistics.StorageStatisticsProvider() { - @Override - public StorageStatistics provide() { - return new S3AStorageStatistics(); - } - }); int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); if (maxThreads < 2) { @@ -274,11 +293,17 @@ public class S3AFileSystem extends FileSystem { verifyBucketExists(); - initMultipartUploads(conf); - serverSideEncryptionAlgorithm = getEncryptionAlgorithm(conf); inputPolicy = S3AInputPolicy.getPolicy( conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL)); + LOG.debug("Input fadvise policy = {}", inputPolicy); + boolean magicCommitterEnabled = conf.getBoolean( + CommitConstants.MAGIC_COMMITTER_ENABLED, + CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED); + LOG.debug("Filesystem support for magic committers {} enabled", + magicCommitterEnabled ? 
"is" : "is not"); + committerIntegration = new MagicCommitIntegration( + this, magicCommitterEnabled); boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true); @@ -295,13 +320,14 @@ public class S3AFileSystem extends FileSystem { " queue limit={}", blockOutputBuffer, partSize, blockOutputActiveBlocks); - metadataStore = S3Guard.getMetadataStore(this); + setMetadataStore(S3Guard.getMetadataStore(this)); allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE, DEFAULT_METADATASTORE_AUTHORITATIVE); if (hasMetadataStore()) { LOG.debug("Using metadata store {}, authoritative={}", getMetadataStore(), allowAuthoritative); } + initMultipartUploads(conf); } catch (AmazonClientException e) { throw translateException("initializing ", new Path(name), e); } @@ -309,27 +335,29 @@ public class S3AFileSystem extends FileSystem { } /** + * Create the storage statistics or bind to an existing one. + * @return a storage statistics instance. + */ + protected static S3AStorageStatistics createStorageStatistics() { + return (S3AStorageStatistics) + GlobalStorageStatistics.INSTANCE + .put(S3AStorageStatistics.NAME, + () -> new S3AStorageStatistics()); + } + + /** * Verify that the bucket exists. This does not check permissions, * not even read access. + * Retry policy: retrying, translated. * @throws FileNotFoundException the bucket is absent * @throws IOException any other problem talking to S3 */ + @Retries.RetryTranslated protected void verifyBucketExists() throws FileNotFoundException, IOException { - try { - if (!s3.doesBucketExist(bucket)) { - throw new FileNotFoundException("Bucket " + bucket + " does not exist"); - } - } catch (AmazonS3Exception e) { - // this is a sign of a serious startup problem so do dump everything - LOG.warn(stringify(e), e); - throw translateException("doesBucketExist", bucket, e); - } catch (AmazonServiceException e) { - // this is a sign of a serious startup problem so do dump everything - LOG.warn(stringify(e), e); - throw translateException("doesBucketExist", bucket, e); - } catch (AmazonClientException e) { - throw translateException("doesBucketExist", bucket, e); + if (!invoker.retry("doesBucketExist", bucket, true, + () -> s3.doesBucketExist(bucket))) { + throw new FileNotFoundException("Bucket " + bucket + " does not exist"); } } @@ -362,6 +390,7 @@ public class S3AFileSystem extends FileSystem { } } + @Retries.RetryTranslated private void initMultipartUploads(Configuration conf) throws IOException { boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART); @@ -369,24 +398,34 @@ public class S3AFileSystem extends FileSystem { PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0); if (purgeExistingMultipart) { - Date purgeBefore = - new Date(new Date().getTime() - purgeExistingMultipartAge * 1000); - try { - transfers.abortMultipartUploads(bucket, purgeBefore); - } catch (AmazonServiceException e) { - if (e.getStatusCode() == 403) { - instrumentation.errorIgnored(); - LOG.debug("Failed to purging multipart uploads against {}," + - " FS may be read only", bucket, e); - } else { - throw translateException("purging multipart uploads", bucket, e); - } + abortOutstandingMultipartUploads(purgeExistingMultipartAge); + } catch (AccessDeniedException e) { + instrumentation.errorIgnored(); + LOG.debug("Failed to purge multipart uploads against {}," + + " FS may be read only", bucket); } } } /** + * Abort all outstanding MPUs older than a given age. 
+ * @param seconds time in seconds + * @throws IOException on any failure, other than 403 "permission denied" + */ + @Retries.RetryTranslated + public void abortOutstandingMultipartUploads(long seconds) + throws IOException { + Preconditions.checkArgument(seconds >= 0); + Date purgeBefore = + new Date(new Date().getTime() - seconds * 1000); + LOG.debug("Purging outstanding multipart uploads older than {}", + purgeBefore); + invoker.retry("Purging multipart uploads", bucket, true, + () -> transfers.abortMultipartUploads(bucket, purgeBefore)); + } + + /** * Return the protocol scheme for the FileSystem. * * @return "s3a" @@ -404,6 +443,16 @@ public class S3AFileSystem extends FileSystem { return uri; } + /** + * Set the URI field through {@link S3xLoginHelper}. + * Exported for testing. + * @param uri filesystem URI. + */ + @VisibleForTesting + protected void setUri(URI uri) { + this.uri = S3xLoginHelper.buildFSURI(uri); + } + @Override public int getDefaultPort() { return Constants.S3A_DEFAULT_PORT; @@ -411,6 +460,7 @@ public class S3AFileSystem extends FileSystem { /** * Returns the S3 client used by this filesystem. + * This is for internal use within the S3A code itself. * @return AmazonS3Client */ AmazonS3 getAmazonS3Client() { @@ -418,34 +468,55 @@ public class S3AFileSystem extends FileSystem { } /** + * Returns the S3 client used by this filesystem. + * <i>Warning: this must only be used for testing, as it bypasses core + * S3A operations. </i> + * @param reason a justification for requesting access. + * @return AmazonS3Client + */ + @VisibleForTesting + public AmazonS3 getAmazonS3ClientForTesting(String reason) { + LOG.warn("Access to S3A client requested, reason {}", reason); + return s3; + } + + /** + * Set the client -used in mocking tests to force in a different client. + * @param client client. + */ + protected void setAmazonS3Client(AmazonS3 client) { + Preconditions.checkNotNull(client, "client"); + LOG.debug("Setting S3 client to {}", client); + s3 = client; + } + + /** * Get the region of a bucket. * @return the region in which a bucket is located * @throws IOException on any failure. */ + @Retries.RetryTranslated public String getBucketLocation() throws IOException { return getBucketLocation(bucket); } /** * Get the region of a bucket. + * Retry policy: retrying, translated. * @param bucketName the name of the bucket * @return the region in which a bucket is located * @throws IOException on any failure. */ + @Retries.RetryTranslated public String getBucketLocation(String bucketName) throws IOException { - try { - return s3.getBucketLocation(bucketName); - } catch (AmazonClientException e) { - throw translateException("getBucketLocation()", - bucketName, e); - } + return invoker.retry("getBucketLocation()", bucketName, true, + ()-> s3.getBucketLocation(bucketName)); } /** - * Returns the read ahead range value used by this filesystem - * @return + * Returns the read ahead range value used by this filesystem. + * @return the readahead range */ - @VisibleForTesting long getReadAheadRange() { return readAhead; @@ -473,7 +544,7 @@ public class S3AFileSystem extends FileSystem { Configuration conf) throws IOException { if (directoryAllocator == null) { String bufferDir = conf.get(BUFFER_DIR) != null - ? BUFFER_DIR : "hadoop.tmp.dir"; + ? 
BUFFER_DIR : HADOOP_TMP_DIR; directoryAllocator = new LocalDirAllocator(bufferDir); } return directoryAllocator.createTmpFileForWrite(pathStr, size, conf); } @@ -488,6 +559,23 @@ public class S3AFileSystem extends FileSystem { } /** + * Set the bucket. + * @param bucket the bucket + */ + @VisibleForTesting + protected void setBucket(String bucket) { + this.bucket = bucket; + } + + /** + * Get the canned ACL of this FS. + * @return an ACL, if any + */ + CannedAccessControlList getCannedACL() { + return cannedACL; + } + + /** * Change the input policy for this FS. * @param inputPolicy new policy */ @@ -538,7 +626,7 @@ public class S3AFileSystem extends FileSystem { * @param key input key * @return the path from this key */ - private Path keyToPath(String key) { + Path keyToPath(String key) { return new Path("/" + key); } @@ -547,7 +635,7 @@ public class S3AFileSystem extends FileSystem { * @param key input key * @return the fully qualified path including URI scheme and bucket name. */ - Path keyToQualifiedPath(String key) { + public Path keyToQualifiedPath(String key) { return qualify(keyToPath(key)); } @@ -585,7 +673,7 @@ public class S3AFileSystem extends FileSystem { public FSDataInputStream open(Path f, int bufferSize) throws IOException { - LOG.debug("Opening '{}' for reading.", f); + LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy); final FileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { throw new FileNotFoundException("Can't open " + f @@ -603,12 +691,15 @@ public class S3AFileSystem extends FileSystem { statistics, instrumentation, readAhead, - inputPolicy)); + inputPolicy, + invoker)); } /** * Create an FSDataOutputStream at the indicated Path with write-progress * reporting. + * Retry policy: retrying, translated on the getFileStatus() probe. + * No data is uploaded to S3 in this call, so retry issues are confined to that probe. * @param f the file name to open * @param permission the permission to set.
* @param overwrite if a file with this name already exists, then if true, @@ -625,42 +716,60 @@ public class S3AFileSystem extends FileSystem { public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - String key = pathToKey(f); + final Path path = qualify(f); + String key = pathToKey(path); FileStatus status = null; try { // get the status or throw an FNFE - status = getFileStatus(f); + status = getFileStatus(path); // if the thread reaches here, there is something at the path if (status.isDirectory()) { // path references a directory: automatic error - throw new FileAlreadyExistsException(f + " is a directory"); + throw new FileAlreadyExistsException(path + " is a directory"); } if (!overwrite) { // path references a file and overwrite is disabled - throw new FileAlreadyExistsException(f + " already exists"); + throw new FileAlreadyExistsException(path + " already exists"); } - LOG.debug("Overwriting file {}", f); + LOG.debug("Overwriting file {}", path); } catch (FileNotFoundException e) { // this means the file is not found } instrumentation.fileCreated(); + PutTracker putTracker = + committerIntegration.createTracker(path, key); + String destKey = putTracker.getDestKey(); return new FSDataOutputStream( new S3ABlockOutputStream(this, - key, + destKey, new SemaphoredDelegatingExecutor(boundedThreadPool, blockOutputActiveBlocks, true), progress, partSize, blockFactory, instrumentation.newOutputStreamStatistics(statistics), - new WriteOperationHelper(key) - ), + createWriteOperationHelper(), + putTracker), null); } /** + * Create a new {@code WriteOperationHelper} instance. + * + * This class permits other low-level operations against the store. + * It is unstable and + * only intended for code with intimate knowledge of the object store. + * If using this, be prepared for changes even on minor point releases. + * @return a new helper. + */ + @InterfaceAudience.Private + public WriteOperationHelper createWriteOperationHelper() { + return new WriteOperationHelper(this); + } + + /** * {@inheritDoc} * @throws FileNotFoundException if the parent directory is not present -or * is not a directory. @@ -883,7 +992,7 @@ public class S3AFileSystem extends FileSystem { keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey)); } - Path parentPath = keyToPath(srcKey); + Path parentPath = keyToQualifiedPath(srcKey); RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories( parentPath, true); while (iterator.hasNext()) { @@ -940,7 +1049,7 @@ public class S3AFileSystem extends FileSystem { if (src.getParent() != dst.getParent()) { deleteUnnecessaryFakeDirectories(dst.getParent()); - createFakeDirectoryIfNecessary(src.getParent()); + maybeCreateFakeParentDirectory(src); } return true; } @@ -978,6 +1087,7 @@ public class S3AFileSystem extends FileSystem { /** For testing only. See ITestS3GuardEmptyDirs. */ @VisibleForTesting void setMetadataStore(MetadataStore ms) { + Preconditions.checkNotNull(ms); metadataStore = ms; } @@ -1018,6 +1128,48 @@ public class S3AFileSystem extends FileSystem { } /** + * Callback when an operation was retried. + * Increments the statistics of ignored errors or throttled requests, + * depending up on the exception class. + * @param ex exception. + */ + public void operationRetried(Exception ex) { + Statistic stat = isThrottleException(ex) + ? 
STORE_IO_THROTTLED + : IGNORED_ERRORS; + instrumentation.incrementCounter(stat, 1); + storageStatistics.incrementCounter(stat, 1); + } + + /** + * Callback from {@link Invoker} when an operation is retried. + * @param text text of the operation + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + public void operationRetried( + String text, + Exception ex, + int retries, + boolean idempotent) { + operationRetried(ex); + } + + /** + * Callback from {@link Invoker} when an operation against a metastore + * is retried. + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + public void metastoreOperationRetried(Exception ex, + int retries, + boolean idempotent) { + operationRetried(ex); + } + + /** * Get the storage statistics of this filesystem. * @return the storage statistics */ @@ -1028,11 +1180,13 @@ public class S3AFileSystem extends FileSystem { /** * Request object metadata; increments counters in the process. + * Retry policy: retry untranslated. * @param key key * @return the metadata + * @throws IOException if the retry invocation raises one (it shouldn't). */ - protected ObjectMetadata getObjectMetadata(String key) { - incrementStatistic(OBJECT_METADATA_REQUESTS); + @Retries.RetryRaw + protected ObjectMetadata getObjectMetadata(String key) throws IOException { GetObjectMetadataRequest request = new GetObjectMetadataRequest(bucket, key); //SSE-C requires to be filled in if enabled for object metadata @@ -1040,7 +1194,11 @@ public class S3AFileSystem extends FileSystem { StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))){ request.setSSECustomerKey(generateSSECustomerKey()); } - ObjectMetadata meta = s3.getObjectMetadata(request); + ObjectMetadata meta = invoker.retryUntranslated("GET " + key, true, + () -> { + incrementStatistic(OBJECT_METADATA_REQUESTS); + return s3.getObjectMetadata(request); + }); incrementReadOperations(); return meta; } @@ -1048,40 +1206,68 @@ public class S3AFileSystem extends FileSystem { /** * Initiate a {@code listObjects} operation, incrementing metrics * in the process. + * + * Retry policy: retry untranslated. * @param request request to initiate * @return the results + * @throws IOException if the retry invocation raises one (it shouldn't). */ - protected S3ListResult listObjects(S3ListRequest request) { - incrementStatistic(OBJECT_LIST_REQUESTS); + @Retries.RetryRaw + protected S3ListResult listObjects(S3ListRequest request) throws IOException { incrementReadOperations(); + incrementStatistic(OBJECT_LIST_REQUESTS); + validateListArguments(request); + return invoker.retryUntranslated( + request.toString(), + true, + () -> { + if (useListV1) { + return S3ListResult.v1(s3.listObjects(request.getV1())); + } else { + return S3ListResult.v2(s3.listObjectsV2(request.getV2())); + } + }); + } + + /** + * Validate the list arguments with this bucket's settings. + * @param request the request to validate + */ + private void validateListArguments(S3ListRequest request) { if (useListV1) { Preconditions.checkArgument(request.isV1()); - return S3ListResult.v1(s3.listObjects(request.getV1())); } else { Preconditions.checkArgument(!request.isV1()); - return S3ListResult.v2(s3.listObjectsV2(request.getV2())); } } /** * List the next set of objects. + * Retry policy: retry untranslated. 
* @param request last list objects request to continue * @param prevResult last paged result to continue from * @return the next result object + * @throws IOException: none, just there for retryUntranslated. */ + @Retries.RetryRaw protected S3ListResult continueListObjects(S3ListRequest request, - S3ListResult prevResult) { - incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS); + S3ListResult prevResult) throws IOException { incrementReadOperations(); - if (useListV1) { - Preconditions.checkArgument(request.isV1()); - return S3ListResult.v1(s3.listNextBatchOfObjects(prevResult.getV1())); - } else { - Preconditions.checkArgument(!request.isV1()); - request.getV2().setContinuationToken(prevResult.getV2() - .getNextContinuationToken()); - return S3ListResult.v2(s3.listObjectsV2(request.getV2())); - } + validateListArguments(request); + return invoker.retryUntranslated( + request.toString(), + true, + () -> { + incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS); + if (useListV1) { + return S3ListResult.v1( + s3.listNextBatchOfObjects(prevResult.getV1())); + } else { + request.getV2().setContinuationToken(prevResult.getV2() + .getNextContinuationToken()); + return S3ListResult.v2(s3.listObjectsV2(request.getV2())); + } + }); } /** @@ -1101,16 +1287,52 @@ public class S3AFileSystem extends FileSystem { } /** - * Delete an object. + * Delete an object. This is the low-level internal call which + * <i>does not</i> update the metastore. * Increments the {@code OBJECT_DELETE_REQUESTS} and write * operation statistics. + * + * Retry policy: retry untranslated; delete considered idempotent. * @param key key to blob to delete. + * @throws AmazonClientException problems working with S3 + * @throws InvalidRequestException if the request was rejected due to + * a mistaken attempt to delete the root directory. */ - private void deleteObject(String key) throws InvalidRequestException { + @VisibleForTesting + @Retries.RetryRaw + protected void deleteObject(String key) + throws AmazonClientException, IOException { blockRootDelete(key); incrementWriteOperations(); - incrementStatistic(OBJECT_DELETE_REQUESTS); - s3.deleteObject(bucket, key); + invoker.retryUntranslated("Delete "+ bucket + ":/" + key, + DELETE_CONSIDERED_IDEMPOTENT, + ()-> { + incrementStatistic(OBJECT_DELETE_REQUESTS); + s3.deleteObject(bucket, key); + return null; + }); + } + + /** + * Delete an object, also updating the metastore. + * This call does <i>not</i> create any mock parent entries. + * Retry policy: retry untranslated; delete considered idempotent. + * @param f path path to delete + * @param key key of entry + * @param isFile is the path a file (used for instrumentation only) + * @throws AmazonClientException problems working with S3 + * @throws IOException IO failure + */ + @Retries.RetryRaw + void deleteObjectAtPath(Path f, String key, boolean isFile) + throws AmazonClientException, IOException { + if (isFile) { + instrumentation.fileDeleted(1); + } else { + instrumentation.directoryDeleted(); + } + deleteObject(key); + metadataStore.delete(f); } /** @@ -1130,17 +1352,23 @@ public class S3AFileSystem extends FileSystem { * Perform a bulk object delete operation. * Increments the {@code OBJECT_DELETE_REQUESTS} and write * operation statistics. + * Retry policy: retry untranslated; delete considered idempotent. * @param deleteRequest keys to delete on the s3-backend * @throws MultiObjectDeleteException one or more of the keys could not * be deleted. * @throws AmazonClientException amazon-layer failure. 
*/ + @Retries.RetryRaw private void deleteObjects(DeleteObjectsRequest deleteRequest) - throws MultiObjectDeleteException, AmazonClientException { + throws MultiObjectDeleteException, AmazonClientException, IOException { incrementWriteOperations(); - incrementStatistic(OBJECT_DELETE_REQUESTS, 1); try { - s3.deleteObjects(deleteRequest); + invoker.retryUntranslated("delete", + DELETE_CONSIDERED_IDEMPOTENT, + () -> { + incrementStatistic(OBJECT_DELETE_REQUESTS, 1); + return s3.deleteObjects(deleteRequest); + }); } catch (MultiObjectDeleteException e) { // one or more of the operations failed. List<MultiObjectDeleteException.DeleteError> errors = e.getErrors(); @@ -1185,6 +1413,7 @@ public class S3AFileSystem extends FileSystem { ObjectMetadata metadata, InputStream inputStream) { Preconditions.checkNotNull(inputStream); + Preconditions.checkArgument(StringUtils.isNotEmpty(key), "Null/empty key"); PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, inputStream, metadata); setOptionalPutRequestParameters(putObjectRequest); @@ -1231,36 +1460,32 @@ public class S3AFileSystem extends FileSystem { * Because the operation is async, any stream supplied in the request * must reference data (files, buffers) which stay valid until the upload * completes. + * Retry policy: N/A: the transfer manager is performing the upload. * @param putObjectRequest the request * @return the upload initiated */ + @Retries.OnceRaw public UploadInfo putObject(PutObjectRequest putObjectRequest) { - long len; - if (putObjectRequest.getFile() != null) { - len = putObjectRequest.getFile().length(); - } else { - len = putObjectRequest.getMetadata().getContentLength(); - } + long len = getPutRequestLength(putObjectRequest); + LOG.debug("PUT {} bytes to {} via transfer manager ", + len, putObjectRequest.getKey()); incrementPutStartStatistics(len); - try { - Upload upload = transfers.upload(putObjectRequest); - incrementPutCompletedStatistics(true, len); - return new UploadInfo(upload, len); - } catch (AmazonClientException e) { - incrementPutCompletedStatistics(false, len); - throw e; - } + Upload upload = transfers.upload(putObjectRequest); + return new UploadInfo(upload, len); } /** * PUT an object directly (i.e. not via the transfer manager). * Byte length is calculated from the file length, or, if there is no * file, from the content length of the header. + * + * Retry Policy: none. * <i>Important: this call will close any input stream in the request.</i> * @param putObjectRequest the request * @return the upload initiated * @throws AmazonClientException on problems */ + @Retries.OnceRaw PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest) throws AmazonClientException { long len = getPutRequestLength(putObjectRequest); @@ -1269,6 +1494,8 @@ public class S3AFileSystem extends FileSystem { try { PutObjectResult result = s3.putObject(putObjectRequest); incrementPutCompletedStatistics(true, len); + // update metadata + finishedWrite(putObjectRequest.getKey(), len); return result; } catch (AmazonClientException e) { incrementPutCompletedStatistics(false, len); @@ -1297,11 +1524,14 @@ public class S3AFileSystem extends FileSystem { * Upload part of a multi-partition file. * Increments the write and put counters. * <i>Important: this call does not close any input stream in the request.</i> + * + * Retry Policy: none. * @param request request * @return the result of the operation. 
* @throws AmazonClientException on problems */ - public UploadPartResult uploadPart(UploadPartRequest request) + @Retries.OnceRaw + UploadPartResult uploadPart(UploadPartRequest request) throws AmazonClientException { long len = request.getPartSize(); incrementPutStartStatistics(len); @@ -1366,7 +1596,7 @@ public class S3AFileSystem extends FileSystem { /** * A helper method to delete a list of keys on a s3-backend. - * + * Retry policy: retry untranslated; delete considered idempotent. * @param keysToDelete collection of keys to delete on the s3-backend. * if empty, no request is made of the object store. * @param clearKeys clears the keysToDelete-list after processing the list @@ -1379,10 +1609,11 @@ public class S3AFileSystem extends FileSystem { * @throws AmazonClientException amazon-layer failure. */ @VisibleForTesting + @Retries.RetryRaw void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete, boolean clearKeys, boolean deleteFakeDir) throws MultiObjectDeleteException, AmazonClientException, - InvalidRequestException { + IOException { if (keysToDelete.isEmpty()) { // exit fast if there are no keys to delete return; @@ -1415,9 +1646,12 @@ public class S3AFileSystem extends FileSystem { * @param recursive if path is a directory and set to * true, the directory is deleted else throws an exception. In * case of a file the recursive can be set to either true or false. - * @return true if delete is successful else false. + * @return true if the path existed and then was deleted; false if there + * was no path in the first place, or the corner cases of root path deletion + * have surfaced. * @throws IOException due to inability to delete a directory or file. */ + @Retries.RetryTranslated public boolean delete(Path f, boolean recursive) throws IOException { try { return innerDelete(innerGetFileStatus(f, true), recursive); @@ -1437,14 +1671,15 @@ public class S3AFileSystem extends FileSystem { * @param recursive if path is a directory and set to * true, the directory is deleted else throws an exception. In * case of a file the recursive can be set to either true or false. - * @return true if delete is successful else false. + * @return true, except in the corner cases of root directory deletion * @throws IOException due to inability to delete a directory or file. 
* @throws AmazonClientException on failures inside the AWS SDK */ + @Retries.RetryMixed private boolean innerDelete(S3AFileStatus status, boolean recursive) throws IOException, AmazonClientException { Path f = status.getPath(); - LOG.debug("Delete path {} - recursive {}", f , recursive); + LOG.debug("Delete path {} - recursive {}", f, recursive); String key = pathToKey(f); @@ -1468,10 +1703,8 @@ public class S3AFileSystem extends FileSystem { if (status.isEmptyDirectory() == Tristate.TRUE) { LOG.debug("Deleting fake empty directory {}", key); - // HADOOP-13761 S3Guard: retries here - deleteObject(key); - metadataStore.delete(f); - instrumentation.directoryDeleted(); + // HADOOP-13761 s3guard: retries here + deleteObjectAtPath(f, key, false); } else { LOG.debug("Getting objects for directory prefix {} to delete", key); @@ -1486,7 +1719,6 @@ public class S3AFileSystem extends FileSystem { LOG.debug("Got object to delete {}", summary.getKey()); if (keys.size() == MAX_ENTRIES_TO_DELETE) { - // TODO: HADOOP-13761 S3Guard: retries removeKeys(keys, true, false); } } @@ -1505,15 +1737,10 @@ public class S3AFileSystem extends FileSystem { metadataStore.deleteSubtree(f); } else { LOG.debug("delete: Path is a file"); - instrumentation.fileDeleted(1); - deleteObject(key); - metadataStore.delete(f); + deleteObjectAtPath(f, key, true); } - Path parent = f.getParent(); - if (parent != null) { - createFakeDirectoryIfNecessary(parent); - } + maybeCreateFakeParentDirectory(f); return true; } @@ -1543,6 +1770,15 @@ public class S3AFileSystem extends FileSystem { } } + /** + * Create a fake directory if required. + * That is: it is not the root path and the path does not exist. + * Retry policy: retrying; translated. + * @param f path to create + * @throws IOException IO problem + * @throws AmazonClientException untranslated AWS client problem + */ + @Retries.RetryTranslated private void createFakeDirectoryIfNecessary(Path f) throws IOException, AmazonClientException { String key = pathToKey(f); @@ -1553,6 +1789,21 @@ public class S3AFileSystem extends FileSystem { } /** + * Create a fake parent directory if required. + * That is: its parent is not the root path and does not yet exist. + * @param path whose parent is created if needed. + * @throws IOException IO problem + * @throws AmazonClientException untranslated AWS client problem + */ + void maybeCreateFakeParentDirectory(Path path) + throws IOException, AmazonClientException { + Path parent = path.getParent(); + if (parent != null) { + createFakeDirectoryIfNecessary(parent); + } + } + + /** * List the statuses of the files/directories in the given path if the path is * a directory. * @@ -1563,11 +1814,7 @@ */ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { - try { - return innerListStatus(f); - } catch (AmazonClientException e) { - throw translateException("listStatus", f, e); - } + return once("listStatus", f.toString(), () -> innerListStatus(f)); } /** @@ -1697,7 +1944,7 @@ public class S3AFileSystem extends FileSystem { * Existence of the directory hierarchy is not an error.
* @param path path to create * @param permission to apply to f - * @return true if a directory was created + * @return true if a directory was created or already existed * @throws FileAlreadyExistsException there is a file at the path specified * @throws IOException other IO problems */ @@ -1787,6 +2034,7 @@ public class S3AFileSystem extends FileSystem { * @throws FileNotFoundException when the path does not exist * @throws IOException on other problems. */ + @Retries.RetryTranslated public FileStatus getFileStatus(final Path f) throws IOException { return innerGetFileStatus(f, false); } @@ -1801,6 +2049,7 @@ public class S3AFileSystem extends FileSystem { * @throws IOException on other problems. */ @VisibleForTesting + @Retries.RetryTranslated S3AFileStatus innerGetFileStatus(final Path f, boolean needEmptyDirectoryFlag) throws IOException { incrementStatistic(INVOCATION_GET_FILE_STATUS); @@ -1856,12 +2105,14 @@ public class S3AFileSystem extends FileSystem { * Raw {@code getFileStatus} that talks direct to S3. * Used to implement {@link #innerGetFileStatus(Path, boolean)}, * and for direct management of empty directory blobs. + * Retry policy: retry translated. * @param path Qualified path * @param key Key string for the path * @return Status * @throws FileNotFoundException when the path does not exist * @throws IOException on other problems. */ + @Retries.RetryTranslated private S3AFileStatus s3GetFileStatus(final Path path, String key, Set<Path> tombstones) throws IOException { if (!key.isEmpty()) { @@ -2000,8 +2251,11 @@ public class S3AFileSystem extends FileSystem { /** * Raw version of {@link FileSystem#exists(Path)} which uses S3 only: * S3Guard MetadataStore, if any, will be skipped. + * Retry policy: retrying; translated. * @return true if path exists in S3 + * @throws IOException IO failure */ + @Retries.RetryTranslated private boolean s3Exists(final Path f) throws IOException { Path path = qualify(f); String key = pathToKey(path); @@ -2033,12 +2287,7 @@ public class S3AFileSystem extends FileSystem { @Override public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException { - try { - innerCopyFromLocalFile(delSrc, overwrite, src, dst); - } catch (AmazonClientException e) { - throw translateException("copyFromLocalFile(" + src + ", " + dst + ")", - src, e); - } + innerCopyFromLocalFile(delSrc, overwrite, src, dst); } /** @@ -2060,6 +2309,7 @@ public class S3AFileSystem extends FileSystem { * @throws AmazonClientException failure in the AWS SDK * @throws IllegalArgumentException if the source path is not on the local FS */ + @Retries.RetryTranslated private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException, FileAlreadyExistsException, AmazonClientException { @@ -2089,25 +2339,66 @@ public class S3AFileSystem extends FileSystem { } final String key = pathToKey(dst); final ObjectMetadata om = newObjectMetadata(srcfile.length()); + Progressable progress = null; PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile); + invoker.retry("copyFromLocalFile(" + src + ")", dst.toString(), true, + () -> executePut(putObjectRequest, progress)); + if (delSrc) { + local.delete(src, false); + } + } + + /** + * Execute a PUT via the transfer manager, blocking for completion, + * updating the metastore afterwards. + * If the waiting for completion is interrupted, the upload will be + * aborted before an {@code InterruptedIOException} is thrown. 
+  /**
+   * Execute a PUT via the transfer manager, blocking for completion,
+   * updating the metastore afterwards.
+   * If the waiting for completion is interrupted, the upload will be
+   * aborted before an {@code InterruptedIOException} is thrown.
+   * @param putObjectRequest request
+   * @param progress optional progress callback
+   * @return the upload result
+   * @throws InterruptedIOException if the blocking was interrupted.
+   */
+  @Retries.OnceRaw
+  UploadResult executePut(PutObjectRequest putObjectRequest,
+      Progressable progress)
+      throws InterruptedIOException {
+    String key = putObjectRequest.getKey();
     UploadInfo info = putObject(putObjectRequest);
     Upload upload = info.getUpload();
     ProgressableProgressListener listener = new ProgressableProgressListener(
-        this, key, upload, null);
+        this, key, upload, progress);
     upload.addProgressListener(listener);
-    try {
-      upload.waitForUploadResult();
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException("Interrupted copying " + src
-          + " to " + dst + ", cancelling");
-    }
+    UploadResult result = waitForUploadCompletion(key, info);
     listener.uploadCompleted();
-
-    // This will delete unnecessary fake parent directories
+    // post-write actions
     finishedWrite(key, info.getLength());
+    return result;
+  }

-    if (delSrc) {
-      local.delete(src, false);
+  /**
+   * Wait for an upload to complete.
+   * If the waiting for completion is interrupted, the upload will be
+   * aborted before an {@code InterruptedIOException} is thrown.
+   * @param key destination key
+   * @param uploadInfo upload to wait for
+   * @return the upload result
+   * @throws InterruptedIOException if the blocking was interrupted.
+   */
+  UploadResult waitForUploadCompletion(String key, UploadInfo uploadInfo)
+      throws InterruptedIOException {
+    Upload upload = uploadInfo.getUpload();
+    try {
+      UploadResult result = upload.waitForUploadResult();
+      incrementPutCompletedStatistics(true, uploadInfo.getLength());
+      return result;
+    } catch (InterruptedException e) {
+"Interrupted: aborting upload");
+      incrementPutCompletedStatistics(false, uploadInfo.getLength());
+      upload.abort();
+      throw (InterruptedIOException)
+          new InterruptedIOException("Interrupted in PUT to "
+              + keyToQualifiedPath(key))
+          .initCause(e);
     }
   }

@@ -2211,6 +2502,21 @@ public class S3AFileSystem extends FileSystem {
     }
   }

+  /**
+   * Initiate a multipart upload from the preconfigured request.
+   * Retry policy: none + untranslated.
+   * @param request request to initiate
+   * @return the result of the call
+   * @throws AmazonClientException on failures inside the AWS SDK
+   * @throws IOException Other IO problems
+   */
+  @Retries.OnceRaw
+  InitiateMultipartUploadResult initiateMultipartUpload(
+      InitiateMultipartUploadRequest request) throws IOException {
+    LOG.debug("Initiate multipart upload to {}", request.getKey());
+    incrementStatistic(OBJECT_MULTIPART_UPLOAD_INITIATED);
+    return getAmazonS3Client().initiateMultipartUpload(request);
+  }

   protected void setOptionalCopyObjectRequestParameters(
       CopyObjectRequest copyObjectRequest) throws IOException {
@@ -2286,6 +2592,7 @@ public class S3AFileSystem extends FileSystem {
    * @param length total length of file written
    */
   @InterfaceAudience.Private
+  @Retries.RetryTranslated("Exceptions are swallowed")
   void finishedWrite(String key, long length) {
     LOG.debug("Finished write to {}, len {}", key, length);
     Path p = keyToQualifiedPath(key);
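Splitting waitForUploadCompletion() out makes the interrupt contract explicit: an interrupt aborts the transfer before the InterruptedIOException surfaces, so no half-finished multipart upload is left behind to accumulate storage charges. A caller-side sketch from inside the package, where putObject() is the package-private method returning an UploadInfo:

    UploadInfo info = putObject(putObjectRequest);  // starts the async upload
    try {
      UploadResult result = waitForUploadCompletion(key, info);
    } catch (InterruptedIOException e) {
      // the upload has already been aborted; restore the interrupt flag
      Thread.currentThread().interrupt();
      throw e;
    }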
@@ -2310,9 +2617,10 @@ public class S3AFileSystem extends FileSystem {

   /**
    * Delete mock parent directories which are no longer needed.
-   * This code swallows IO exceptions encountered
+   * Retry policy: retrying; exceptions swallowed.
    * @param path path
    */
+  @Retries.RetryRaw("Exceptions are swallowed")
   private void deleteUnnecessaryFakeDirectories(Path path) {
     List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>();
     while (!path.isRoot()) {
@@ -2324,7 +2632,7 @@ public class S3AFileSystem extends FileSystem {
     }
     try {
       removeKeys(keysToRemove, false, true);
-    } catch(AmazonClientException | InvalidRequestException e) {
+    } catch(AmazonClientException | IOException e) {
       instrumentation.errorIgnored();
       if (LOG.isDebugEnabled()) {
         StringBuilder sb = new StringBuilder();
@@ -2336,9 +2644,15 @@ public class S3AFileSystem extends FileSystem {
     }
   }

+  /**
+   * Create a fake directory, always ending in "/".
+   * Retry policy: retrying; translated.
+   * @param objectName name of directory object.
+   * @throws IOException IO failure
+   */
+  @Retries.RetryTranslated
   private void createFakeDirectory(final String objectName)
-      throws AmazonClientException, AmazonServiceException,
-      InterruptedIOException {
+      throws IOException {
     if (!objectName.endsWith("/")) {
       createEmptyObject(objectName + "/");
     } else {
@@ -2346,10 +2660,15 @@ public class S3AFileSystem extends FileSystem {
     }
   }

-  // Used to create an empty file that represents an empty directory
+  /**
+   * Used to create an empty file that represents an empty directory.
+   * Retry policy: retrying; translated.
+   * @param objectName object to create
+   * @throws IOException IO failure
+   */
+  @Retries.RetryTranslated
   private void createEmptyObject(final String objectName)
-      throws AmazonClientException, AmazonServiceException,
-      InterruptedIOException {
+      throws IOException {
     final InputStream im = new InputStream() {
       @Override
       public int read() throws IOException {
@@ -2360,12 +2679,9 @@ public class S3AFileSystem extends FileSystem {
     PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
         newObjectMetadata(0L),
         im);
-    UploadInfo info = putObject(putObjectRequest);
-    try {
-      info.getUpload().waitForUploadResult();
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException("Interrupted creating " + objectName);
-    }
+    invoker.retry("PUT 0-byte object ", objectName,
+        true,
+        () -> putObjectDirect(putObjectRequest));
     incrementPutProgressStatistics(objectName, 0);
     instrumentation.directoryCreated();
   }
@@ -2473,6 +2789,7 @@ public class S3AFileSystem extends FileSystem {
     sb.append(", metastore=").append(metadataStore);
     sb.append(", authoritative=").append(allowAuthoritative);
     sb.append(", useListV1=").append(useListV1);
+    sb.append(", magicCommitter=").append(isMagicCommitEnabled());
     sb.append(", boundedExecutor=").append(boundedThreadPool);
     sb.append(", unboundedExecutor=").append(unboundedThreadPool);
     sb.append(", statistics {")
@@ -2512,6 +2829,24 @@ public class S3AFileSystem extends FileSystem {
   }

   /**
+   * Is magic commit enabled?
+   * @return true if magic commit support is turned on.
+   */
+  public boolean isMagicCommitEnabled() {
+    return committerIntegration.isMagicCommitEnabled();
+  }
+
+  /**
+   * Predicate: is a path a magic commit path?
+   * True if magic commit is enabled and the path qualifies as special.
+   * @param path path to examine
+   * @return true if the path is or is under a magic directory
+   */
+  public boolean isMagicCommitPath(Path path) {
+    return committerIntegration.isMagicCommitPath(path);
+  }
+
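isMagicCommitPath() is the hook through which the create() call can divert output written under a magic directory into a delayed-completion multipart upload. The core of the predicate is a scan of the path elements for the magic marker; a rough stand-in, assuming the marker directory name is CommitConstants.MAGIC ("__magic") and skipping the enabled check the real MagicCommitIntegration performs first:

  private static boolean isUnderMagicDirectory(Path path) {
    // walk up the path; a "__magic" element anywhere marks it as special
    for (Path p = path; p != null; p = p.getParent()) {
      if (CommitConstants.MAGIC.equals(p.getName())) {
        return true;
      }
    }
    return false;
  }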
   /**
    * Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}.
    * {@inheritDoc}
    */
@@ -2686,6 +3021,7 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException if any I/O error occurred
    */
   @Override
+  @Retries.OnceTranslated
   public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
       final PathFilter filter)
       throws FileNotFoundException, IOException {
@@ -2738,203 +3074,91 @@ public class S3AFileSystem extends FileSystem {
   }

   /**
-   * Helper for an ongoing write operation.
-   * <p>
-   * It hides direct access to the S3 API from the output stream,
-   * and is a location where the object upload process can be evolved/enhanced.
-   * <p>
-   * Features
-   * <ul>
-   *   <li>Methods to create and submit requests to S3, so avoiding
-   *   all direct interaction with the AWS APIs.</li>
-   *   <li>Some extra preflight checks of arguments, so failing fast on
-   *   errors.</li>
-   *   <li>Callbacks to let the FS know of events in the output stream
-   *   upload process.</li>
-   * </ul>
-   *
-   * Each instance of this state is unique to a single output stream.
-   */
-  final class WriteOperationHelper {
-    private final String key;
-
-    private WriteOperationHelper(String key) {
-      this.key = key;
-    }
-
-    /**
-     * Create a {@link PutObjectRequest} request.
-     * If {@code length} is set, the metadata is configured with the size of
-     * the upload.
-     * @param inputStream source data.
-     * @param length size, if known. Use -1 for not known
-     * @return the request
-     */
-    PutObjectRequest newPutRequest(InputStream inputStream, long length) {
-      PutObjectRequest request = newPutObjectRequest(key,
-          newObjectMetadata(length), inputStream);
-      return request;
-    }
-
-    /**
-     * Create a {@link PutObjectRequest} request to upload a file.
-     * @param sourceFile source file
-     * @return the request
-     */
-    PutObjectRequest newPutRequest(File sourceFile) {
-      int length = (int) sourceFile.length();
-      PutObjectRequest request = newPutObjectRequest(key,
-          newObjectMetadata(length), sourceFile);
-      return request;
-    }
-
-    /**
-     * Callback on a successful write.
-     */
-    void writeSuccessful(long length) {
-      finishedWrite(key, length);
-    }
-
-    /**
-     * Callback on a write failure.
-     * @param e Any exception raised which triggered the failure.
-     */
-    void writeFailed(Exception e) {
-      LOG.debug("Write to {} failed", this, e);
-    }
-
-    /**
-     * Create a new object metadata instance.
-     * Any standard metadata headers are added here, for example:
-     * encryption.
-     * @param length size, if known. Use -1 for not known
-     * @return a new metadata instance
-     */
-    public ObjectMetadata newObjectMetadata(long length) {
-      return S3AFileSystem.this.newObjectMetadata(length);
-    }
-
-    /**
-     * Start the multipart upload process.
-     * @return the upload result containing the ID
-     * @throws IOException IO problem
-     */
-    String initiateMultiPartUpload() throws IOException {
-      LOG.debug("Initiating Multipart upload");
-      final InitiateMultipartUploadRequest initiateMPURequest =
-          new InitiateMultipartUploadRequest(bucket,
-              key,
-              newObjectMetadata(-1));
-      initiateMPURequest.setCannedACL(cannedACL);
-      setOptionalMultipartUploadRequestParameters(initiateMPURequest);
-      try {
-        return s3.initiateMultipartUpload(initiateMPURequest)
-            .getUploadId();
-      } catch (AmazonClientException ace) {
-        throw translateException("initiate MultiPartUpload", key, ace);
+   * Listing all multipart uploads; limited to the first few hundred.
+   * Retry policy: retry, translated.
+   * @param prefix prefix to scan for, "" for none
+   * @return a listing of multipart uploads.
+   * @throws IOException IO failure, including any uprated AmazonClientException
+   */
+  @InterfaceAudience.Private
+  @Retries.RetryTranslated
+  public List<MultipartUpload> listMultipartUploads(String prefix)
+      throws IOException {
+    ListMultipartUploadsRequest request = new ListMultipartUploadsRequest(
+        bucket);
+    if (!prefix.isEmpty()) {
+      if (!prefix.endsWith("/")) {
+        prefix = prefix + "/";
       }
+      request.setPrefix(prefix);
     }

-    /**
-     * Complete a multipart upload operation.
-     * @param uploadId multipart operation Id
-     * @param partETags list of partial uploads
-     * @return the result
-     * @throws AmazonClientException on problems.
-     */
-    CompleteMultipartUploadResult completeMultipartUpload(String uploadId,
-        List<PartETag> partETags) throws AmazonClientException {
-      Preconditions.checkNotNull(uploadId);
-      Preconditions.checkNotNull(partETags);
-      Preconditions.checkArgument(!partETags.isEmpty(),
-          "No partitions have been uploaded");
-      LOG.debug("Completing multipart upload {} with {} parts",
-          uploadId, partETags.size());
-      // a copy of the list is required, so that the AWS SDK doesn't
-      // attempt to sort an unmodifiable list.
-      return s3.completeMultipartUpload(
-          new CompleteMultipartUploadRequest(bucket,
-              key,
-              uploadId,
-              new ArrayList<>(partETags)));
-    }
-
-    /**
-     * Abort a multipart upload operation.
-     * @param uploadId multipart operation Id
-     * @throws AmazonClientException on problems.
-     */
-    void abortMultipartUpload(String uploadId) throws AmazonClientException {
-      LOG.debug("Aborting multipart upload {}", uploadId);
-      s3.abortMultipartUpload(
-          new AbortMultipartUploadRequest(bucket, key, uploadId));
-    }
-
-    /**
-     * Create and initialize a part request of a multipart upload.
-     * Exactly one of: {@code uploadStream} or {@code sourceFile}
-     * must be specified.
-     * @param uploadId ID of ongoing upload
-     * @param partNumber current part number of the upload
-     * @param size amount of data
-     * @param uploadStream source of data to upload
-     * @param sourceFile optional source file.
-     * @return the request.
-     */
-    UploadPartRequest newUploadPartRequest(String uploadId,
-        int partNumber, int size, InputStream uploadStream, File sourceFile) {
-      Preconditions.checkNotNull(uploadId);
-      // exactly one source must be set; xor verifies this
-      Preconditions.checkArgument((uploadStream != null) ^ (sourceFile != null),
-          "Data source");
-      Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
-      Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000,
-          "partNumber must be between 1 and 10000 inclusive, but is %s",
-          partNumber);
-
-      LOG.debug("Creating part upload request for {} #{} size {}",
-          uploadId, partNumber, size);
-      UploadPartRequest request = new UploadPartRequest()
-          .withBucketName(bucket)
-          .withKey(key)
-          .withUploadId(uploadId)
-          .withPartNumber(partNumber)
-          .withPartSize(size);
-      if (uploadStream != null) {
-        // there's an upload stream. Bind to it.
-        request.setInputStream(uploadStream);
-      } else {
-        request.setFile(sourceFile);
-      }
-      return request;
-    }
-
-    /**
-     * The toString method is intended to be used in logging/toString calls.
-     * @return a string description.
-     */
-    @Override
-    public String toString() {
-      final StringBuilder sb = new StringBuilder(
-          "{bucket=").append(bucket);
-      sb.append(", key='").append(key).append('\'');
-      sb.append('}');
-      return sb.toString();
-    }
-
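The removed newUploadPartRequest() (which lives on in the standalone WriteOperationHelper class) enforced S3's hard limit of 10000 parts per multipart upload. That limit is what drives part sizing: with the 5 MB minimum part size, a fixed-size strategy caps out near 48 GB, so the part size has to grow with the expected object length. An illustrative helper, not part of this patch:

  /** Pick a part size which keeps the part count within S3's 10000 cap. */
  static long choosePartSize(long expectedLen) {
    long minPartSize = 5L * 1024 * 1024;               // S3 minimum part size
    long sizeForCount = (expectedLen + 9999) / 10000;  // ceil(len / 10000)
    return Math.max(minPartSize, sizeForCount);
  }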
-    /**
-     * PUT an object directly (i.e. not via the transfer manager).
-     * @param putObjectRequest the request
-     * @return the upload initiated
-     * @throws IOException on problems
-     */
-    PutObjectResult putObject(PutObjectRequest putObjectRequest)
-        throws IOException {
-      try {
-        return putObjectDirect(putObjectRequest);
-      } catch (AmazonClientException e) {
-        throw translateException("put", putObjectRequest.getKey(), e);
-      }
+    return invoker.retry("listMultipartUploads", prefix, true,
+        () -> s3.listMultipartUploads(request).getMultipartUploads());
+  }
+
+  /**
+   * Abort a multipart upload.
+   * Retry policy: none.
+   * @param destKey destination key
+   * @param uploadId Upload ID
+   */
+  @Retries.OnceRaw
+  void abortMultipartUpload(String destKey, String uploadId) {
+"Aborting multipart upload {} to {}", uploadId, destKey);
+    getAmazonS3Client().abortMultipartUpload(
+        new AbortMultipartUploadRequest(getBucket(),
+            destKey,
+            uploadId));
+  }
+
+  /**
+   * Abort a multipart upload.
+   * Retry policy: none.
+   * @param upload the listed upload to abort.
+   */
+  @Retries.OnceRaw
+  void abortMultipartUpload(MultipartUpload upload) {
+    String destKey = upload.getKey();
+    String uploadId = upload.getUploadId();
+    if (LOG.isInfoEnabled()) {
+      DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+"Aborting multipart upload {} to {} initiated by {} on {}",
+          uploadId, destKey, upload.getInitiator(),
+          df.format(upload.getInitiated()));
+    }
+    getAmazonS3Client().abortMultipartUpload(
+        new AbortMultipartUploadRequest(getBucket(),
+            destKey,
+            uploadId));
+  }
+
+  /**
+   * Create a new instance of the committer statistics.
+   * @return a new committer statistics instance
+   */
+  public S3AInstrumentation.CommitterStatistics newCommitterStatistics() {
+    return instrumentation.newCommitterStatistics();
+  }
+
+  /**
+   * Return the capabilities of this filesystem instance.
+   * @param capability string to query the filesystem's support for.
+   * @return whether the FS instance has the capability.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+
+    switch (capability.toLowerCase(Locale.ENGLISH)) {
+
+    case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER:
+      // capability depends on FS configuration
+      return isMagicCommitEnabled();
+
+    default:
+      return false;
+    }
+  }
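hasCapability() gives clients a dynamic probe: instead of re-parsing the configuration, a committer can ask the live filesystem instance whether magic commit support is actually active for the bucket. Assuming STORE_CAPABILITY_MAGIC_COMMITTER is the constant matched in the switch above, usage looks like:

    FileSystem fs = path.getFileSystem(conf);
    boolean magic = fs instanceof S3AFileSystem
        && ((S3AFileSystem) fs).hasCapability(
            CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER);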
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/
index 94d7701..7e6d640 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/
@@ -18,9 +18,9 @@
 package org.apache.hadoop.fs.s3a;

-import com.amazonaws.AmazonClientException;
 import;
 import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import com.amazonaws.services.s3.model.SSECustomerKey;
 import;
@@ -39,7 +39,6 @@
 import java.io.EOFException;
 import java.io.IOException;

 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;

 /**
  * The input stream for an S3A object.
@@ -86,6 +85,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private String serverSideEncryptionKey;
   private final S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
+  private final Invoker invoker;

   /**
    * This is the actual position within the object, used by
@@ -104,14 +104,29 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private long contentRangeStart;

+  /**
+   * Create the stream.
+   * This does not attempt to open it; that is only done on the first
+   * actual read() operation.
+   * @param s3Attributes object attributes from a HEAD request
+   * @param contentLength length of content
+   * @param client S3 client to use
+   * @param stats statistics to update
+   * @param instrumentation instrumentation to update
+   * @param readahead readahead bytes
+   * @param inputPolicy IO policy
+   * @param invoker preconfigured invoker
+   */
   public S3AInputStream(S3ObjectAttributes s3Attributes,
       long contentLength,
       AmazonS3 client,
       FileSystem.Statistics stats,
       S3AInstrumentation instrumentation,
       long readahead,
-      S3AInputPolicy inputPolicy) {
-    Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), "No Bucket");
+      S3AInputPolicy inputPolicy,
+      Invoker invoker) {
+    Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
+        "No Bucket");
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
     Preconditions.checkArgument(contentLength >= 0, "Negative content length");
     this.bucket = s3Attributes.getBucket();
@@ -126,6 +141,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
     this.inputPolicy = inputPolicy;
     setReadahead(readahead);
+    this.invoker = invoker;
   }

   /**
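Because the constructor no longer opens the object, creating a stream and seeking in it costs no HTTP traffic; the first read() pays for the GET, now issued through the invoker. A caller-side sketch using the standard FileSystem API:

    try (FSDataInputStream in = {
      in.seek(1024 * 1024);   // no request issued yet
      int b =;   // first read triggers a ranged GET at 1 MB
    }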
"open" : "re-open"), uri, targetPos); + S3Object object = invoker.retry(text, uri, true, + () -> client.getObject(request)); + wrappedStream = object.getObjectContent(); + contentRangeStart = targetPos; + if (wrappedStream == null) { + throw new IOException("Null IO stream from reopen of (" + reason + ") " + + uri); } this.pos = targetPos; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org